Spyglass MTG Blog

Get Ready for Microsoft Fabric: Convert Low-Code Synapse Spark to PySpark Notebooks

Written by Paula Burke | Jul 24, 2024 2:15:00 PM

Both Azure Synapse Spark and Fabric Spark are designed to provide a unified experience for ingesting, preparing, managing, and serving large-scale data processing for immediate business intelligence and machine learning needs. Each is dynamically scalable and leverages cloud infrastructure to handle large datasets and complex computations.

While Synapse pipelines offer a streamlined, low-code way to handle ETL (Extract, Transform, Load), the upgrade to Microsoft Fabric shifts the development emphasis to notebooks written in PySpark (Python) with embedded SQL where needed. Fabric notebooks enhance collaboration, provide a unified development environment, offer rich visualization and interactive exploration capabilities, integrate seamlessly with machine learning, simplify data governance, and provide greater flexibility and cost efficiency. These benefits make Fabric notebooks a more powerful and user-friendly option for modern data analytics and data science workflows, and they enable Fabric implementations to be far more flexible and scalable.

To help you get started with Fabric notebooks, this blog walks you through creating a notebook that performs the same functions as a simple Pipeline. The sample code is yours to copy, paste, and adjust to fit your data, and you will find it easier to keep building your solution from this first step.

Let’s assume our Pipeline process has these steps:

  • Look up metadata about the source file.
  • Read the CSV file.
  • Transform the data.
  • Write the updated data to a Delta file in the Lake.

These are the steps to create the same process in a Notebook:

Note: Set Up Your Environment

Before you start, ensure your Fabric workspace has Spark compute available and that you have appropriate access to data sources such as Azure Blob Storage and your Lakehouse. Your system administrator can help you with these details.
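
Once the Notebook is open and attached, a quick sanity check like the one below confirms that a Spark session is available (the session object "spark" is created automatically in Fabric and Synapse notebooks):

# Quick sanity check: the notebook provides a ready-to-use SparkSession named "spark"
print(spark.version)                # Spark runtime version backing the notebook
print(spark.sparkContext.appName)   # name of the current Spark application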

Step 1: Lookup Metadata for the File to Process

For this example, let's assume the metadata is stored in a Fabric Warehouse named “ControlDB”, and that the table meta.metaMasterControl contains the information about the file to process.
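
For reference, the examples that follow assume meta.metaMasterControl contains at least the columns SourcePath, SourceName, SourceType, and isActive. The snippet below is a minimal, hypothetical sketch of what such a row might look like, built as a throwaway DataFrame purely for illustration:

from pyspark.sql import Row

# Hypothetical sample row mirroring the columns used later in this walkthrough
sample_metadata = spark.createDataFrame([
    Row(SourcePath="Files/delivery_process/inbound/",   # folder holding the source file (example value)
        SourceName="part_deliveries.csv",               # file name to process (example value)
        SourceType="csv",                               # format passed to spark.read.format()
        isActive=True)                                  # flag that selects the row to process
])
display(sample_metadata)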

If the Warehouse tables are not already linked to your Lakehouse, edit your Lakehouse, right-click the “Tables” folder, and choose “New shortcut”.

Then choose the “Microsoft OneLake” option and select your Warehouse from the Explorer list.

Select the table(s) from the Tables list. Now, when a new Notebook is created, the Lakehouse can be added as a data source in the Explorer, and the metadata tables appear in the Explorer list.
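
To confirm the shortcut is visible from a Notebook, you can list the contents of the Tables folder. This is a minimal sketch that assumes the Lakehouse is attached as the Notebook's default and uses the built-in mssparkutils file utilities:

# List the Tables folder of the default Lakehouse to confirm the shortcut appears
# (relative paths such as "Tables/" resolve against the attached default Lakehouse)
for item in mssparkutils.fs.ls("Tables/"):
    print(item.name, item.isDir)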

To read the metadata, use the following code in the Notebook (changing the table path and name to match your information).

# Path to the metadata table exposed through the Lakehouse shortcut
path = "Tables/meta/metaMasterControl"
df = spark.read.format("delta").load(path)
display(df)

To use the first active row of the metadata shown above:

import pyspark.sql.functions as F

# Collect the active metadata row(s) to the driver and pull out the values needed
filepathname = df.filter(F.col("isActive") == True).collect()
srcFile = filepathname[0].SourcePath + filepathname[0].SourceName
srcType = filepathname[0].SourceType
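
The snippet above assumes at least one active row exists. If the filter could come back empty, a small guard like the sketch below (a continuation of the code above) avoids an index error and gives a clearer failure message:

# Guard against an empty result before indexing into the collected rows
active_rows = df.filter(F.col("isActive") == True).collect()

if not active_rows:
    raise ValueError("No active rows found in meta.metaMasterControl")

srcFile = active_rows[0].SourcePath + active_rows[0].SourceName
srcType = active_rows[0].SourceType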

Step 2: Read the File

We now have the full file path in “srcFile” and the file type in “srcType”. Using these variables, you can read the data from the file.

src_df = spark.read.format(srcType).option("header","true").load(srcFile)
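
Depending on your source, you may also want Spark to detect column types at read time rather than treating every column as a string. A minimal variation, assuming a CSV source, looks like this:

# Optional: infer column types instead of reading everything as strings (CSV sources)
src_df = (spark.read.format(srcType)
          .option("header", "true")
          .option("inferSchema", "true")   # extra pass over the file to detect data types
          .load(srcFile))

display(src_df)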

Step 3: Transform the Data

Transform the data as needed. For this example, we'll clean any leading or trailing spaces and perform a simple aggregation.

# Add required imports
from pyspark.sql.functions import col, sum, trim

# Remove leading and trailing spaces from all columns
changed_data_df = src_df

for c in changed_data_df.columns:
    changed_data_df = changed_data_df.withColumn(c, trim(col(c)))

# Make sure the column to be totaled is a number and then aggregate
changed_data_df = changed_data_df.withColumn('units', col('units').cast('int'))
changed_data_df = changed_data_df \
    .groupBy('part_id') \
    .agg(sum('units').alias('total_units'))

display(changed_data_df)

Or you can use a SQL command by giving the DataFrame a view name:

# Register the DataFrame as a temporary view (createOrReplaceTempView is safe to re-run)
src_df.createOrReplaceTempView('input_vw')

sqlCmd = '''
    SELECT trim(part_id) AS part_id,
           SUM(cast(units AS integer)) AS total_units
    FROM input_vw
    GROUP BY trim(part_id)
'''
changed_data_df = spark.sql(sqlCmd)

display(changed_data_df)

As you can see, either syntax produces the same result, so you can choose whichever approach you are most comfortable with.
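
If you want to convince yourself that the two approaches return the same rows, you can keep each result in its own variable and compare them. The sketch below reuses src_df and sqlCmd from above; the names df_api and df_sql are purely illustrative, and both difference counts should be zero:

from pyspark.sql.functions import col, sum, trim   # already imported in Step 3

# Build both results side by side (variable names are illustrative only)
df_api = (src_df
          .withColumn('part_id', trim(col('part_id')))
          .withColumn('units', col('units').cast('int'))
          .groupBy('part_id')
          .agg(sum('units').alias('total_units')))
df_sql = spark.sql(sqlCmd)

# Rows present in one result but missing from the other; both counts should be 0
print(df_api.exceptAll(df_sql).count())
print(df_sql.exceptAll(df_api).count())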

Step 4: Write results to a Delta File

Now we can save the transformed data to a new Delta file in the Lake.

Copy the ABFS path of the destination folder (Files/folder_name) from your Lakehouse and use it as the output location.

# ABFS path copied from the Lakehouse Files folder (replace with your own path)
outFile = 'abfss://6ffced48-f32e-4dac@onelake.dfs.fabric.microsoft.com/1ac1cb24-881f/Files/delivery_process/part_summary_totalunits'
changed_data_df.write \
                .format('delta') \
                .mode("overwrite") \
                .option("overwriteSchema", "true") \
                .save(outFile)

After the write is complete, the new file will show in the Files folder in the Lakehouse Explorer.
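
To verify the write, you can read the new Delta output back from the same path:

# Read the Delta output back to confirm the write succeeded
check_df = spark.read.format("delta").load(outFile)
display(check_df)
print(f"Rows written: {check_df.count()}")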

Syntax Summary

In the sample code, the following syntax elements were used:

  • spark.read.format(...).load(...) - read a Delta table or source file into a DataFrame.
  • filter(...) and collect() - select rows and return them to the driver as a list of Row objects.
  • withColumn(...) with trim(), col(), and cast() - clean and convert column values.
  • groupBy(...).agg(sum(...).alias(...)) - aggregate the data.
  • createOrReplaceTempView(...) and spark.sql(...) - run embedded SQL against a DataFrame.
  • write.format('delta').mode('overwrite').save(...) - write the results to a Delta location in the Lake.
  • display(...) - show a DataFrame's results in the notebook.

In this blog, we've demonstrated how to recreate a Synapse pipeline using a PySpark notebook. Use this reference to get started creating code-based Spark Notebooks with embedded SQL. If you have any additional questions, please contact us today!