Module 4 : Apache Spark dans Synapse

Introduction à Spark

Apache Spark est un moteur de traitement distribué pour le Big Data. Dans Synapse, il est disponible via des Spark Pools managés.

Langages supportés

Notebooks Synapse

Les notebooks permettent d'exécuter du code interactivement :

# Lire des données Parquet
df = spark.read.parquet("abfss://data@moncompte.dfs.core.windows.net/ventes/")

# Afficher le schéma
df.printSchema()

# Afficher les premières lignes
df.show(10)

# Compter les lignes
print(f"Nombre de lignes: {df.count()}")

Transformations courantes

from pyspark.sql.functions import col, sum, avg, year, month

# Filtrer
df_2024 = df.filter(year(col("date_vente")) == 2024)

# Grouper et agreger
ventes_par_mois = df.groupBy(
    year("date_vente").alias("annee"),
    month("date_vente").alias("mois")
).agg(
    sum("montant").alias("total_ventes"),
    avg("montant").alias("vente_moyenne")
)

# Trier
ventes_par_mois = ventes_par_mois.orderBy("annee", "mois")

# Afficher
ventes_par_mois.show()

Jointures

# Charger les tables
ventes = spark.read.parquet(".../ventes/")
clients = spark.read.parquet(".../clients/")

# Jointure
df_enrichi = ventes.join(
    clients,
    ventes.client_id == clients.id,
    "left"
).select(
    ventes["*"],
    clients["nom"],
    clients["ville"]
)

Écrire des données

# Écrire en Parquet (partitionné)
df_resultat.write.mode("overwrite").partitionBy("annee", "mois").parquet("abfss://data@moncompte.dfs.core.windows.net/resultats/")

# Écrire en Delta Lake
df_resultat.write.format("delta").mode("overwrite").save("abfss://data@moncompte.dfs.core.windows.net/delta/ventes/")
Version multiligne (cliquez pour afficher)
# Écrire en Parquet (partitionné)
df_resultat.write \
    .mode("overwrite") \
    .partitionBy("annee", "mois") \
    .parquet("abfss://data@moncompte.dfs.core.windows.net/resultats/")

# Écrire en Delta Lake
df_resultat.write \
    .format("delta") \
    .mode("overwrite") \
    .save("abfss://data@moncompte.dfs.core.windows.net/delta/ventes/")
Delta Lake est recommandé pour les cas d'usage avec mises à jour fréquentes. Il supporte les transactions ACID, le versioning et les opérations MERGE.

Intégration avec SQL Pool

# Lire depuis SQL Pool dédié
df = spark.read.format("com.databricks.spark.sqldw").option("url", "jdbc:sqlserver://workspace.sql.azuresynapse.net:1433").option("tempDir", "abfss://temp@moncompte.dfs.core.windows.net/staging/").option("dbTable", "dbo.FactVentes").load()

# Écrire vers SQL Pool
df.write.format("com.databricks.spark.sqldw").option("url", "jdbc:sqlserver://workspace.sql.azuresynapse.net:1433").option("tempDir", "abfss://temp@moncompte.dfs.core.windows.net/staging/").option("dbTable", "dbo.FactVentes_New").mode("overwrite").save()
Version multiligne (cliquez pour afficher)
# Lire depuis SQL Pool dédié
df = spark.read \
    .format("com.databricks.spark.sqldw") \
    .option("url", "jdbc:sqlserver://workspace.sql.azuresynapse.net:1433") \
    .option("tempDir", "abfss://temp@moncompte.dfs.core.windows.net/staging/") \
    .option("dbTable", "dbo.FactVentes") \
    .load()

# Écrire vers SQL Pool
df.write \
    .format("com.databricks.spark.sqldw") \
    .option("url", "jdbc:sqlserver://workspace.sql.azuresynapse.net:1433") \
    .option("tempDir", "abfss://temp@moncompte.dfs.core.windows.net/staging/") \
    .option("dbTable", "dbo.FactVentes_New") \
    .mode("overwrite") \
    .save()

Optimisations

Performance :
  • Utilisez .cache() pour les DataFrames réutilisés
  • Évitez les collect() sur de grands datasets
  • Préférez les opérations colonnes aux UDFs Python
  • Repartitionnez avec .repartition(n) avant les jointures

Machine Learning avec MLlib

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Préparer les features
assembler = VectorAssembler(
    inputCols=["feature1", "feature2", "feature3"],
    outputCol="features"
)
df_ml = assembler.transform(df)

# Entraîner le modèle
lr = LinearRegression(featuresCol="features", labelCol="target")
model = lr.fit(df_ml)

# Predictions
predictions = model.transform(df_test)