Module 4 : Apache Spark dans Synapse
Introduction à Spark
Apache Spark est un moteur de traitement distribué pour le Big Data. Dans Synapse, il est disponible via des Spark Pools managés.
Langages supportés
- PySpark (Python) - Le plus populaire
- Scala - Performances natives
- Spark SQL - Requêtes SQL
- R - Analyse statistique
- .NET pour Spark - C#/F#
Notebooks Synapse
Les notebooks permettent d'exécuter du code interactivement :
# Lire des données Parquet
df = spark.read.parquet("abfss://data@moncompte.dfs.core.windows.net/ventes/")
# Afficher le schéma
df.printSchema()
# Afficher les premières lignes
df.show(10)
# Compter les lignes
print(f"Nombre de lignes: {df.count()}")
Transformations courantes
from pyspark.sql.functions import col, sum, avg, year, month
# Filtrer
df_2024 = df.filter(year(col("date_vente")) == 2024)
# Grouper et agreger
ventes_par_mois = df.groupBy(
year("date_vente").alias("annee"),
month("date_vente").alias("mois")
).agg(
sum("montant").alias("total_ventes"),
avg("montant").alias("vente_moyenne")
)
# Trier
ventes_par_mois = ventes_par_mois.orderBy("annee", "mois")
# Afficher
ventes_par_mois.show()
Jointures
# Charger les tables
ventes = spark.read.parquet(".../ventes/")
clients = spark.read.parquet(".../clients/")
# Jointure
df_enrichi = ventes.join(
clients,
ventes.client_id == clients.id,
"left"
).select(
ventes["*"],
clients["nom"],
clients["ville"]
)
Écrire des données
# Écrire en Parquet (partitionné)
df_resultat.write.mode("overwrite").partitionBy("annee", "mois").parquet("abfss://data@moncompte.dfs.core.windows.net/resultats/")
# Écrire en Delta Lake
df_resultat.write.format("delta").mode("overwrite").save("abfss://data@moncompte.dfs.core.windows.net/delta/ventes/")
Version multiligne (cliquez pour afficher)
# Écrire en Parquet (partitionné)
df_resultat.write \
.mode("overwrite") \
.partitionBy("annee", "mois") \
.parquet("abfss://data@moncompte.dfs.core.windows.net/resultats/")
# Écrire en Delta Lake
df_resultat.write \
.format("delta") \
.mode("overwrite") \
.save("abfss://data@moncompte.dfs.core.windows.net/delta/ventes/")
Delta Lake est recommandé pour les cas d'usage avec mises à jour fréquentes. Il supporte les transactions ACID, le versioning et les opérations MERGE.
Intégration avec SQL Pool
# Lire depuis SQL Pool dédié
df = spark.read.format("com.databricks.spark.sqldw").option("url", "jdbc:sqlserver://workspace.sql.azuresynapse.net:1433").option("tempDir", "abfss://temp@moncompte.dfs.core.windows.net/staging/").option("dbTable", "dbo.FactVentes").load()
# Écrire vers SQL Pool
df.write.format("com.databricks.spark.sqldw").option("url", "jdbc:sqlserver://workspace.sql.azuresynapse.net:1433").option("tempDir", "abfss://temp@moncompte.dfs.core.windows.net/staging/").option("dbTable", "dbo.FactVentes_New").mode("overwrite").save()
Version multiligne (cliquez pour afficher)
# Lire depuis SQL Pool dédié
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://workspace.sql.azuresynapse.net:1433") \
.option("tempDir", "abfss://temp@moncompte.dfs.core.windows.net/staging/") \
.option("dbTable", "dbo.FactVentes") \
.load()
# Écrire vers SQL Pool
df.write \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://workspace.sql.azuresynapse.net:1433") \
.option("tempDir", "abfss://temp@moncompte.dfs.core.windows.net/staging/") \
.option("dbTable", "dbo.FactVentes_New") \
.mode("overwrite") \
.save()
Optimisations
Performance :
- Utilisez
.cache()pour les DataFrames réutilisés - Évitez les
collect()sur de grands datasets - Préférez les opérations colonnes aux UDFs Python
- Repartitionnez avec
.repartition(n)avant les jointures
Machine Learning avec MLlib
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
# Préparer les features
assembler = VectorAssembler(
inputCols=["feature1", "feature2", "feature3"],
outputCol="features"
)
df_ml = assembler.transform(df)
# Entraîner le modèle
lr = LinearRegression(featuresCol="features", labelCol="target")
model = lr.fit(df_ml)
# Predictions
predictions = model.transform(df_test)