Advanced Data Pipeline: Build a Custom ETL Framework in Python
Create a scalable, modular Extract-Transform-Load (ETL) mini-project that reads data from multiple formats, applies complex transformations, and loads the result into a target data structure.
Challenge prompt
Build a Python function `run_etl_pipeline(sources, transform_functions, destination)` that handles an advanced ETL process:

1. **Extraction**: The `sources` parameter is a list of dictionaries, each specifying a data source. Each source dictionary includes a `type` key ('csv', 'json', or 'api') and a `data` key holding a file path, a raw JSON string, or an API endpoint URL, respectively.
2. **Transformation**: The `transform_functions` parameter is a list of user-defined functions that take and return Python data objects (e.g., lists of dicts). Apply these functions sequentially to the combined extracted data.
3. **Loading**: The `destination` parameter specifies the output location and format: a dictionary with a `type` key ('memory', 'csv', or 'json') and a `target` value giving a file path or memory object.

Your ETL pipeline should:

- Extract data from multiple sources of different types.
- Merge the extracted data without duplication.
- Apply complex transformations (e.g., filtering, aggregation, enrichment).
- Load the resulting data according to the destination specification, returning the final data when the destination type is 'memory'.
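For concreteness, a call might look like the sketch below. The file name, endpoint URL, field names, and the `drop_inactive` transform are all illustrative assumptions, not part of the specification:

```python
# Hypothetical example call showing the expected parameter shapes.
sources = [
    {"type": "csv", "data": "users.csv"},                      # file path
    {"type": "json", "data": '[{"id": 1, "name": "Ada"}]'},    # raw JSON string
    {"type": "api", "data": "https://example.com/api/users"},  # endpoint URL
]

def drop_inactive(rows):
    # Example transform: keep only rows flagged active (field name assumed).
    return [r for r in rows if r.get("active", True)]

destination = {"type": "memory", "target": None}

# result = run_etl_pipeline(sources, [drop_inactive], destination)
```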
Guidance
- Implement modular extraction functions for each source type, ensuring a consistent output format.
- Ensure transformation functions are applied in order to the full combined dataset.
- Build a flexible loader to export or store data based on the destination configuration.
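The first point above can be sketched as a small dispatch table that maps each source type to a handler returning the common list-of-dicts shape. This is one possible design, not the required one; only the 'json' handler is shown, and the registry name `EXTRACTORS` is an assumption:

```python
import json

def extract_json(json_str):
    # Parse a raw JSON string; normalize a single object into a one-element list
    # so every extractor yields the same shape (a list of dicts).
    data = json.loads(json_str)
    return data if isinstance(data, list) else [data]

# 'csv' and 'api' handlers would register here as well.
EXTRACTORS = {"json": extract_json}

def extract_all(sources):
    # Run the matching handler for each source and concatenate the results.
    combined = []
    for source in sources:
        handler = EXTRACTORS[source["type"]]
        combined.extend(handler(source["data"]))
    return combined
```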
Hints
- Use Python's csv, json, and requests libraries to handle the different types of data extraction.
- Normalize all extracted data into a common structure, such as a list of dictionaries, before applying transformations.
- For the loader, write to files or return the data directly depending on the destination type.
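Once everything is normalized to a list of dicts, the no-duplication requirement can be met with an order-preserving dedupe. A minimal sketch, assuming flat dicts with string keys (dicts themselves are unhashable, so a sorted tuple of items serves as the seen-key):

```python
def dedupe(rows):
    # Remove duplicate rows while preserving first-seen order.
    seen = set()
    unique = []
    for row in rows:
        key = tuple(sorted(row.items()))  # hashable stand-in for the dict
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique
```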
Starter code
import csv
import json
import requests

def extract_csv(path):
    data = []
    with open(path, 'r') as f:
        reader = csv.DictReader(f)
        for row in reader:
            data.append(row)
    return data

def extract_json(json_str):
    return json.loads(json_str)

def extract_api(url):
    response = requests.get(url)
    response.raise_for_status()
    return response.json()
def run_etl_pipeline(sources, transform_functions, destination):
    # Your implementation here
    pass

Expected output
If destination['type'] is 'memory', the function returns a list of dictionaries representing the transformed data.
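Putting the pieces together, here is one possible shape for a solution. This is a sketch, not the reference answer: it assumes flat list-of-dict data, covers only the 'csv' and 'json' source types (the 'api' branch via requests is omitted to keep it self-contained), and handles the 'memory', 'json', and 'csv' destinations:

```python
import csv
import json

def run_etl_pipeline(sources, transform_functions, destination):
    # Extract: gather rows from every source into one list of dicts.
    data = []
    for source in sources:
        if source["type"] == "csv":
            with open(source["data"], newline="") as f:
                data.extend(csv.DictReader(f))
        elif source["type"] == "json":
            parsed = json.loads(source["data"])
            data.extend(parsed if isinstance(parsed, list) else [parsed])
        # The 'api' case (requests.get(...).json()) is omitted in this sketch.

    # Merge without duplicates (order-preserving; assumes flat dicts).
    seen, merged = set(), []
    for row in data:
        key = tuple(sorted(row.items()))
        if key not in seen:
            seen.add(key)
            merged.append(row)

    # Transform: apply each function in order to the full combined dataset.
    for fn in transform_functions:
        merged = fn(merged)

    # Load according to the destination spec.
    if destination["type"] == "memory":
        return merged
    if destination["type"] == "json":
        with open(destination["target"], "w") as f:
            json.dump(merged, f)
    elif destination["type"] == "csv" and merged:
        with open(destination["target"], "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(merged[0]))
            writer.writeheader()
            writer.writerows(merged)
    return None
```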