Module 4 : Apache Spark dans Synapse

Introduction a Spark

Apache Spark est un moteur de traitement distribue pour le Big Data. Dans Synapse, il est disponible via des Spark Pools manages.

Langages supportes

Notebooks Synapse

Les notebooks permettent d'executer du code interactivement :

# Lire des donnees Parquet
df = spark.read.parquet("abfss://data@moncompte.dfs.core.windows.net/ventes/")

# Afficher le schema
df.printSchema()

# Afficher les premieres lignes
df.show(10)

# Compter les lignes
print(f"Nombre de lignes: {df.count()}")

Transformations courantes

from pyspark.sql.functions import col, sum, avg, year, month

# Filtrer
df_2024 = df.filter(year(col("date_vente")) == 2024)

# Grouper et agreger
ventes_par_mois = df.groupBy(
    year("date_vente").alias("annee"),
    month("date_vente").alias("mois")
).agg(
    sum("montant").alias("total_ventes"),
    avg("montant").alias("vente_moyenne")
)

# Trier
ventes_par_mois = ventes_par_mois.orderBy("annee", "mois")

# Afficher
ventes_par_mois.show()

Jointures

# Charger les tables
ventes = spark.read.parquet(".../ventes/")
clients = spark.read.parquet(".../clients/")

# Jointure
df_enrichi = ventes.join(
    clients,
    ventes.client_id == clients.id,
    "left"
).select(
    ventes["*"],
    clients["nom"],
    clients["ville"]
)

Ecrire des donnees

# Ecrire en Parquet (partitionne)
df_resultat.write \
    .mode("overwrite") \
    .partitionBy("annee", "mois") \
    .parquet("abfss://data@moncompte.dfs.core.windows.net/resultats/")

# Ecrire en Delta Lake
df_resultat.write \
    .format("delta") \
    .mode("overwrite") \
    .save("abfss://data@moncompte.dfs.core.windows.net/delta/ventes/")
Delta Lake est recommande pour les cas d'usage avec mises a jour frequentes. Il supporte les transactions ACID, le versioning et les operations MERGE.

Integration avec SQL Pool

# Lire depuis SQL Pool dedie
df = spark.read \
    .format("com.databricks.spark.sqldw") \
    .option("url", "jdbc:sqlserver://workspace.sql.azuresynapse.net:1433") \
    .option("tempDir", "abfss://temp@moncompte.dfs.core.windows.net/staging/") \
    .option("dbTable", "dbo.FactVentes") \
    .load()

# Ecrire vers SQL Pool
df.write \
    .format("com.databricks.spark.sqldw") \
    .option("url", "jdbc:sqlserver://workspace.sql.azuresynapse.net:1433") \
    .option("tempDir", "abfss://temp@moncompte.dfs.core.windows.net/staging/") \
    .option("dbTable", "dbo.FactVentes_New") \
    .mode("overwrite") \
    .save()

Optimisations

Performance :
  • Utilisez .cache() pour les DataFrames reutilises
  • Evitez les collect() sur de grands datasets
  • Preferez les operations colonnes aux UDFs Python
  • Repartitionnez avec .repartition(n) avant les jointures

Machine Learning avec MLlib

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Preparer les features
assembler = VectorAssembler(
    inputCols=["feature1", "feature2", "feature3"],
    outputCol="features"
)
df_ml = assembler.transform(df)

# Entrainer le modele
lr = LinearRegression(featuresCol="features", labelCol="target")
model = lr.fit(df_ml)

# Predictions
predictions = model.transform(df_test)