preloader
image

Bonnes pratiques pour un pipeline de données : de la donnée brute à l’analyse

Un pipeline de données déplace des données depuis une ou plusieurs sources, à travers une série de transformations, vers une destination où elles peuvent être analysées ou servies. Bien le concevoir dès le départ évite d’innombrables heures de débogage.

Le modèle ETL

La plupart des pipelines suivent le modèle Extraire → Transformer → Charger (ETL) :

  1. Extraire — récupérer les données depuis les sources (bases de données, API, fichiers)
  2. Transformer — nettoyer, valider, reformater et enrichir les données
  3. Charger — écrire le résultat vers une destination (entrepôt de données, tableau de bord, fichier)

Une variante, ELT, charge d’abord les données brutes puis les transforme dans la destination — pratique courante avec les entrepôts cloud comme BigQuery.

Principe 1 : Séparer les responsabilités

Conservez l’extraction, la transformation et le chargement dans des fonctions ou modules séparés. Cela rend chaque étape testable et remplaçable indépendamment.

def extraire(chemin_source: str) -> pd.DataFrame:
    return pd.read_csv(chemin_source)

def transformer(df: pd.DataFrame) -> pd.DataFrame:
    df = df.dropna(subset=["date", "valeur"])
    df["date"] = pd.to_datetime(df["date"])
    df["valeur"] = df["valeur"].clip(lower=0)
    return df

def charger(df: pd.DataFrame, chemin_dest: str) -> None:
    df.to_parquet(chemin_dest, index=False)

Principe 2 : Valider tôt

Détectez les données incorrectes dès l’étape d’extraction, avant qu’elles ne corrompent vos transformations.

def valider(df: pd.DataFrame) -> None:
    assert "date" in df.columns, "Colonne 'date' manquante"
    assert df["valeur"].dtype in ["float64", "int64"], "La colonne valeur doit être numérique"
    assert not df.duplicated(subset=["date", "site_id"]).any(), "Enregistrements en double détectés"

Principe 3 : Rendre les pipelines idempotents

Exécuter le pipeline deux fois doit produire le même résultat qu’une seule fois. Utilisez une logique d’upsert ou d’écriture par partition plutôt qu’un ajout en aveugle.

Principe 4 : Tout journaliser

import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
logger.info("Extrait %d lignes depuis %s", len(df), chemin_source)

Journalisez les nombres de lignes à chaque étape pour détecter les pertes silencieuses de données.

Principe 5 : Paramétrer, ne pas coder en dur

Utilisez des fichiers de configuration ou des variables d’environnement pour les chemins, identifiants et dates. Ne codez jamais une chaîne de connexion ou un chemin de fichier directement dans une fonction.

import os
DB_URL = os.environ["DATABASE_URL"]

Pipelines cloud : exemple BigQuery

Pour les grands volumes de données, un entrepôt de données cloud supprime la nécessité de gérer le stockage local. Le processus ETL pour BigQuery et Data Studio est illustré dans le projet portfolio Google BigQuery & Data Studio, avec un script Python réutilisable publié comme GitHub Gist.

Récapitulatif

PratiquePourquoi c’est important
Séparer les étapes ETLTests et réutilisation indépendants
Valider à l’extractionDétecter les mauvaises données tôt
Écritures idempotentesSécurité en cas de réexécution
Journaliser les nombres de lignesDétecter les pertes silencieuses
Paramétrer la configurationPortabilité entre environnements

L’application cohérente de ces principes distingue un script qui a fonctionné une fois d’un pipeline qui tourne de façon fiable en production.