SPRKPY1083

pyspark.sql.readwriter.DataFrameWriter.save

Message: The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.

Category: Warning

Description

The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. The workaround is to use Snowpark DataFrameWriter methods instead.

Scenarios

The Spark signature for this method, DataFrameWriter.save(path, format, mode, partitionBy, **options), does not exist in Snowpark. Therefore, any usage of the save function will have an EWI in the output code.

Scenario 1

Input code

Below is an example that tries to save data with CSV format.

path_csv_file = "/path/to/file.csv"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = my_session.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.save(path_csv_file, format="csv")
df.write.save(path_csv_file, format="csv", mode="overwrite")
df.write.save(path_csv_file, format="csv", mode="overwrite", lineSep="\r\n", dateFormat="YYYY/MM/DD")
df.write.save(path_csv_file, format="csv", mode="overwrite", partitionBy="City", lineSep="\r\n", dateFormat="YYYY/MM/DD")

Output code

The tool adds the EWI SPRKPY1083 to the output code to let you know that this function is not supported by Snowpark, but it has a workaround.

path_csv_file = "/path/to/file.csv"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = my_session.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite", lineSep="\r\n", dateFormat="YYYY/MM/DD")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite", partitionBy="City", lineSep="\r\n", dateFormat="YYYY/MM/DD")

Recommended fix

As a workaround, you can use Snowpark DataFrameWriter methods instead.

  • Fixing path and format parameters:

    • Replace the save method with the csv or copy_into_location method.

    • If you are using the copy_into_location method, you need to specify the format with the file_format_type parameter.

    • The first parameter, path, must be a stage location to be equivalent in Snowpark.

Below is an example that creates a temporary stage, puts the file into it, and then calls one of the methods mentioned above.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = my_session.create_dataframe(data, schema=["Name", "Age", "City"])

# Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.csv", f"@{temp_stage}")
stage_file_path = f"@{temp_stage}/file.csv"

# Using csv method
df.write.csv(stage_file_path)

# Using copy_into_location method
df.write.copy_into_location(stage_file_path, file_format_type="csv")

Below is an example that adds the mode method with overwrite as a parameter to the method chain.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = my_session.create_dataframe(data, schema=["Name", "Age", "City"])

# Using csv method
df.write.mode("overwrite").csv(f"@{temp_stage}")

# Using copy_into_location method
df.write.mode("overwrite").copy_into_location(f"@{temp_stage}", file_format_type="csv")

  • Fixing partitionBy parameter:

    • Use the partition_by parameter from the csv method, as follows:

Below is an example that uses the partition_by parameter from the csv method.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = my_session.create_dataframe(data, schema=["Name", "Age", "City"])

# Using csv method
df.write.csv(f"@{temp_stage}", partition_by="City")

# Using copy_into_location method
df.write.copy_into_location(f"@{temp_stage}", file_format_type="csv", partition_by="City")

  • Fixing options parameter:

The options in Spark and Snowpark are not the same. In this case, lineSep and dateFormat are replaced with RECORD_DELIMITER and DATE_FORMAT; the Additional recommendations section has a table with all the equivalences.

Below is an example that creates a dictionary with RECORD_DELIMITER and DATE_FORMAT and passes it to the writer through the format_type_options parameter.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = my_session.create_dataframe(data, schema=["Name", "Age", "City"])
optionsParam = {"RECORD_DELIMITER": "\r\n", "DATE_FORMAT": "YYYY/MM/DD"}

# Using csv method
df.write.csv(f"@{temp_stage}", format_type_options=optionsParam)

# Using copy_into_location method
df.write.copy_into_location(f"@{temp_stage}", file_format_type="csv", format_type_options=optionsParam)

Scenario 2

Input code

Below is an example that tries to save data with JSON format.

path_json_file = "/path/to/file.json"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.save(path_json_file, format="json")
df.write.save(path_json_file, format="json", mode="overwrite")
df.write.save(path_json_file, format="json", mode="overwrite", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")
df.write.save(path_json_file, format="json", mode="overwrite", partitionBy="City", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")

Output code

The tool adds the EWI SPRKPY1083 to the output code to let you know that this function is not supported by Snowpark, but it has a workaround.

path_json_file = "/path/to/file.json"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite", partitionBy="City", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")

Recommended fix

As a workaround, you can use Snowpark DataFrameWriter methods instead.

  • Fixing path and format parameters:

    • Replace the save method with the json or copy_into_location method.

    • If you are using the copy_into_location method, you need to specify the format with the file_format_type parameter.

    • The first parameter, path, must be a stage location to be equivalent in Snowpark.

Below is an example that creates a temporary stage, puts the file into it, and then calls one of the methods mentioned above.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = my_session.create_dataframe(data, schema=["Name", "Age", "City"])

# Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.json", f"@{temp_stage}")
stage_file_path = f"@{temp_stage}/file.json"

# Using json method
df.write.json(stage_file_path)

# Using copy_into_location method
df.write.copy_into_location(stage_file_path, file_format_type="json")

Below is an example that adds the mode method with overwrite as a parameter to the method chain.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = my_session.create_dataframe(data, schema=["Name", "Age", "City"])

# Using json method
df.write.mode("overwrite").json(f"@{temp_stage}")

# Using copy_into_location method
df.write.mode("overwrite").copy_into_location(f"@{temp_stage}", file_format_type="json")

  • Fixing partitionBy parameter:

    • Use the partition_by parameter from the json method, as follows:

Below is an example that uses the partition_by parameter from the json method.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = my_session.create_dataframe(data, schema=["Name", "Age", "City"])

# Using json method
df.write.json(f"@{temp_stage}", partition_by="City")

# Using copy_into_location method
df.write.copy_into_location(f"@{temp_stage}", file_format_type="json", partition_by="City")

  • Fixing options parameter:

The options in Spark and Snowpark are not the same. In this case, dateFormat and timestampFormat are replaced with DATE_FORMAT and TIMESTAMP_FORMAT; the Additional recommendations section has a table with all the equivalences.

Below is an example that creates a dictionary with DATE_FORMAT and TIMESTAMP_FORMAT and passes it to the writer through the format_type_options parameter.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = my_session.create_dataframe(data, schema=["Name", "Age", "City"])
optionsParam = {"DATE_FORMAT": "YYYY/MM/DD", "TIMESTAMP_FORMAT": "YYYY-MM-DD HH24:MI:SS.FF3"}

# Using json method
df.write.json(f"@{temp_stage}", format_type_options=optionsParam)

# Using copy_into_location method
df.write.copy_into_location(f"@{temp_stage}", file_format_type="json", format_type_options=optionsParam)

Scenario 3

Input code

Below is an example that tries to save data with PARQUET format.

path_parquet_file = "/path/to/file.parquet"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.save(path_parquet_file, format="parquet")
df.write.save(path_parquet_file, format="parquet", mode="overwrite")
df.write.save(path_parquet_file, format="parquet", mode="overwrite", pathGlobFilter="*.parquet")
df.write.save(path_parquet_file, format="parquet", mode="overwrite", partitionBy="City", pathGlobFilter="*.parquet")

Output code

The tool adds the EWI SPRKPY1083 to the output code to let you know that this function is not supported by Snowpark, but it has a workaround.

path_parquet_file = "/path/to/file.parquet"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite", pathGlobFilter="*.parquet")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite", partitionBy="City", pathGlobFilter="*.parquet")

Recommended fix

As a workaround, you can use Snowpark DataFrameWriter methods instead.

  • Fixing path and format parameters:

    • Replace the save method with the parquet or copy_into_location method.

    • If you are using the copy_into_location method, you need to specify the format with the file_format_type parameter.

    • The first parameter, path, must be a stage location to be equivalent in Snowpark.

Below is an example that creates a temporary stage, puts the file into it, and then calls one of the methods mentioned above.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = my_session.create_dataframe(data, schema=["Name", "Age", "City"])

# Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.parquet", f"@{temp_stage}")
stage_file_path = f"@{temp_stage}/file.parquet"

# Using parquet method
df.write.parquet(stage_file_path)

# Using copy_into_location method
df.write.copy_into_location(stage_file_path, file_format_type="parquet")

Below is an example that adds the mode method with overwrite as a parameter to the method chain.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = my_session.create_dataframe(data, schema=["Name", "Age", "City"])

# Using parquet method
df.write.mode("overwrite").parquet(f"@{temp_stage}")

# Using copy_into_location method
df.write.mode("overwrite").copy_into_location(f"@{temp_stage}", file_format_type="parquet")

  • Fixing partitionBy parameter:

    • Use the partition_by parameter from the parquet method, as follows:

Below is an example that uses the partition_by parameter from the parquet method.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = my_session.create_dataframe(data, schema=["Name", "Age", "City"])

# Using parquet method
df.write.parquet(f"@{temp_stage}", partition_by="City")

# Using copy_into_location method
df.write.copy_into_location(f"@{temp_stage}", file_format_type="parquet", partition_by="City")

  • Fixing options parameter:

The options in Spark and Snowpark are not the same. In this case, pathGlobFilter is replaced with PATTERN; the Additional recommendations section has a table with all the equivalences.

Below is an example that creates a dictionary with PATTERN and passes it to the writer through the format_type_options parameter.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = my_session.create_dataframe(data, schema=["Name", "Age", "City"])
optionsParam = {"PATTERN": "*.parquet"}

# Using parquet method
df.write.parquet(f"@{temp_stage}", format_type_options=optionsParam)

# Using copy_into_location method
df.write.copy_into_location(f"@{temp_stage}", file_format_type="parquet", format_type_options=optionsParam)

Additional recommendations

  • Take into account that the options in Spark and Snowpark are not the same, but they can be mapped. The equivalences used in this document are:

    Spark option        Snowpark option
    ------------        ----------------
    lineSep             RECORD_DELIMITER
    dateFormat          DATE_FORMAT
    timestampFormat     TIMESTAMP_FORMAT
    pathGlobFilter      PATTERN

  • For the modifiedBefore / modifiedAfter options, you can achieve the same result in Snowflake by using the metadata columns and then adding a filter like df.filter(METADATA$FILE_LAST_MODIFIED > 'some_date'); see the sketch after this list.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.
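
Below is a minimal sketch of that metadata-based filter. The names my_session and @my_stage are hypothetical placeholders, and it assumes a snowflake-snowpark-python version that provides DataFrameReader.with_metadata:

from snowflake.snowpark.functions import col, lit

# Hypothetical names for illustration: my_session is an existing Snowpark
# Session and @my_stage/data/ contains the staged CSV files.
df = (
    my_session.read
    .with_metadata("METADATA$FILE_LAST_MODIFIED")  # expose the file metadata column
    .option("INFER_SCHEMA", True)
    .csv("@my_stage/data/")
)

# Rough equivalent of Spark's modifiedAfter: keep only the rows that come
# from files whose last-modified timestamp is later than the given date.
filtered_df = df.filter(
    col("METADATA$FILE_LAST_MODIFIED") > lit("2024-01-01").cast("timestamp")
)
filtered_df.show()

Spark's modifiedBefore maps the same way, using the < operator instead.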
