Module 4 : Apache Spark dans Synapse
Introduction a Spark
Apache Spark est un moteur de traitement distribue pour le Big Data. Dans Synapse, il est disponible via des Spark Pools manages.
Langages supportes
- PySpark (Python) - Le plus populaire
- Scala - Performances natives
- Spark SQL - Requetes SQL
- R - Analyse statistique
- .NET pour Spark - C#/F#
Notebooks Synapse
Les notebooks permettent d'executer du code interactivement :
# Lire des donnees Parquet
df = spark.read.parquet("abfss://data@moncompte.dfs.core.windows.net/ventes/")
# Afficher le schema
df.printSchema()
# Afficher les premieres lignes
df.show(10)
# Compter les lignes
print(f"Nombre de lignes: {df.count()}")
Transformations courantes
from pyspark.sql.functions import col, sum, avg, year, month
# Filtrer
df_2024 = df.filter(year(col("date_vente")) == 2024)
# Grouper et agreger
ventes_par_mois = df.groupBy(
year("date_vente").alias("annee"),
month("date_vente").alias("mois")
).agg(
sum("montant").alias("total_ventes"),
avg("montant").alias("vente_moyenne")
)
# Trier
ventes_par_mois = ventes_par_mois.orderBy("annee", "mois")
# Afficher
ventes_par_mois.show()
Jointures
# Charger les tables
ventes = spark.read.parquet(".../ventes/")
clients = spark.read.parquet(".../clients/")
# Jointure
df_enrichi = ventes.join(
clients,
ventes.client_id == clients.id,
"left"
).select(
ventes["*"],
clients["nom"],
clients["ville"]
)
Ecrire des donnees
# Ecrire en Parquet (partitionne)
df_resultat.write \
.mode("overwrite") \
.partitionBy("annee", "mois") \
.parquet("abfss://data@moncompte.dfs.core.windows.net/resultats/")
# Ecrire en Delta Lake
df_resultat.write \
.format("delta") \
.mode("overwrite") \
.save("abfss://data@moncompte.dfs.core.windows.net/delta/ventes/")
Delta Lake est recommande pour les cas d'usage avec mises a jour frequentes. Il supporte les transactions ACID, le versioning et les operations MERGE.
Integration avec SQL Pool
# Lire depuis SQL Pool dedie
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://workspace.sql.azuresynapse.net:1433") \
.option("tempDir", "abfss://temp@moncompte.dfs.core.windows.net/staging/") \
.option("dbTable", "dbo.FactVentes") \
.load()
# Ecrire vers SQL Pool
df.write \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://workspace.sql.azuresynapse.net:1433") \
.option("tempDir", "abfss://temp@moncompte.dfs.core.windows.net/staging/") \
.option("dbTable", "dbo.FactVentes_New") \
.mode("overwrite") \
.save()
Optimisations
Performance :
- Utilisez
.cache()pour les DataFrames reutilises - Evitez les
collect()sur de grands datasets - Preferez les operations colonnes aux UDFs Python
- Repartitionnez avec
.repartition(n)avant les jointures
Machine Learning avec MLlib
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
# Preparer les features
assembler = VectorAssembler(
inputCols=["feature1", "feature2", "feature3"],
outputCol="features"
)
df_ml = assembler.transform(df)
# Entrainer le modele
lr = LinearRegression(featuresCol="features", labelCol="target")
model = lr.fit(df_ml)
# Predictions
predictions = model.transform(df_test)