💡 Data Integration Code Examples

Example 1: PostgreSQL to Snowflake ETL

Python Implementation

Python
import psycopg2
import snowflake.connector
from datetime import datetime

class PostgreSQLToSnowflakeETL:
    def __init__(self, pg_config, sf_config):
        self.pg_config = pg_config
        self.sf_config = sf_config

    def extract(self, query):
        """Extract data from PostgreSQL"""
        print(f"[{datetime.now()}] Starting extraction...")

        conn = psycopg2.connect(**self.pg_config)
        cursor = conn.cursor()
        cursor.execute(query)

        columns = [desc[0] for desc in cursor.description]
        data = cursor.fetchall()

        cursor.close()
        conn.close()

        print(f"[{datetime.now()}] Extracted {len(data)} rows")
        return columns, data

    def transform(self, columns, data):
        """Transform data - clean, validate, enrich"""
        print(f"[{datetime.now()}] Starting transformation...")

        transformed_data = []
        for row in data:
            # Example transformations
            transformed_row = list(row)

            # Clean null values
            transformed_row = [val if val is not None else '' for val in transformed_row]

            # Add audit columns
            transformed_row.append(datetime.now())  # loaded_at
            transformed_row.append('ETL_PIPELINE')  # source_system

            transformed_data.append(tuple(transformed_row))

        # Add audit columns to schema
        columns.extend(['loaded_at', 'source_system'])

        print(f"[{datetime.now()}] Transformed {len(transformed_data)} rows")
        return columns, transformed_data

    def load(self, table_name, columns, data):
        """Load data into Snowflake"""
        print(f"[{datetime.now()}] Starting load to Snowflake...")

        conn = snowflake.connector.connect(**self.sf_config)
        cursor = conn.cursor()

        # Create table if not exists
        col_defs = ', '.join([f"{col} VARCHAR" for col in columns])
        cursor.execute(f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                {col_defs}
            )
        """)

        # Bulk insert
        placeholders = ', '.join(['%s'] * len(columns))
        insert_query = f"INSERT INTO {table_name} VALUES ({placeholders})"

        cursor.executemany(insert_query, data)
        conn.commit()

        cursor.close()
        conn.close()

        print(f"[{datetime.now()}] Loaded {len(data)} rows successfully")

    def run(self, source_query, target_table):
        """Execute full ETL pipeline"""
        print("="*50)
        print("Starting ETL Pipeline")
        print("="*50)

        # Extract
        columns, data = self.extract(source_query)

        # Transform
        columns, data = self.transform(columns, data)

        # Load
        self.load(target_table, columns, data)

        print("="*50)
        print("ETL Pipeline Completed Successfully!")
        print("="*50)

# Usage
if __name__ == "__main__":
    pg_config = {
        'host': 'localhost',
        'database': 'production',
        'user': 'etl_user',
        'password': 'secure_password'
    }

    sf_config = {
        'account': 'xyz12345',
        'user': 'etl_user',
        'password': 'secure_password',
        'warehouse': 'COMPUTE_WH',
        'database': 'ANALYTICS',
        'schema': 'STAGING'
    }

    etl = PostgreSQLToSnowflakeETL(pg_config, sf_config)

    source_query = """
        SELECT customer_id, name, email, created_at
        FROM customers
        WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'
    """

    etl.run(source_query, 'customers_staging')
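
For large extracts, the row-by-row executemany insert above becomes a bottleneck. A minimal sketch of a bulk-load alternative using the connector's pandas helper, assuming pandas and pyarrow are installed and the target table already exists:

import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

def bulk_load(sf_config, table_name, columns, data):
    """Bulk-load rows through an internal stage + COPY instead of per-row INSERTs."""
    df = pd.DataFrame(data, columns=columns)
    conn = snowflake.connector.connect(**sf_config)
    try:
        # write_pandas quotes identifiers by default, so pass the table name in the
        # same case it was created with (uppercase for unquoted DDL).
        success, _, nrows, _ = write_pandas(conn, df, table_name.upper())
        print(f"Bulk load {'succeeded' if success else 'failed'}: {nrows} rows")
    finally:
        conn.close()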

Example 2: Real-time Kafka Streaming to BigQuery

Node.js Implementation

JavaScript (Node.js)
const { Kafka } = require('kafkajs');
const { BigQuery } = require('@google-cloud/bigquery');

class KafkaToBigQueryStreaming {
    constructor(kafkaConfig, bqConfig) {
        // Initialize Kafka consumer
        this.kafka = new Kafka({
            clientId: kafkaConfig.clientId,
            brokers: kafkaConfig.brokers
        });

        this.consumer = this.kafka.consumer({
            groupId: kafkaConfig.groupId
        });

        // Initialize BigQuery client
        this.bigquery = new BigQuery({
            projectId: bqConfig.projectId,
            keyFilename: bqConfig.keyFilename
        });

        this.dataset = this.bigquery.dataset(bqConfig.dataset);
        this.table = this.dataset.table(bqConfig.table);

        this.batchSize = 100;
        this.batch = [];
    }

    async start(topic) {
        await this.consumer.connect();
        await this.consumer.subscribe({ topic, fromBeginning: false });

        console.log(`Started consuming from topic: ${topic}`);

        await this.consumer.run({
            eachMessage: async ({ topic, partition, message }) => {
                await this.processMessage(message);
            }
        });
    }

    async processMessage(message) {
        try {
            // Parse Kafka message
            const data = JSON.parse(message.value.toString());

            // Transform data
            const transformed = this.transform(data);

            // Add to batch
            this.batch.push(transformed);

            // Flush batch if size reached
            if (this.batch.length >= this.batchSize) {
                await this.flushBatch();
            }

        } catch (error) {
            console.error('Error processing message:', error);
        }
    }

    transform(data) {
        // Add transformation logic
        return {
            ...data,
            processed_at: new Date().toISOString(),
            schema_version: '1.0'
        };
    }

    async flushBatch() {
        if (this.batch.length === 0) return;

        try {
            await this.table.insert(this.batch);
            console.log(`Inserted ${this.batch.length} rows to BigQuery`);
            this.batch = [];
        } catch (error) {
            console.error('Error inserting to BigQuery:', error);
            // Implement retry logic here
        }
    }

    async stop() {
        // Flush remaining batch
        await this.flushBatch();

        // Disconnect
        await this.consumer.disconnect();
        console.log('Consumer stopped');
    }
}

// Usage
const kafkaConfig = {
    clientId: 'data-integration-consumer',
    brokers: ['kafka1:9092', 'kafka2:9092'],
    groupId: 'bigquery-consumer-group'
};

const bqConfig = {
    projectId: 'my-project',
    keyFilename: './service-account-key.json',
    dataset: 'analytics',
    table: 'events'
};

const streaming = new KafkaToBigQueryStreaming(kafkaConfig, bqConfig);

// Start streaming
streaming.start('user-events').catch(console.error);

// Graceful shutdown
process.on('SIGTERM', async () => {
    await streaming.stop();
    process.exit(0);
});

Example 3: REST API Integration

Sync Data from REST API to Database

Python
import requests
import psycopg2
from typing import List, Dict
import time

class APIToDatabaseSync:
    def __init__(self, api_config, db_config):
        self.api_base_url = api_config['base_url']
        self.api_key = api_config['api_key']
        self.db_config = db_config

    def fetch_from_api(self, endpoint: str, params: Dict = None) -> List[Dict]:
        """Fetch data from REST API with pagination"""
        headers = {'Authorization': f'Bearer {self.api_key}'}
        all_data = []
        page = 1

        if params is None:
            params = {}

        while True:
            params['page'] = page
            params['per_page'] = 100

            response = requests.get(
                f"{self.api_base_url}/{endpoint}",
                headers=headers,
                params=params
            )

            response.raise_for_status()
            data = response.json()

            if not data:
                break

            all_data.extend(data)
            print(f"Fetched page {page}, total records: {len(all_data)}")

            page += 1
            time.sleep(0.5)  # Rate limiting

        return all_data

    def transform_data(self, records: List[Dict]) -> List[tuple]:
        """Transform API data to match database schema"""
        transformed = []

        for record in records:
            # Extract and transform fields
            transformed_record = (
                record.get('id'),
                record.get('name'),
                record.get('email'),
                record.get('status', 'active'),
                record.get('created_at'),
                # Add computed fields
                f"{record.get('first_name')} {record.get('last_name')}",
                # Normalize data
                record.get('email', '').lower()
            )
            transformed.append(transformed_record)

        return transformed

    def upsert_to_database(self, table: str, records: List[tuple]):
        """Insert or update records in database"""
        conn = psycopg2.connect(**self.db_config)
        cursor = conn.cursor()

        upsert_query = f"""
            INSERT INTO {table}
                (id, name, email, status, created_at, full_name, email_normalized)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (id)
            DO UPDATE SET
                name = EXCLUDED.name,
                email = EXCLUDED.email,
                status = EXCLUDED.status,
                full_name = EXCLUDED.full_name,
                email_normalized = EXCLUDED.email_normalized,
                updated_at = CURRENT_TIMESTAMP
        """

        cursor.executemany(upsert_query, records)
        conn.commit()

        print(f"Upserted {len(records)} records")

        cursor.close()
        conn.close()

    def sync(self, endpoint: str, table: str, filters: Dict = None):
        """Execute full sync from API to database"""
        print(f"Starting sync: {endpoint} -> {table}")

        # Fetch from API
        api_data = self.fetch_from_api(endpoint, filters)

        # Transform
        transformed_data = self.transform_data(api_data)

        # Load to database
        self.upsert_to_database(table, transformed_data)

        print("Sync completed successfully!")

# Usage
api_config = {
    'base_url': 'https://api.example.com/v1',
    'api_key': 'your-api-key-here'
}

db_config = {
    'host': 'localhost',
    'database': 'analytics',
    'user': 'etl_user',
    'password': 'secure_password'
}

sync = APIToDatabaseSync(api_config, db_config)
sync.sync('customers', 'customers_synced', {'status': 'active'})
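
The requests.get call in fetch_from_api raises on the first transient failure (timeouts, 429 rate limits). A small retry helper with exponential backoff could wrap that call; a sketch with illustrative retry counts:

import time
import requests

def get_with_retry(url, *, headers=None, params=None, retries=3, backoff=1.0):
    """GET a URL, retrying transient failures with exponential backoff."""
    for attempt in range(retries):
        try:
            response = requests.get(url, headers=headers, params=params, timeout=30)
            if response.status_code == 429 and attempt < retries - 1:
                time.sleep(backoff * (2 ** attempt))  # rate limited: back off and retry
                continue
            response.raise_for_status()
            return response
        except requests.RequestException:
            if attempt == retries - 1:
                raise
            time.sleep(backoff * (2 ** attempt))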

Example 4: dbt Transformation Pipeline

SQL-based Transformations with dbt

SQL (dbt)
-- models/staging/stg_customers.sql
-- Stage raw customer data with basic cleaning

{{ config(
    materialized='view',
    tags=['staging', 'customers']
) }}

WITH source AS (
    SELECT * FROM {{ source('raw', 'customers') }}
),

cleaned AS (
    SELECT
        id AS customer_id,
        TRIM(LOWER(email)) AS email,
        TRIM(first_name) AS first_name,
        TRIM(last_name) AS last_name,
        CONCAT(TRIM(first_name), ' ', TRIM(last_name)) AS full_name,
        phone,
        created_at,
        updated_at,
        status
    FROM source
    WHERE email IS NOT NULL
      AND email LIKE '%@%'
)

SELECT * FROM cleaned

-- models/marts/customers/dim_customers.sql
-- Create customer dimension with enrichments

{{ config(
    materialized='table',
    tags=['marts', 'customers']
) }}

WITH stg_customers AS (
    SELECT * FROM {{ ref('stg_customers') }}
),

customer_orders AS (
    SELECT
        customer_id,
        COUNT(*) AS total_orders,
        SUM(total_amount) AS lifetime_value,
        MAX(order_date) AS last_order_date,
        MIN(order_date) AS first_order_date
    FROM {{ ref('stg_orders') }}
    GROUP BY customer_id
),

final AS (
    SELECT
        c.customer_id,
        c.email,
        c.full_name,
        c.first_name,
        c.last_name,
        c.phone,
        c.status,
        c.created_at,

        -- Enriched fields
        COALESCE(o.total_orders, 0) AS total_orders,
        COALESCE(o.lifetime_value, 0) AS lifetime_value,
        o.last_order_date,
        o.first_order_date,

        -- Customer segmentation
        CASE
            WHEN COALESCE(o.lifetime_value, 0) >= 1000 THEN 'High Value'
            WHEN COALESCE(o.lifetime_value, 0) >= 500 THEN 'Medium Value'
            ELSE 'Low Value'
        END AS customer_segment,

        -- Recency calculation
        DATEDIFF('day', o.last_order_date, CURRENT_DATE) AS days_since_last_order,

        -- Audit fields
        CURRENT_TIMESTAMP AS dbt_updated_at

    FROM stg_customers c
    LEFT JOIN customer_orders o ON c.customer_id = o.customer_id
)

SELECT * FROM final

Example 5: Apache Airflow DAG

Orchestrating Data Pipeline with Airflow

Python (Airflow)
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'customer_data_pipeline',
    default_args=default_args,
    description='Daily customer data ETL pipeline',
    schedule_interval='0 2 * * *',  # Run at 2 AM daily
    catchup=False,
    tags=['etl', 'customers']
)

def extract_data(**context):
    """Extract data from source systems"""
    import psycopg2
    import json

    conn = psycopg2.connect(
        host='source-db.example.com',
        database='production',
        user='etl_user',
        password='password'
    )

    cursor = conn.cursor()
    cursor.execute("""
        SELECT id, name, email, created_at
        FROM customers
        WHERE updated_at >= CURRENT_DATE - INTERVAL '1 day'
    """)

    data = cursor.fetchall()
    cursor.close()
    conn.close()

    # Push to XCom for next task (default=str serializes datetime values)
    context['ti'].xcom_push(key='extracted_data', value=json.dumps(data, default=str))
    print(f"Extracted {len(data)} records")

def transform_data(**context):
    """Transform extracted data"""
    import json

    # Pull from XCom
    data = json.loads(context['ti'].xcom_pull(task_ids='extract_customers', key='extracted_data'))

    transformed = []
    for record in data:
        transformed.append({
            'id': record[0],
            'name': record[1].upper(),
            'email': record[2].lower(),
            'created_at': str(record[3]),
            'processed_at': datetime.now().isoformat()
        })

    context['ti'].xcom_push(key='transformed_data', value=json.dumps(transformed))
    print(f"Transformed {len(transformed)} records")

extract_task = PythonOperator(
    task_id='extract_customers',
    python_callable=extract_data,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform_customers',
    python_callable=transform_data,
    dag=dag
)

load_task = SnowflakeOperator(
    task_id='load_to_snowflake',
    snowflake_conn_id='snowflake_default',
    sql="""
        COPY INTO customers_staging
        FROM @my_stage/customers/
        FILE_FORMAT = (TYPE = 'JSON')
    """,
    dag=dag
)

validate_task = SnowflakeOperator(
    task_id='validate_data',
    snowflake_conn_id='snowflake_default',
    sql="""
        SELECT COUNT(*) FROM customers_staging
        WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'
    """,
    dag=dag
)

# Define task dependencies
extract_task >> transform_task >> load_task >> validate_task
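
As written, the transformed records stop in XCom while the COPY INTO in load_task reads from @my_stage, which nothing populates. A hedged sketch of a bridging task that writes the records as newline-delimited JSON and PUTs the file to the stage, assuming the stage @my_stage exists and the Snowflake provider's hook is configured:

def stage_data(**context):
    """Write transformed records to a local NDJSON file and PUT it to the stage."""
    import json
    from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

    records = json.loads(context['ti'].xcom_pull(task_ids='transform_customers',
                                                 key='transformed_data'))
    local_path = '/tmp/customers.json'
    with open(local_path, 'w') as f:
        for record in records:
            f.write(json.dumps(record) + '\n')  # one JSON object per line for COPY

    hook = SnowflakeHook(snowflake_conn_id='snowflake_default')
    conn = hook.get_conn()
    conn.cursor().execute(
        f"PUT file://{local_path} @my_stage/customers/ OVERWRITE = TRUE"
    )

stage_task = PythonOperator(
    task_id='stage_customers',
    python_callable=stage_data,
    dag=dag
)

# With the staging step in place, the chain would become:
# extract_task >> transform_task >> stage_task >> load_task >> validate_task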