SPRKPY1026

pyspark.sql.readwriter.DataFrameReader.csv has a workaround

Category

Warning

Description

This issue appears when the tool detects the usage of pyspark.sql.readwriter.DataFrameReader.csv, which has a workaround.

Input code:

stringmap = sparkSession.read.csv(path, schema="...", encoding="UTF-8", header=True, sep=",")

Output code:

#EWI: SPRKPY1026 => pyspark.sql.readwriter.DataFrameReader.csv has a workaround, see documentation for more info
stringmap = sparkSession.read.csv(path, schema="...", encoding="UTF-8", header=True, sep=",")

Scenario:

def csv(
    # Path
    path: Union[str, List[str]],
    # Schema
    schema: Union[pyspark.sql.types.StructType, str, None] = None,
    # Options
    sep: Optional[str] = None,
    encoding: Optional[str] = None,
    quote: Optional[str] = None,
    escape: Optional[str] = None,
    comment: Optional[str] = None,
    header: Union[bool, str, None] = None,
    inferSchema: Union[bool, str, None] = None,
    ignoreLeadingWhiteSpace: Union[bool, str, None] = None,
    ignoreTrailingWhiteSpace: Union[bool, str, None] = None,
    nullValue: Optional[str] = None,
    nanValue: Optional[str] = None,
    positiveInf: Optional[str] = None,
    negativeInf: Optional[str] = None,
    dateFormat: Optional[str] = None,
    timestampFormat: Optional[str] = None,
    maxColumns: Union[str, int, None] = None,
    maxCharsPerColumn: Union[str, int, None] = None,
    maxMalformedLogPerPartition: Union[str, int, None] = None,
    mode: Optional[str] = None,
    columnNameOfCorruptRecord: Optional[str] = None,
    multiLine: Union[bool, str, None] = None,
    charToEscapeQuoteEscaping: Optional[str] = None,
    samplingRatio: Union[str, float, None] = None,
    enforceSchema: Union[bool, str, None] = None,
    emptyValue: Optional[str] = None,
    locale: Optional[str] = None,
    lineSep: Optional[str] = None,
    pathGlobFilter: Union[bool, str, None] = None,
    recursiveFileLookup: Union[bool, str, None] = None,
    modifiedBefore: Union[bool, str, None] = None,
    modifiedAfter: Union[bool, str, None] = None,
    unescapedQuoteHandling: Optional[str] = None,
)

Several workarounds are possible in this scenario.

Path: The first parameter, "path", must be a stage in order to have an equivalent in Snowpark, so it is recommended to create a temporary stage and add each ".csv" path to it using the prefix "file://", as follows.

Source:

stringmap = sparkSession.read.csv(["./data/file1.csv", "./data/file2.csv"])

Expected:

# Create a temporary stage with a unique name in the current schema
stage = f'{sparkSession.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
sparkSession.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}').show()

# Upload each local .csv file to the stage using the "file://" prefix
sparkSession.file.put(f"file://./data/file1.csv", f"@{stage}")
sparkSession.file.put(f"file://./data/file2.csv", f"@{stage}")

# Read all the files placed on the stage
stringmap = sparkSession.read.csv(stage)
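
For reference, here is a minimal self-contained sketch of the same workaround. The connection parameters, the stage name MY_TEMP_STAGE, and the ID/NAME schema are illustrative assumptions, not part of the original code; the schema function itself is covered in the Schema workaround below.

from snowflake.snowpark import Session
from snowflake.snowpark.types import IntegerType, StringType, StructField, StructType

# Assumed minimal connection settings; replace with your own values
connection_parameters = {
    "account": "<account>",
    "user": "<user>",
    "password": "<password>",
}
session = Session.builder.configs(connection_parameters).create()

# Temporary stage that will hold the local .csv files
session.sql("CREATE TEMPORARY STAGE IF NOT EXISTS MY_TEMP_STAGE").collect()

# Upload the local files; auto_compress=False keeps them as plain .csv files on the stage
session.file.put("file://./data/file1.csv", "@MY_TEMP_STAGE", auto_compress=False)
session.file.put("file://./data/file2.csv", "@MY_TEMP_STAGE", auto_compress=False)

# Assumed two-column schema; Snowpark's csv() does not take the schema as a parameter
schema = StructType([
    StructField("ID", IntegerType()),
    StructField("NAME", StringType()),
])

stringmap = session.read.schema(schema).csv("@MY_TEMP_STAGE")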

Schema: The second parameter, "schema", is not supported by Snowpark as a csv() parameter; instead, specify it with the reader's schema function before calling csv, as follows:

Source:

stringmap = sparkSession.read.csv(path, schema)

Expected:

stringmap = sparkSession.read.schema(schema).csv(path)
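
As an illustration only, the schema object passed to the schema function can be built with Snowpark's StructType; the column names and types below are assumptions, not taken from the original example.

from snowflake.snowpark.types import IntegerType, StringType, StructField, StructType

# Hypothetical schema: an integer ID column and a string NAME column
schema = StructType([
    StructField("ID", IntegerType()),
    StructField("NAME", StringType()),
])

stringmap = sparkSession.read.schema(schema).csv(path)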

Options: The additional parameters are also not supported by Snowpark as csv() parameters, but for many of them you can use the "option" function to specify them as reader options, as follows:

Source:

stringmap = sparkSession.read.csv(path, encoding="UTF-8", header=True, sep=",")

Expected:

stringmap = sparkSession.read.option("encoding", "UTF-8").option("sep", ",").csv(path)
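
Combining the previous workarounds, a fully migrated call that chains the schema and the supported options over a staged path could look like the sketch below; the stage name and schema are assumptions for illustration.

from snowflake.snowpark.types import IntegerType, StringType, StructField, StructType

# Assumed schema and stage; adjust to the actual data
schema = StructType([
    StructField("ID", IntegerType()),
    StructField("NAME", StringType()),
])

stringmap = (
    sparkSession.read
    .schema(schema)                 # replaces the schema parameter
    .option("encoding", "UTF-8")    # replaces encoding="UTF-8"
    .option("sep", ",")             # replaces sep=","
    .csv("@MY_TEMP_STAGE")          # the path must point to a stage
)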

The following options are not supported by Snowpark: quoteAll, inferSchema, enforceSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, timestampNTZFormat, maxColumns, maxCharsPerColumn, mode, columnNameOfCorruptRecord, multiLine, samplingRatio, emptyValue, locale, unescapedQuoteHandling, header.
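
For the options in this list there is no direct reader equivalent, but in some cases the effect can be reproduced after loading. As a hedged illustration only, the behavior of nullValue="NA" could be approximated with a column expression; the column name CITY and the sentinel value are assumptions, not part of the original example.

from snowflake.snowpark.functions import col, lit, when

# Approximate nullValue="NA": replace the sentinel string with NULL after reading
stringmap = stringmap.with_column(
    "CITY",
    when(col("CITY") == lit("NA"), lit(None)).otherwise(col("CITY"))
)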

Recommendation

  • For more support, you can email us at snowconvert-info@snowflake.com. If you have a support contract with Snowflake, reach out to your sales engineer, who can direct your support needs.
