Python · Advanced · 90 minutes

Advanced Data Pipeline: Build a Custom ETL Framework in Python

Create a scalable, modular Extract-Transform-Load (ETL) mini-project that reads data from multiple formats, applies complex transformations, and loads the result into a target data structure.

Challenge prompt

Build a Python function `run_etl_pipeline(sources, transform_functions, destination)` that handles an advanced ETL process:

1. **Extraction**: The `sources` parameter is a list of dictionaries, each specifying a data source. Each source dictionary includes a `type` key ('csv', 'json', 'api') and a `data` key holding a file path, a raw JSON string, or an API endpoint URL, depending on the type.
2. **Transformation**: The `transform_functions` parameter is a list of user-defined functions that take and return Python data objects (e.g., lists of dicts). Apply these functions sequentially to the combined extracted data.
3. **Loading**: The `destination` parameter specifies the output location and format: a dictionary with a `type` key ('memory', 'csv', or 'json') and a `target` value naming a file path or memory object.

Your ETL pipeline should:

  • Extract data from multiple sources of different types.
  • Merge the extracted data without introducing duplicates.
  • Apply complex transformations (e.g., filtering, aggregation, enrichment).
  • Load the resulting data according to the destination specification.
  • Return the final data when the destination type is 'memory'.

Guidance

  • Implement modular extraction functions for each source type, ensuring consistent output format.
  • Ensure transformation functions are applied in order to the full combined dataset.
  • Build a flexible loader to export or store data based on the destination configuration.
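
The first guidance point can be sketched with a dispatch table mapping each source `type` to an extractor. A minimal, self-contained sketch (the extractor names mirror the starter code below; only the JSON case is wired up here so the example runs on its own):

```python
import json

def extract_json(json_str):
    # Parse a raw JSON string into Python objects (here, a list of dicts).
    return json.loads(json_str)

# Dispatch table: source 'type' -> extractor function.
# 'csv' and 'api' extractors would be registered the same way.
EXTRACTORS = {
    'json': extract_json,
}

def extract_all(sources):
    # Run every source through its extractor and combine the rows.
    combined = []
    for source in sources:
        extractor = EXTRACTORS[source['type']]
        combined.extend(extractor(source['data']))
    return combined

rows = extract_all([{'type': 'json', 'data': '[{"id": 1}, {"id": 2}]'}])
# rows: [{'id': 1}, {'id': 2}]
```

A dict of functions keeps extraction modular: adding a new source type means registering one more entry, with no change to the pipeline loop.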

Hints

  • Use Python's built-in csv and json modules, plus the third-party requests library, to handle the different extraction types.
  • Normalize all extracted data into a common structure such as a list of dictionaries before transformations.
  • For the loader, consider writing to files or returning data directly based on the destination type.
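
Once everything is normalized to lists of dicts, the "merging without duplication" requirement still needs a strategy, since dicts are unhashable and cannot go straight into a set. One possible approach (a sketch, not the required solution) is to key each row by a sorted tuple of its items:

```python
def merge_without_duplicates(*row_lists):
    # Combine several lists of row dicts, skipping exact duplicate rows.
    # A sorted tuple of (key, value) pairs serves as a hashable row key.
    seen = set()
    merged = []
    for rows in row_lists:
        for row in rows:
            key = tuple(sorted(row.items()))
            if key not in seen:
                seen.add(key)
                merged.append(row)
    return merged

a = [{'id': 1, 'name': 'ada'}, {'id': 2, 'name': 'bob'}]
b = [{'id': 2, 'name': 'bob'}, {'id': 3, 'name': 'cy'}]
merged = merge_without_duplicates(a, b)
# merged keeps three unique rows; the duplicate {'id': 2, ...} is dropped.
```

This treats two rows as duplicates only when every field matches; deduplicating on a business key such as `id` alone would be a different, equally valid design choice.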

Starter code

import csv
import json
import requests

def extract_csv(path):
    # Read a CSV file into a list of row dicts keyed by the header row.
    # newline='' is the csv module's recommended mode for open().
    data = []
    with open(path, 'r', newline='') as f:
        reader = csv.DictReader(f)
        for row in reader:
            data.append(row)
    return data

def extract_json(json_str):
    return json.loads(json_str)

def extract_api(url):
    response = requests.get(url)
    response.raise_for_status()
    return response.json()

def run_etl_pipeline(sources, transform_functions, destination):
    # Your implementation here
    pass
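
The transformation step the prompt describes (applying `transform_functions` in order to the combined data) reduces to a simple fold. A sketch, with two illustrative transforms of my own naming:

```python
def apply_transforms(data, transform_functions):
    # Feed the output of each transform into the next, in list order.
    for fn in transform_functions:
        data = fn(data)
    return data

# Hypothetical example transforms: filter, then enrich.
def drop_inactive(rows):
    return [r for r in rows if r.get('active')]

def add_flag(rows):
    return [{**r, 'processed': True} for r in rows]

result = apply_transforms(
    [{'id': 1, 'active': True}, {'id': 2, 'active': False}],
    [drop_inactive, add_flag],
)
# result: [{'id': 1, 'active': True, 'processed': True}]
```

Because each transform both takes and returns a list of dicts, order matters: filtering before enrichment here avoids flagging rows that are about to be dropped.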

Expected output

If destination['type'] is 'memory', the function returns a list of dictionaries representing the transformed data.
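
The loading side can be sketched directly from the destination dict described above ('memory' returns the data, file types write to `target`). A minimal version, assuming the rows are flat dicts sharing the same keys:

```python
import csv
import json

def load(data, destination):
    # 'memory': hand the data straight back to the caller.
    if destination['type'] == 'memory':
        return data
    if destination['type'] == 'json':
        with open(destination['target'], 'w') as f:
            json.dump(data, f)
    elif destination['type'] == 'csv':
        with open(destination['target'], 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=list(data[0].keys()))
            writer.writeheader()
            writer.writerows(data)
    return None

in_memory = load([{'id': 1}], {'type': 'memory'})
# in_memory: [{'id': 1}]
```

Note the asymmetry: only the 'memory' branch returns a value, matching the expected-output rule above; the file branches write as a side effect and return None.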

Core concepts

ETL pipeline design · modular programming · data extraction and parsing · data transformation · file and API handling
