
Pipeline Conversion

We've already converted. Let's work through the converted code.



The SMA has “converted” our scripts, but has it really? What it has actually done is convert every reference from the Spark API to the Snowpark API; what it has not done is replace the connections that may exist in your pipelines.

The SMA’s power is in its assessment reporting, because the conversion itself is limited to rewriting references from the Spark API to the Snowpark API. Converting those references alone will not be enough to run a data pipeline. You will have to resolve the pipeline’s connections manually; the SMA cannot know connection parameters or other elements that were never available to it.

As with any conversion, the converted code can be dealt with in a variety of ways. The following steps are how we would recommend you approach the output of the conversion tool. Like SnowConvert, the SMA requires attention to be paid to the output. No conversion will ever be 100% automated, and this is particularly true for the SMA: since it converts references from the Spark API to the Snowpark API, you will always need to check how those references are being run. It does not attempt to orchestrate the successful execution of any script or notebook run through it.

So we’ll follow these steps to work through the output of the SMA, which will be slightly different from SnowConvert:

  • Resolve All Issues: “Issues” here means the issues generated by the SMA. Take a look at the output code. Resolve parsing errors and conversion errors, and investigate warnings.

  • Resolve the session calls: How the session call is written in the output code depends on where we are going to run the file. We will resolve this for running the code file(s) in the same location as they were originally going to be run, and then for running them in Snowflake.

  • Resolve the Input/Outputs: Connections to different sources cannot be resolved entirely by the SMA. There are differences between the platforms, and the SMA generally leaves the connection logic for you to resolve. How you resolve it also depends on where the file is going to be run.

  • Clean up and Test! Let’s run the code. See if it works. We will be smoke testing in this lab, but there are tools to do more extensive testing and data validation including Snowpark Python Checkpoints.

So let’s take a look at what this looks like. We’re going to do this with two approaches: the first is to run this in Python on the local machine (as the source script does today). The second would be to do everything in Snowflake… in Snowsight, though for a data pipeline reading from a local source this will not be 100% possible in Snowsight. That’s ok. We are not converting the orchestration of this script in this POC.

Let’s start with the pipeline script file, and get to the notebook in the next section.

Resolve Issues

Let’s open our source and our output code in a code editor. You can use any code editor you like, but as has been mentioned multiple times, Snowflake recommends VS Code with the Snowflake Extension. Not only does the Snowflake Extension help navigate the issues from SnowConvert, it can also run Snowpark Checkpoints for Python, which helps with testing and root cause analysis (though that is just out of scope for this lab).

Let’s open the directory that we originally created in the project creation screen (Spark ADW Lab) in VS Code:

Note that the Output directory structure will be the same as the input directory. Even the data file will be copied over despite no conversion taking place. There will also be a couple of checkpoints.json files that will be created by the SMA. These are json files that contain instructions for the Snowpark Checkpoints extension. The Snowflake extension can load checkpoints into both the source and output code based on the data in those files. We will ignore them for now.

Finally, let’s compare the input python script with the converted one in the output script.

This is a very basic side-by-side comparison with the original Spark code on the left and the output Snowpark compatible code on the right. Looks like some imports were converted as well as the session call(s). We can see an EWI at the bottom of the image above, but let’s not start there. We need to find the parsing error before we do anything else.

We can search the document for the error code of that parsing error, which was shown in both the UI and the issues.csv: SPRKPY1101. (Unfiltered, that code also appears in the AssessmentReport.json used to build the AssessmentReport.docx summary report, the main report for navigating a large workload, but here we only care about where the EWI lands in the converted pipeline_dimcustomer.py file.)

You can see that this line of code was present at the bottom of the source code.

# Conversion Input.
some rogue code that doesn't make any sense!

# Conversion Output.
some
# EWI: SPRKPY1101 => Unrecognized or invalid CODE STATEMENT @(131, 6). Last valid token was 'some' @(131, 1), failed token 'rogue' @(131, 6)
#     rogue code that doesn't make any sense!

Looks like this parsing error was caused by… “some rogue code that doesn’t make any sense!”. This line sits at the bottom of the pipeline file. It is not unusual for stray characters or other elements to end up in a code file as part of an extraction from a source system. Note how the SMA detected that this was not valid Python code and generated a parsing error.

You can also see how the SMA inserts both the error code and the description into the output code as a comment where the error occurred. This is how all error messages will appear in the output.

Since this is not valid code, it is at the end of the file, and nothing else was removed as a result of this error, both the original line and the EWI comment can safely be removed from the output code file.

And now we’ve resolved our first and most serious issue. Get excited.

Let’s work through the rest of our EWIs in this file. We can search for “EWI” because we now know that text will appear in the comment every time there is an error code. (Alternatively, we could sort the issues.csv file and order the issues by severity… but that’s not really necessary here.)
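If you do want to slice the issues.csv, a quick optional sketch with pandas (the file path and the column name here are assumptions; check the header of your own report):

# Tally how often each issue code appears in the SMA's issues.csv.
import pandas as pd

issues = pd.read_csv("issues.csv")          # path to the issues.csv in your Reports output
print(issues["Code"].value_counts())        # "Code" is an assumed column name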

The next one is actually just a warning, not an error. It’s telling us that there was a function used that isn’t always equivalent in Spark and Snowpark:

#EWI: SPRKPY1067 => Snowpark does not support split functions with more than two parameters or containing regex pattern. See documentation for more info.
split_col = split(df_uppercase['NAME'], '.first:')

The description here though gives away that we probably don’t have to worry about this. There are only two parameters being passed. Let’s leave this EWI as a comment in the file, so we know to check for it when we are running the file later.

The last one for this file is a conversion error saying that something is not supported:

This is the write call to the Spark JDBC driver that writes the output dataframe into SQL Server. Since this is part of the “resolve all inputs/outputs” step that we will deal with after we address our issues, we’ll leave it for later. Note, however, that this error must be resolved; the previous one was just a warning and may still work with no change being made.

Resolving the Session Calls

The session calls are converted by the SMA, but you should pay special attention to them to make sure they are functional. In our pipeline script, this is the before and after code:

The SparkSession reference was changed to Session. You can see that reference change near the top of this file in the import statement as well:
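As a sketch, that import change typically looks like this (the exact converted line in your file may differ slightly):

# Original Spark import.
from pyspark.sql import SparkSession

# Converted Snowpark import.
from snowflake.snowpark import Session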

Note in the image above that the variable the session call is assigned to, “spark”, is not changed. Because it is just a variable name, changing it is not strictly necessary, but renaming “spark” to “session” is more in line with what Snowpark recommends. (Note that the VS Code extension’s “SMA Assistant” will suggest these changes as well.)

This is a simple exercise, but it’s worth doing. You can use VS Code’s own find and replace to locate the references to “spark” in this file and replace them with “session”. You can see the result in the image below: the references to the “spark” variable in the converted code have been replaced with “session”:

We also can remove something else from this session call. Since we are not going to be running “spark” anymore, we do not need to specify the driver path for the spark driver. So we can remove the config function entirely from the session call like this:

# Old Converted output.
# Spark Session
session = Session.builder.config('spark.driver.extraClassPath', driver_path) \
                    .app_name('SparkSQLServerExample', True) \
                    .getOrCreate()

# New Converted Output
# Snowpark Session
session = Session.builder.app_name('SparkSQLServerExample', True).getOrCreate()

Might as well convert it to a single line. The SMA couldn’t be sure we didn’t need that driver (although that seems logical), so it did not remove it. But now our session call is complete.

(Note that the SMA also adds a “query tag” to the session. This helps troubleshoot issues with this session or query later on; leaving it or removing it is entirely optional.)
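If you want to manage the tag yourself, a minimal sketch (the tag text here is just an example):

# Set or overwrite the query tag on the current Snowpark session.
session.query_tag = "adventureworks-pipeline-lab"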

Notes on the Session Calls

Believe it or not, that is all we need to change in the code for the session call, but it is not all we need to do to create the session. This goes back to the original question: a lot depends on where you want to run these files. The original Spark session calls used a configuration that was set up elsewhere. If you look at the original Spark session call, it reads a config file into a pandas dataframe at the start of the script (this is true for our notebook file as well). Snowpark can work the same way; if we leave the session call as-is, the account information would need to live in an accessible connections.toml file on this machine, with the target account set as the default connection, because a new Snowpark session checks that file unless connection parameters are explicitly passed to the session call. (The Snowflake/Snowpark documentation covers updating the connections.toml file.)

The standard approach is to pass the connection parameters directly as strings to the session builder:

# Parameters in a dictionary.
connection_parameters = {
  "account": "<your snowflake account>",
  "user": "<your snowflake user>",
  "password": "<your snowflake password>",
  "role": "<your snowflake role>",  # optional
  "warehouse": "<your snowflake warehouse>",  # optional
  "database": "<your snowflake database>",  # optional
  "schema": "<your snowflake schema>",  # optional
}

# The session call
session = Session.builder.configs(connection_parameters).app_name("AdventureWorksSummary", True).getOrCreate()

AdventureWorks appears to have kept these credentials in a file and read it at runtime. Assuming there is a similar accessible file called 'snowflake_credentials.txt', the matching syntax could look something like this:

# Load into a dataframe.
snow_creds = pd.read_csv('snowflake_credentials.txt', index_col=None, header=0)

# Build the parameters.
connection_parameters = {
  "account": snow_creds.loc[snow_creds['Specific_Element'] == 'Account', 'Value'].item(),
  "user": snow_creds.loc[snow_creds['Specific_Element'] == 'Username', 'Value'].item(),
  "password": snow_creds.loc[snow_creds['Specific_Element'] == 'Password', 'Value'].item(),
  "role": "<your snowflake role>",  # optional
  "warehouse": snow_creds.loc[snow_creds['Specific_Element'] == 'Warehouse', 'Value'].item(),  # optional
  "database": snow_creds.loc[snow_creds['Specific_Element'] == 'Database', 'Value'].item(),  # optional
  "schema": snow_creds.loc[snow_creds['Specific_Element'] == 'Schema', 'Value'].item(),  # optional
}

# Then pass the parameters to the configs function of the session builder.
session = Session.builder.configs(connection_parameters).app_name("AdventureWorksSummary", True).getOrCreate()

Note that for our notebook file to run inside of Snowflake using Snowsight, you wouldn’t need to do any of this. You would just call the active session and run it.
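For reference, inside Snowsight that pattern is just a couple of lines (a minimal sketch):

# Inside Snowflake, reuse the session that Snowsight already opened for you.
from snowflake.snowpark.context import get_active_session

session = get_active_session()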

Now it's time for the most critical component of this migration, resolving any input/output references.

Resolving the Inputs and Outputs

So let’s resolve our inputs and outputs now. Note that this will diverge based on whether you’re running the files locally or in Snowflake. For the Python script, let’s be clear about what we gain and lose by running directly inside Snowsight: you cannot run the whole operation in Snowsight (at least not currently), because the local csv file is not accessible from Snowsight. You would have to load the .csv file into a stage manually. That is likely not an ideal long-term solution, but it lets us test the conversion.

So we’ll first prep this file to be run/orchestrated locally, and then to be run in Snowflake.

To get the pipeline script’s inputs and output resolved, we need to first identify them. They are pretty simple. This script seems to:

  • access a local file

  • load the result into SQL Server (but now Snowflake)

  • move the file to make way for the next one

Simple enough. So we need to replace each component of the code that does those things. Let’s start with accessing the local file.

As was mentioned at the start of this lab, the strong suggestion would be to rearchitect the point-of-sale system and the orchestration tools that run this Python script so that the output file lands in a cloud storage location. Then you could turn that location into an external table, and voila… you are in Snowflake. However, in the current architecture the file is not in cloud storage and will stay where it is, so we need to give Snowflake a way to access this file while preserving the existing logic.

We have options to do this, but we will create an internal stage and move the file into the stage with the script. We would then need to move the file in the local file system, and also move it in the stage. This can all be done with Snowpark. Let’s break it down:

  • accessing a local file: Create an internal stage (if one doesn’t exist already) -> Load the file into the stage -> Read the file into a dataframe

  • loading the result into SQL Server: Load the transformed data into a table in Snowflake

  • moving the file to make way for the next one: Move the local file -> Move the file in the stage.

Let’s look at code that can do each of these things.

Access a Locally Accessible File

This source code in Spark looks like this:

# Spark read from a local csv file.
df = spark.read.csv('customer_update.csv', header=True, inferSchema=True)

And the transformed snowpark code (by the SMA) looks like this:

# Snowpark read from a local csv file.
df = session.read.option("PARSE_HEADER", True).option("INFER_SCHEMA", True).csv('customer_update.csv')

We can replace that with code that performs the steps above:

  1. Create an internal stage (if one does not exist already). We will create a stage called 'LOCAL_LOAD_STAGE' and go through a few steps to make sure that the stage is ready to use.

# Create a stage if one does not already exist.
# Name the stage we're going to use.
target_stage_name = "LOCAL_LOAD_STAGE"

# Check to see if this stage already exists.
stages = session.sql("SHOW STAGES").collect()
target_stages = [stage for stage in stages if stage['name'] == target_stage_name]

# Create the stage if it does not already exist.
if not target_stages:
    from snowflake.core import Root
    from snowflake.core.stage import Stage, StageEncryption
    root = Root(session)
    my_stage = Stage(name=target_stage_name, encryption=StageEncryption(type="SNOWFLAKE_SSE"))
    root.databases["ADVENTUREWORKS"].schemas["DBO"].stages.create(my_stage)
    print('%s created.' % (target_stage_name))
else:
    print('%s already exists.' % (target_stage_name))
  2. Load the file into the stage.

# Move the file.
put_results = session.file.put(local_file_name="customer_update.csv",
                    stage_location="ADVENTUREWORKS.DBO.LOCAL_LOAD_STAGE",
                    overwrite=False,
                    auto_compress=False)

# Read the results.
for r in put_results:
    str_output = ("File {src}: {stat}").format(src=r.source,stat=r.status)
    print(str_output)    
  3. Read the file into a dataframe. This is the part that the SMA actually converted. We need to specify that the location of the file is now the internal stage.

# Location of the file in the stage.
csv_file_path = "@LOCAL_LOAD_STAGE/customer_update.csv"

# Snowpark read from the staged csv file.
df = session.read.option("PARSE_HEADER", True).option("INFER_SCHEMA", True).csv(csv_file_path)

The result of that would look like this:

Let's move on to the next step.

Load the result into Snowflake

The original script wrote the dataframe into SQL Server; now we are going to load it into Snowflake. This is a much simpler conversion because the dataframe is already a Snowpark dataframe. That is one of the advantages of Snowflake: now that the data is accessible to Snowflake, everything happens inside Snowflake.

# Original output from the conversion tool.
# Write the DataFrame to SQL Server.
#EWI: SPRKPY1002 => pyspark.sql.readwriter.DataFrameWriter.jdbc is not supported
df_transformed.write.jdbc(url=sql_server_url,
              table='dbo.DimCustomer',
              mode="append",
              properties={
                  "user": sql_server_user,
                  "password": sql_server_password,
                  "driver": driver_path
              })

# Corrected Snowflake/Snowpark code.
df_transformed.write.save_as_table("ADVENTUREWORKS.DBO.DIMCUSTOMER", mode="append")

Note that we may want to write to a temp table to do some testing/validation, but this is the behavior in the original script.
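If you’d rather validate before touching the real table, a minimal sketch of that temp-table variant (the staging table name is just an example):

# Write to a temporary table first; it disappears when the session ends.
df_transformed.write.save_as_table(
    "ADVENTUREWORKS.DBO.DIMCUSTOMER_STAGING",  # hypothetical staging table name
    mode="overwrite",
    table_type="temporary",
)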

Move the file to make way for the next one

This mirrors the behavior of the original script. We don’t really need to reproduce it in Snowflake, but we can, to showcase the exact same functionality in the stage. In the original script this is done with an os call against the local file system; that does not depend on Spark and will remain the same. But to emulate the behavior in Snowpark, we also need to move the file in the stage to a new directory.

This can be done simply enough with the following Python code:

# New filename.
original_filepath = '@LOCAL_LOAD_STAGE/customer_update.csv'
new_filepath = '@LOCAL_LOAD_STAGE/old_versions/customer_update_%s.csv'%(today_time)

copy_sql = f"COPY FILES INTO {new_filepath} FROM {original_filepath}"
session.sql(copy_sql).collect()
print(f"File copied from {original_filepath} to {new_filepath}")

remove_sql = f"REMOVE {original_filepath}"
session.sql(remove_sql).collect()
print(f"Original file {original_filepath} removed.")

Note that this does not replace any of the existing code. Since we want to keep the existing motion of moving the local file, we will leave the os reference in place and add the stage move alongside it. The final version will look like this:
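As a hypothetical sketch of that final arrangement (the exact os call and local paths in your script may differ):

import os
import datetime

today_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")

# 1. Existing behavior: move the local file out of the way (hypothetical paths).
os.rename("customer_update.csv",
          os.path.join("old_versions", "customer_update_%s.csv" % today_time))

# 2. New behavior: mirror the same move inside the stage.
original_filepath = "@LOCAL_LOAD_STAGE/customer_update.csv"
new_filepath = "@LOCAL_LOAD_STAGE/old_versions/customer_update_%s.csv" % today_time

session.sql(f"COPY FILES INTO {new_filepath} FROM {original_filepath}").collect()
session.sql(f"REMOVE {original_filepath}").collect()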

Now we have the same motion fully reproduced. Let's do our final cleanup and test this script out.

Clean up and Test

We never looked at our import calls, and we still reference config files that are no longer necessary. We could leave the references to the config files and run the script; in fact, assuming those config files are still accessible, the code will still run. But since we're taking a close look at our import statements anyway, we might as well remove them. These references are all of the code between the import statements and the session call:

There are a few other things we should do:

  • Check that all of our imports are still necessary. We can leave them for now; if there is an error, we can address it.

  • We also have one EWI that we left in there as a warning to check. So we want to make sure we inspect that output.

  • We need to make sure that our file system behavior mirrors that of the expected file system for the POS system. To do this, we should move the customer_update.csv file into the root folder you chose when first launching VS Code.

  • Create a directory called “old_versions” in that same directory. This should allow the os operations to run (a small sketch follows this list).
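If you’d rather script that prep than create the folder by hand, a one-line sketch (run from the project root you opened in VS Code):

import os

# Create the local folder the pipeline's os call expects.
os.makedirs("old_versions", exist_ok=True)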

Finally, if you are not comfortable writing directly to the production table, you can create a copy of that table for this test and point the load at that copy by replacing the load statement with the one below. Since this is a lab, feel free to write to the “production” table:

# In case we want to test.
create_sql = """
                CREATE OR REPLACE TABLE ADVENTUREWORKS.DBO.DIMCUSTOMER_1
                AS select * from ADVENTUREWORKS.DBO.DIMCUSTOMER;
                """
session.sql(create_sql).collect()

# Write the DataFrame to the test copy of the table in Snowflake.
df_transformed.write.save_as_table("ADVENTUREWORKS.DBO.DIMCUSTOMER_1", mode="append")

Now we're finally ready to test this out. We can run this script in Python against the test table and see whether it fails. So run it!

Tragic! The script failed with the following error:

It looks like the way we are referencing an identifier is not the way that Snowpark wanted it. The code that failed is in the exact spot where the remaining EWI is:

You could follow the documentation link provided by the error, but in the interest of time: Snowpark needs this argument to be passed expressly as a literal. We need to make the following replacement:

# Old
split_col = split(df_uppercase['NAME'], '.first:')

# New
split_col = split(df_uppercase['NAME'], lit('.first:'))

This should take care of the error. Note that there will always be some functional differences between the source and target platforms. Conversion tools like the SMA try to make these differences as obvious as possible, but no conversion is 100% automated.

Let’s run it again. This time… success!

We could write some queries in Python to validate this, but why don’t we just go into Snowflake (because that’s what we’re about to do anyway).

Navigate to the Snowflake account that you have been using to run these scripts. This should be the same one you used to load the database from SQL Server (and if you haven’t done that, the above scripts won’t work anyway because the data has not yet been migrated).

You can quickly check this by seeing if the stage was created with the file:

Enable the directory table view to see if the old_versions folder is in there:

And it is:

Since that was the last element of our script, it looks like we’re good!

We can also validate that the data was loaded by querying the table for the data we uploaded. Open a new worksheet and write this query:

select * from ADVENTUREWORKS.DBO.DIMCUSTOMER
where FIRSTNAME like '%Brandon%'
AND LASTNAME like '%Carver%'

This is one of the names that was just loaded. And it looks like our pipeline has worked:

Running the Pipeline Script in Snowsight

Let’s take a quick look back at what the flow we are attempting to convert was doing in Spark:

  • accessing a local file

  • loading the result into SQL Server

  • moving the file to make way for the next one

This flow is not possible to run entirely from within Snowsight. Snowsight does not have access to a local file system. The recommendation here would be to move the export from the POS to a data lake… or any number of other options that would be accessible via Snowsight.

We can, however, take a closer look at how Snowpark handles the transformation logic by running the Python script in Snowflake. If you have already made the changes recommended above, you can run the body of the script in a Python Worksheet in Snowflake.

To do this, first log in to your Snowflake account and navigate to the Worksheets section. From there, create a new Python worksheet:

Specify the database, schema, role, and warehouse you’d like to use:

Now we do not have to deal with our session call. You will see a template generated in the worksheet window:
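At the time of writing, the generated template looks roughly like this (it may vary slightly between releases):

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col

def main(session: snowpark.Session):
    # Your code goes here, inside the "main" handler.
    tableName = 'information_schema.packages'
    dataframe = session.table(tableName).filter(col("language") == 'python')

    # Print a sample of the dataframe to standard output.
    dataframe.show()

    # Return value will appear in the Results tab.
    return dataframe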

Let’s start by bringing over our import calls. After making the previous script ready to use, we should have the following set of imports:

# General Imports
import pandas as pd
import os
import shutil
import datetime

# Snowpark Imports
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.snowpark.functions import upper
from snowflake.snowpark.functions import lower
from snowflake.snowpark.functions import split
from snowflake.snowpark.functions import trim
from snowflake.snowpark.functions import when
from snowflake.snowpark.functions import lit
from snowflake.snowpark.functions import expr
from snowflake.snowpark.functions import regexp_replace

We only need the Snowpark imports, since we will not be moving files around a local file system. We could keep the datetime reference if we want to move the file within the stage. (Let’s do it.)

Paste the Snowpark imports (plus datetime) into the Python worksheet below the imports that are already present. Note that ‘col’ is already imported, so you can remove one of those:

Under the “def main” call, let’s paste in all of our transformation code. This will include everything from the assignment of the csv location to the writing of the dataframe to a table.

From here:

To here:

We can also add back in the code that moves the files around in the stage. This part:
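As a rough sketch of how the pasted pieces end up arranged inside main() (the placeholder stands in for the transformation code you copy over, and the stage path should match the stage you upload the file to below):

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col, upper, lower, split, trim, when, lit, expr, regexp_replace
import datetime

def main(session: snowpark.Session):
    # Read the staged csv (the part the SMA converted for us).
    csv_file_path = "@my_int_stage/customer_update.csv"  # use the stage you create below
    df = session.read.option("PARSE_HEADER", True).option("INFER_SCHEMA", True).csv(csv_file_path)

    # ... paste the transformation code from the pipeline script here ...
    df_transformed = df  # placeholder so this sketch runs end to end

    # Write the result into the target table.
    df_transformed.write.save_as_table("ADVENTUREWORKS.DBO.DIMCUSTOMER", mode="append")

    # Optionally mirror the file move inside the stage, as in the pipeline script.
    today_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    original_filepath = "@my_int_stage/customer_update.csv"
    new_filepath = "@my_int_stage/old_versions/customer_update_%s.csv" % today_time
    session.sql(f"COPY FILES INTO {new_filepath} FROM {original_filepath}").collect()
    session.sql(f"REMOVE {original_filepath}").collect()

    # The returned dataframe shows up in the worksheet's Results tab.
    return df_transformed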

Before you can run the code, though, you will have to manually create the stage and move the file into it. We could add the create-stage statement to the script, but we would still need to load the file into the stage by hand. (You can also create an internal stage directly in the Snowsight UI.)

So if you open another worksheet (this time… a SQL worksheet), you can run a basic SQL statement that will create the stage:

CREATE STAGE my_int_stage
  ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');

Make sure to select the correct database, schema, role, and warehouse:

Now that the stage exists, we can manually load the file of interest into it. Navigate to the Databases section of the Snowsight UI and find the stage we just created under the appropriate database and schema. Then add our csv file by selecting the +Files option in the top right corner of the window. This will launch the Upload Your Files menu:

Drag and drop or browse to our project directory and load the customer_update.csv file into the stage:

Select Upload in the bottom right corner of the screen. You will be taken back to the stage screen. To view the files, you will need to select Enable Directory Table:

And now… our file appears in the stage:

This is not really a pipeline anymore, of course, but at least we can run the logic in Snowflake. Run the rest of the code that you moved into the worksheet. This user had success on the first run, but that’s no guarantee of success the second time:

Note that once you’ve defined this function in Snowflake, you can call it in other ways. If AdventureWorks is 100% replacing their POS, then it may make sense to keep the transformation logic in Snowflake, especially if orchestration and file movement will be handled somewhere else entirely. This lets Snowpark focus on what it excels at: the transformation logic.
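As a hypothetical sketch of one of those “other ways” (the procedure name, stage, and the idea of returning a status string are all assumptions, not something the SMA produces): you could register the worksheet’s handler as a permanent stored procedure and then call it from SQL or schedule it with a task. Snowsight’s Deploy option on Python worksheets offers a similar point-and-click path.

from snowflake.snowpark.types import StringType

# Hypothetical: assumes main() has been adjusted to return a short status string
# instead of a dataframe.
session.sproc.register(
    func=main,
    name="ADVENTUREWORKS.DBO.LOAD_DIMCUSTOMER",   # example procedure name
    return_type=StringType(),
    input_types=[],
    packages=["snowflake-snowpark-python"],
    is_permanent=True,
    stage_location="@my_int_stage",               # stage to hold the procedure code
    replace=True,
)

# Then, from SQL (or a scheduled task):
#   CALL ADVENTUREWORKS.DBO.LOAD_DIMCUSTOMER();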

Conclusion

And that's it for the script file. It's not the best example of a pipeline, but it does hit hard on how to deal with the output from the SMA:

  • Resolve All Issues

  • Resolve the session calls

  • Resolve the Input/Outputs

  • Clean up and Test!

Let's move on to the reporting notebook.
