import psycopg2
import snowflake.connector
from datetime import datetime

class PostgreSQLToSnowflakeETL:
    def __init__(self, pg_config, sf_config):
        self.pg_config = pg_config
        self.sf_config = sf_config

    def extract(self, query):
        """Extract data from PostgreSQL"""
        print(f"[{datetime.now()}] Starting extraction...")
        conn = psycopg2.connect(**self.pg_config)
        cursor = conn.cursor()
        cursor.execute(query)
        columns = [desc[0] for desc in cursor.description]
        data = cursor.fetchall()
        cursor.close()
        conn.close()
        print(f"[{datetime.now()}] Extracted {len(data)} rows")
        return columns, data

    def transform(self, columns, data):
        """Transform data - clean, validate, enrich"""
        print(f"[{datetime.now()}] Starting transformation...")
        transformed_data = []
        for row in data:
            # Example transformations
            transformed_row = list(row)
            # Clean null values
            transformed_row = [val if val is not None else '' for val in transformed_row]
            # Add audit columns
            transformed_row.append(datetime.now())   # loaded_at
            transformed_row.append('ETL_PIPELINE')   # source_system
            transformed_data.append(tuple(transformed_row))
        # Add audit columns to schema
        columns.extend(['loaded_at', 'source_system'])
        print(f"[{datetime.now()}] Transformed {len(transformed_data)} rows")
        return columns, transformed_data

    def load(self, table_name, columns, data):
        """Load data into Snowflake"""
        print(f"[{datetime.now()}] Starting load to Snowflake...")
        conn = snowflake.connector.connect(**self.sf_config)
        cursor = conn.cursor()
        # Create table if not exists (all columns as VARCHAR for simplicity)
        col_defs = ', '.join([f"{col} VARCHAR" for col in columns])
        cursor.execute(f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                {col_defs}
            )
        """)
        # Bulk insert
        placeholders = ', '.join(['%s'] * len(columns))
        insert_query = f"INSERT INTO {table_name} VALUES ({placeholders})"
        cursor.executemany(insert_query, data)
        conn.commit()
        cursor.close()
        conn.close()
        print(f"[{datetime.now()}] Loaded {len(data)} rows successfully")

    def run(self, source_query, target_table):
        """Execute full ETL pipeline"""
        print("=" * 50)
        print("Starting ETL Pipeline")
        print("=" * 50)
        # Extract
        columns, data = self.extract(source_query)
        # Transform
        columns, data = self.transform(columns, data)
        # Load
        self.load(target_table, columns, data)
        print("=" * 50)
        print("ETL Pipeline Completed Successfully!")
        print("=" * 50)

# Usage
if __name__ == "__main__":
    pg_config = {
        'host': 'localhost',
        'database': 'production',
        'user': 'etl_user',
        'password': 'secure_password'
    }
    sf_config = {
        'account': 'xyz12345',
        'user': 'etl_user',
        'password': 'secure_password',
        'warehouse': 'COMPUTE_WH',
        'database': 'ANALYTICS',
        'schema': 'STAGING'
    }
    etl = PostgreSQLToSnowflakeETL(pg_config, sf_config)
    source_query = """
        SELECT customer_id, name, email, created_at
        FROM customers
        WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'
    """
    etl.run(source_query, 'customers_staging')
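
Both config dicts above hardcode credentials for readability. In a real deployment you would load them from the environment or a secrets manager; a minimal sketch, assuming environment variables with these (illustrative) names:

import os

# Hypothetical variable names; match them to your own deployment
pg_config = {
    'host': os.environ['PG_HOST'],
    'database': os.environ['PG_DATABASE'],
    'user': os.environ['PG_USER'],
    'password': os.environ['PG_PASSWORD']
}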
const { Kafka } = require('kafkajs');
const { BigQuery } = require('@google-cloud/bigquery');

class KafkaToBigQueryStreaming {
    constructor(kafkaConfig, bqConfig) {
        // Initialize Kafka consumer
        this.kafka = new Kafka({
            clientId: kafkaConfig.clientId,
            brokers: kafkaConfig.brokers
        });
        this.consumer = this.kafka.consumer({
            groupId: kafkaConfig.groupId
        });
        // Initialize BigQuery client
        this.bigquery = new BigQuery({
            projectId: bqConfig.projectId,
            keyFilename: bqConfig.keyFilename
        });
        this.dataset = this.bigquery.dataset(bqConfig.dataset);
        this.table = this.dataset.table(bqConfig.table);
        this.batchSize = 100;
        this.batch = [];
    }

    async start(topic) {
        await this.consumer.connect();
        await this.consumer.subscribe({ topic, fromBeginning: false });
        console.log(`Started consuming from topic: ${topic}`);
        await this.consumer.run({
            eachMessage: async ({ topic, partition, message }) => {
                await this.processMessage(message);
            }
        });
    }

    async processMessage(message) {
        try {
            // Parse Kafka message
            const data = JSON.parse(message.value.toString());
            // Transform data
            const transformed = this.transform(data);
            // Add to batch
            this.batch.push(transformed);
            // Flush batch if size reached
            if (this.batch.length >= this.batchSize) {
                await this.flushBatch();
            }
        } catch (error) {
            console.error('Error processing message:', error);
        }
    }

    transform(data) {
        // Add transformation logic
        return {
            ...data,
            processed_at: new Date().toISOString(),
            schema_version: '1.0'
        };
    }

    async flushBatch() {
        if (this.batch.length === 0) return;
        try {
            await this.table.insert(this.batch);
            console.log(`Inserted ${this.batch.length} rows to BigQuery`);
            this.batch = [];
        } catch (error) {
            console.error('Error inserting to BigQuery:', error);
            // The batch is intentionally not cleared here, so these rows are
            // retried on the next flush; add backoff/dead-lettering as needed
        }
    }

    async stop() {
        // Flush remaining batch
        await this.flushBatch();
        // Disconnect
        await this.consumer.disconnect();
        console.log('Consumer stopped');
    }
}

// Usage
const kafkaConfig = {
    clientId: 'data-integration-consumer',
    brokers: ['kafka1:9092', 'kafka2:9092'],
    groupId: 'bigquery-consumer-group'
};
const bqConfig = {
    projectId: 'my-project',
    keyFilename: './service-account-key.json',
    dataset: 'analytics',
    table: 'events'
};

const streaming = new KafkaToBigQueryStreaming(kafkaConfig, bqConfig);

// Start streaming (catch startup failures rather than leaving the promise unhandled)
streaming.start('user-events').catch(console.error);

// Graceful shutdown
process.on('SIGTERM', async () => {
    await streaming.stop();
    process.exit(0);
});
import requests
import psycopg2
from typing import List, Dict
import time

class APIToDatabaseSync:
    def __init__(self, api_config, db_config):
        self.api_base_url = api_config['base_url']
        self.api_key = api_config['api_key']
        self.db_config = db_config

    def fetch_from_api(self, endpoint: str, params: Dict = None) -> List[Dict]:
        """Fetch data from REST API with pagination"""
        headers = {'Authorization': f'Bearer {self.api_key}'}
        # Copy so pagination params don't mutate the caller's dict
        params = dict(params or {})
        all_data = []
        page = 1
        while True:
            params['page'] = page
            params['per_page'] = 100
            response = requests.get(
                f"{self.api_base_url}/{endpoint}",
                headers=headers,
                params=params
            )
            response.raise_for_status()
            data = response.json()
            if not data:
                break
            all_data.extend(data)
            print(f"Fetched page {page}, total records: {len(all_data)}")
            page += 1
            time.sleep(0.5)  # Rate limiting
        return all_data

    def transform_data(self, records: List[Dict]) -> List[tuple]:
        """Transform API data to match database schema"""
        transformed = []
        for record in records:
            # Extract and transform fields
            transformed_record = (
                record.get('id'),
                record.get('name'),
                record.get('email'),
                record.get('status', 'active'),
                record.get('created_at'),
                # Add computed fields (defaults avoid "None None" for missing names)
                f"{record.get('first_name', '')} {record.get('last_name', '')}".strip(),
                # Normalize data
                record.get('email', '').lower()
            )
            transformed.append(transformed_record)
        return transformed

    def upsert_to_database(self, table: str, records: List[tuple]):
        """Insert or update records in database"""
        conn = psycopg2.connect(**self.db_config)
        cursor = conn.cursor()
        upsert_query = f"""
            INSERT INTO {table}
                (id, name, email, status, created_at, full_name, email_normalized)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (id)
            DO UPDATE SET
                name = EXCLUDED.name,
                email = EXCLUDED.email,
                status = EXCLUDED.status,
                updated_at = CURRENT_TIMESTAMP
        """
        cursor.executemany(upsert_query, records)
        conn.commit()
        print(f"Upserted {len(records)} records")
        cursor.close()
        conn.close()

    def sync(self, endpoint: str, table: str, filters: Dict = None):
        """Execute full sync from API to database"""
        print(f"Starting sync: {endpoint} -> {table}")
        # Fetch from API
        api_data = self.fetch_from_api(endpoint, filters)
        # Transform
        transformed_data = self.transform_data(api_data)
        # Load to database
        self.upsert_to_database(table, transformed_data)
        print("Sync completed successfully!")

# Usage
api_config = {
    'base_url': 'https://api.example.com/v1',
    'api_key': 'your-api-key-here'
}
db_config = {
    'host': 'localhost',
    'database': 'analytics',
    'user': 'etl_user',
    'password': 'secure_password'
}

sync = APIToDatabaseSync(api_config, db_config)
sync.sync('customers', 'customers_synced', {'status': 'active'})
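
fetch_from_api fails on the first non-2xx response because of raise_for_status(). For flaky APIs a common hardening step is retries with exponential backoff; here is a minimal sketch using urllib3's Retry mounted on a requests session (the retry budget and status list are illustrative choices, not part of the original class):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Retry transient failures (rate limits and server errors) with backoff
session = requests.Session()
retries = Retry(total=5, backoff_factor=1,
                status_forcelist=[429, 500, 502, 503, 504])
session.mount('https://', HTTPAdapter(max_retries=retries))

# fetch_from_api could then call session.get(...) in place of requests.get(...)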
-- models/staging/stg_customers.sql
-- Stage raw customer data with basic cleaning
-- (the 'raw' source referenced below must be declared in a sources .yml file)

{{ config(
    materialized='view',
    tags=['staging', 'customers']
) }}

WITH source AS (
    SELECT * FROM {{ source('raw', 'customers') }}
),

cleaned AS (
    SELECT
        id AS customer_id,
        TRIM(LOWER(email)) AS email,
        TRIM(first_name) AS first_name,
        TRIM(last_name) AS last_name,
        CONCAT(TRIM(first_name), ' ', TRIM(last_name)) AS full_name,
        phone,
        created_at,
        updated_at,
        status
    FROM source
    WHERE email IS NOT NULL
      AND email LIKE '%@%'
)

-- No trailing semicolon: dbt wraps and composes model queries itself
SELECT * FROM cleaned
-- models/marts/customers/dim_customers.sql
-- Create customer dimension with enrichments

{{ config(
    materialized='table',
    tags=['marts', 'customers']
) }}

WITH stg_customers AS (
    SELECT * FROM {{ ref('stg_customers') }}
),

customer_orders AS (
    SELECT
        customer_id,
        COUNT(*) AS total_orders,
        SUM(total_amount) AS lifetime_value,
        MAX(order_date) AS last_order_date,
        MIN(order_date) AS first_order_date
    FROM {{ ref('stg_orders') }}
    GROUP BY customer_id
),

final AS (
    SELECT
        c.customer_id,
        c.email,
        c.full_name,
        c.first_name,
        c.last_name,
        c.phone,
        c.status,
        c.created_at,
        -- Enriched fields
        COALESCE(o.total_orders, 0) AS total_orders,
        COALESCE(o.lifetime_value, 0) AS lifetime_value,
        o.last_order_date,
        o.first_order_date,
        -- Customer segmentation
        CASE
            WHEN COALESCE(o.lifetime_value, 0) >= 1000 THEN 'High Value'
            WHEN COALESCE(o.lifetime_value, 0) >= 500 THEN 'Medium Value'
            ELSE 'Low Value'
        END AS customer_segment,
        -- Recency calculation (Snowflake DATEDIFF syntax)
        DATEDIFF('day', o.last_order_date, CURRENT_DATE) AS days_since_last_order,
        -- Audit fields
        CURRENT_TIMESTAMP AS dbt_updated_at
    FROM stg_customers c
    LEFT JOIN customer_orders o ON c.customer_id = o.customer_id
)

SELECT * FROM final
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'customer_data_pipeline',
    default_args=default_args,
    description='Daily customer data ETL pipeline',
    schedule_interval='0 2 * * *',  # Run at 2 AM daily
    catchup=False,
    tags=['etl', 'customers']
)

def extract_data(**context):
    """Extract data from source systems"""
    import psycopg2
    import json
    conn = psycopg2.connect(
        host='source-db.example.com',
        database='production',
        user='etl_user',
        password='password'
    )
    cursor = conn.cursor()
    cursor.execute("""
        SELECT id, name, email, created_at
        FROM customers
        WHERE updated_at >= CURRENT_DATE - INTERVAL '1 day'
    """)
    data = cursor.fetchall()
    cursor.close()
    conn.close()
    # Push to XCom for the next task (default=str serializes the datetime
    # values; note XCom is only appropriate for small payloads)
    context['ti'].xcom_push(key='extracted_data', value=json.dumps(data, default=str))
    print(f"Extracted {len(data)} records")

def transform_data(**context):
    """Transform extracted data"""
    import json
    # Pull from XCom
    data = json.loads(context['ti'].xcom_pull(key='extracted_data'))
    transformed = []
    for record in data:
        transformed.append({
            'id': record[0],
            'name': record[1].upper(),
            'email': record[2].lower(),
            'created_at': str(record[3]),
            'processed_at': datetime.now().isoformat()
        })
    context['ti'].xcom_push(key='transformed_data', value=json.dumps(transformed))
    print(f"Transformed {len(transformed)} records")

extract_task = PythonOperator(
    task_id='extract_customers',
    python_callable=extract_data,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform_customers',
    python_callable=transform_data,
    dag=dag
)

# Assumes the transformed records have been written to @my_stage
# (e.g., by an upload step like the sketch after this DAG)
load_task = SnowflakeOperator(
    task_id='load_to_snowflake',
    snowflake_conn_id='snowflake_default',
    sql="""
        COPY INTO customers_staging
        FROM @my_stage/customers/
        FILE_FORMAT = (TYPE = 'JSON')
    """,
    dag=dag
)

# Validate in Snowflake, where customers_staging was just loaded
validate_task = SnowflakeOperator(
    task_id='validate_data',
    snowflake_conn_id='snowflake_default',
    sql="""
        SELECT COUNT(*) FROM customers_staging
        WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'
    """,
    dag=dag
)

# Define task dependencies
extract_task >> transform_task >> load_task >> validate_task
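
One gap worth noting: transform_customers leaves the transformed records in XCom, while load_to_snowflake copies from @my_stage, so a bridging upload step is implied. A minimal sketch of that step, assuming a PythonOperator between transform and load (the file path, connection parameters, and task wiring are illustrative assumptions, not part of the original DAG):

def upload_to_stage(**context):
    """Write the transformed records to a local file and PUT it to the
    Snowflake stage that load_to_snowflake copies from."""
    import json
    import snowflake.connector
    records = json.loads(context['ti'].xcom_pull(key='transformed_data'))
    path = '/tmp/customers.json'  # illustrative local path
    with open(path, 'w') as f:
        for rec in records:
            f.write(json.dumps(rec) + '\n')  # newline-delimited JSON
    # Reuse your own Snowflake credentials here (values below are placeholders)
    conn = snowflake.connector.connect(account='xyz12345', user='etl_user',
                                       password='...', warehouse='COMPUTE_WH',
                                       database='ANALYTICS', schema='STAGING')
    conn.cursor().execute(f"PUT file://{path} @my_stage/customers/ OVERWRITE=TRUE")
    conn.close()

upload_task = PythonOperator(
    task_id='upload_to_stage',
    python_callable=upload_to_stage,
    dag=dag
)

# The dependency chain would then become:
# extract_task >> transform_task >> upload_task >> load_task >> validate_task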