💡 Code Examples

Example 1: Basic Producer (JavaScript/TypeScript)

Publishing events to a Kafka stream using KafkaJS, a Kafka client for Node.js.
TypeScript
import { Kafka } from 'kafkajs';

// Initialize Kafka client.
// `clientId` names this application; two local broker addresses are listed.
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092', 'localhost:9093'],
  // Client-level retry settings: 5 retries with an initial retry time of 300
  // (presumably milliseconds — see the KafkaJS retry options).
  retry: {
    retries: 5,
    initialRetryTime: 300
  }
});

// Create producer.
// The producer is idempotent and carries a transactional id. KafkaJS documents
// this combination with `maxInFlightRequests: 1`, so only one request is in
// flight at a time and retried batches cannot overtake each other.
const producer = kafka.producer({
  allowAutoTopicCreation: true,
  transactionalId: 'my-transactional-producer',
  maxInFlightRequests: 1,
  idempotent: true
});

/**
 * Connects the shared producer, publishes one `login` event and then a batch
 * of 100 `page-view` events to the `user-events` topic, and finally
 * disconnects the producer whether or not publishing succeeded.
 */
async function publishEvents() {
  await producer.connect();

  try {
    // Single event: a login for user 123, tagged with tracing headers.
    const loginPayload = {
      eventType: 'login',
      userId: 123,
      timestamp: new Date().toISOString(),
      metadata: { source: 'web-app', version: '1.0' }
    };
    const loginMessage = {
      key: 'user-123',
      value: JSON.stringify(loginPayload),
      headers: {
        'correlation-id': 'abc-123',
        'trace-id': 'xyz-789'
      }
    };
    await producer.send({ topic: 'user-events', messages: [loginMessage] });

    // Batch events: one page-view per user id in 0..99, keyed by user.
    const pageViewMessages = Array.from(Array(100).keys(), (userId) => {
      const pageViewPayload = {
        eventType: 'page-view',
        userId,
        page: '/home',
        timestamp: new Date().toISOString()
      };
      return { key: `user-${userId}`, value: JSON.stringify(pageViewPayload) };
    });

    await producer.send({
      topic: 'user-events',
      messages: pageViewMessages,
      compression: 1 // GZIP compression
    });

    console.log('Events published successfully');
  } finally {
    // Always release the connection, even when a send fails.
    await producer.disconnect();
  }
}

// Run the example and report any rejection.
publishEvents().catch((err) => console.error(err));

Example 2: Consumer with Processing (JavaScript/TypeScript)

Consuming and processing events from a Kafka stream with offset management.
TypeScript
import { Kafka, EachMessagePayload } from 'kafkajs';

// Kafka client for the analytics service, using a single local broker address.
const kafka = new Kafka({
  clientId: 'analytics-service',
  brokers: ['localhost:9092']
});

// Consumer that joins the 'analytics-group' consumer group.
// NOTE(review): sessionTimeout and heartbeatInterval are presumably in
// milliseconds (30 s and 3 s) — confirm against the KafkaJS consumer options.
const consumer = kafka.consumer({
  groupId: 'analytics-group',
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
  // 1048576 = 1024 * 1024
  maxBytesPerPartition: 1048576
});

/**
 * Connects the consumer, subscribes to `user-events` and
 * `transaction-events`, and processes messages one at a time.
 *
 * Auto-commit is disabled: after a message has been handled successfully the
 * offset of the *next* message (current offset + 1) is committed explicitly.
 *
 * NOTE: a message that fails to parse or process is only logged. Its offset is
 * not committed here, but the commit for the next successful message on the
 * same partition moves past it, so failed messages are effectively skipped
 * until dead-letter handling is implemented in the `catch` block.
 */
async function processEvents() {
  await consumer.connect();
  await consumer.subscribe({
    topics: ['user-events', 'transaction-events'],
    fromBeginning: false
  });

  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
      const value = message.value?.toString();

      try {
        // A missing value is treated as an empty event (no eventType).
        const event = JSON.parse(value || '{}');

        // Process event based on type; unknown types fall through unhandled.
        switch (event.eventType) {
          case 'login':
            await handleLogin(event);
            break;
          case 'purchase':
            await handlePurchase(event);
            break;
          case 'page-view':
            await handlePageView(event);
            break;
        }

        // Offsets are 64-bit integers carried as strings. BigInt arithmetic
        // avoids the precision loss parseInt/Number would have above 2^53.
        const nextOffset = (BigInt(message.offset) + BigInt(1)).toString();

        // Manually commit offset after successful processing
        await consumer.commitOffsets([{ topic, partition, offset: nextOffset }]);

        console.log(`Processed event: ${event.eventType} at offset ${message.offset}`);
      } catch (error) {
        console.error('Error processing message:', error);
        // Implement dead-letter queue logic here
      }
    }
  });
}

/**
 * Handles a `login` event by logging which user logged in and when.
 *
 * @param event - Parsed event; `userId` and `timestamp` are read from it.
 */
async function handleLogin(event: any) {
  const { userId, timestamp } = event;
  const line = `User ${userId} logged in at ${timestamp}`;
  console.log(line);
  // Update analytics dashboard, user session tracking, etc.
}

/**
 * Handles a `purchase` event by logging the buyer and the purchased amount.
 *
 * @param event - Parsed event; `userId` and `amount` are read from it.
 */
async function handlePurchase(event: any) {
  const { userId, amount } = event;
  const line = `User ${userId} purchased ${amount}`;
  console.log(line);
  // Update revenue metrics, inventory, recommendations, etc.
}

/**
 * Handles a `page-view` event by logging which page the user viewed.
 *
 * @param event - Parsed event; `userId` and `page` are read from it.
 */
async function handlePageView(event: any) {
  const { userId, page } = event;
  const line = `User ${userId} viewed ${page}`;
  console.log(line);
  // Update page analytics, user behavior tracking, etc.
}

// Start consuming; a rejection from processEvents() is logged via console.error.
processEvents().catch(console.error);

Example 3: Stream Processing with Apache Flink (Java)

Real-time windowed aggregation using Apache Flink for fraud detection.
Java
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

/**
 * Flink streaming job that reads transactions from the "transactions" Kafka
 * topic, keeps those with an amount above 1000, aggregates them per user over
 * 5-minute windows, and writes a {@code FraudAlert} to the "fraud-alerts" topic
 * for every window containing more than 10 such transactions.
 *
 * <p>{@code Transaction}, {@code FraudAlert}, {@code TransactionAggregator} and
 * the two schema classes are defined elsewhere in the project.
 */
public class FraudDetectionJob {
    public static void main(String[] args) throws Exception {
        // Set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000); // Checkpoint every 60 seconds

        // Configure Kafka source
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "fraud-detection");

        // The element type must be explicit: with raw types the lambdas below
        // would see Object and calls such as tx.getAmount() would not compile.
        FlinkKafkaConsumer<Transaction> consumer = new FlinkKafkaConsumer<>(
            "transactions",
            new TransactionDeserializationSchema(),
            kafkaProps
        );

        // Create data stream
        DataStream<Transaction> transactions = env.addSource(consumer);

        // Process stream with windowing and aggregation
        // NOTE(review): timeWindow(...) is deprecated in newer Flink releases in
        // favour of window(TumblingProcessingTimeWindows.of(...)) — confirm
        // against the Flink version in use.
        DataStream<FraudAlert> fraudAlerts = transactions
            .filter(tx -> tx.getAmount() > 1000) // High-value transactions
            .keyBy(Transaction::getUserId)
            .timeWindow(Time.minutes(5)) // 5-minute tumbling window
            .aggregate(new TransactionAggregator())
            .filter(agg -> agg.getTransactionCount() > 10) // More than 10 transactions
            .map(agg -> new FraudAlert(
                agg.getUserId(),
                agg.getTransactionCount(),
                agg.getTotalAmount(),
                "Suspicious activity detected"
            ));

        // Producer-specific properties. EXACTLY_ONCE uses Kafka transactions;
        // the Flink producer's default transaction timeout (1 hour) exceeds the
        // broker's default transaction.max.timeout.ms (15 minutes), so the
        // timeout is set explicitly to stay within the broker default.
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "localhost:9092");
        producerProps.setProperty("transaction.timeout.ms", "900000");

        // Write results to Kafka
        FlinkKafkaProducer<FraudAlert> producer = new FlinkKafkaProducer<>(
            "fraud-alerts",
            new FraudAlertSerializationSchema(),
            producerProps,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );

        fraudAlerts.addSink(producer);

        // Execute job
        env.execute("Fraud Detection Job");
    }
}

Example 4: Python Consumer with Real-time Analytics

Processing IoT sensor data streams using the kafka-python client, with pandas for analytics.
Python
from kafka import KafkaConsumer
import json
import pandas as pd
from datetime import datetime, timedelta

# Initialize consumer for the 'sensor-data' topic in the 'iot-analytics' group.
# Auto-commit is disabled; process_sensor_data() commits explicitly after each
# message has been handled.
consumer = KafkaConsumer(
    'sensor-data',
    bootstrap_servers=['localhost:9092'],
    group_id='iot-analytics',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    # Message values are decoded as UTF-8 and parsed as JSON.
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# Sliding window buffer: process_sensor_data() appends each event to
# `event_buffer` and prunes entries older than `window_size` in place.
window_size = timedelta(minutes=5)
event_buffer = []

def _parse_event_time(raw):
    """Parse an ISO-8601 timestamp string into a timezone-aware UTC datetime.

    A trailing 'Z' (as produced by JavaScript's ``toISOString()``) is accepted;
    ``datetime.fromisoformat`` rejects it before Python 3.11. A timestamp with
    no offset is interpreted as local time, which is how it was previously
    compared against ``datetime.now()``.
    """
    from datetime import timezone  # local import: the module imports only datetime/timedelta

    if raw.endswith('Z'):
        raw = raw[:-1] + '+00:00'
    # astimezone() treats a naive value as system-local time and converts an
    # aware value, so the result is always an aware UTC datetime.
    return datetime.fromisoformat(raw).astimezone(timezone.utc)


def process_sensor_data():
    """Consume sensor readings, keep a 5-minute sliding window, and flag hot sensors.

    For every message: the reading is appended to ``event_buffer``, readings
    older than ``window_size`` are pruned, and once the buffer holds at least
    100 readings temperature statistics are computed with pandas. Readings above
    mean + 2 * standard deviation are reported through ``publish_alert``. The
    consumer offset is committed after each message.

    NOTE(review): the whole buffer is re-analysed on every message, so a reading
    that stays inside the window can be alerted on repeatedly.
    """
    from datetime import timezone  # local import: the module imports only datetime/timedelta

    for message in consumer:
        event = message.value
        timestamp = _parse_event_time(event['timestamp'])

        # Add to buffer
        event_buffer.append({
            'timestamp': timestamp,
            'sensor_id': event['sensorId'],
            'temperature': event['temperature'],
            'humidity': event['humidity'],
            'pressure': event['pressure']
        })

        # Remove old events outside window. Both sides are aware UTC datetimes;
        # mixing naive and aware values here would raise TypeError.
        cutoff_time = datetime.now(timezone.utc) - window_size
        event_buffer[:] = [e for e in event_buffer if e['timestamp'] > cutoff_time]

        # Analyze with pandas
        if len(event_buffer) >= 100:
            df = pd.DataFrame(event_buffer)

            # Compute statistics
            stats = {
                'avg_temperature': df['temperature'].mean(),
                'max_temperature': df['temperature'].max(),
                'min_temperature': df['temperature'].min(),
                'std_temperature': df['temperature'].std(),
                'sensor_count': df['sensor_id'].nunique(),
                'event_count': len(df)
            }

            # Detect anomalies: readings more than two standard deviations
            # above the window's mean temperature.
            temp_threshold = stats['avg_temperature'] + (2 * stats['std_temperature'])
            anomalies = df[df['temperature'] > temp_threshold]

            if not anomalies.empty:
                print(f"⚠️  Anomaly detected: {len(anomalies)} sensors exceeding threshold")
                print(f"   Avg: {stats['avg_temperature']:.2f}°C, Threshold: {temp_threshold:.2f}°C")

                # Publish alert
                for _, row in anomalies.iterrows():
                    publish_alert(row['sensor_id'], row['temperature'])

        # Commit offset
        consumer.commit()

def publish_alert(sensor_id, temperature):
    """Report a single anomalous reading.

    Currently this only prints the sensor id and the temperature (formatted to
    two decimals); publishing to an alert topic is left as a placeholder.
    """
    # Publish to alert topic
    alert_line = "   Alert: Sensor {} at {:.2f}°C".format(sensor_id, temperature)
    print(alert_line)

# Script entry point: run the consume loop until interrupted, and always close
# the consumer on the way out.
if __name__ == '__main__':
    try:
        process_sensor_data()
    except KeyboardInterrupt:
        # Ctrl+C is treated as a normal shutdown request.
        print("Shutting down consumer...")
    finally:
        consumer.close()

Example 5: Go Producer with High Throughput

High-performance event producer in Go with batching and compression.
Go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/compress"
)

// Event is the payload that main marshals to JSON and publishes as the
// message value. The struct tags give the camelCase names used in the JSON.
type Event struct {
    // EventID identifies the event.
    EventID   string                 `json:"eventId"`
    // EventType is also copied into the "event-type" message header.
    EventType string                 `json:"eventType"`
    // Timestamp is also copied, RFC 3339 formatted, into the "timestamp" header.
    Timestamp time.Time              `json:"timestamp"`
    // UserID doubles as the Kafka message key.
    UserID    string                 `json:"userId"`
    // Data carries free-form event attributes.
    Data      map[string]interface{} `json:"data"`
}

// main generates one synthetic "user-activity" event per millisecond tick,
// accumulates the events into batches, and writes each full batch to the
// "high-throughput-events" topic with Snappy compression.
//
// Every event gets its own sequence number, so EventID is unique per event
// and UserID (the message key) cycles through user_0..user_999. eventCount
// tracks only events that were written successfully.
func main() {
    // Number of messages accumulated before each WriteMessages call.
    const batchSize = 100

    // Configure writer with batching and compression
    writer := &kafka.Writer{
        Addr:         kafka.TCP("localhost:9092"),
        Topic:        "high-throughput-events",
        Balancer:     &kafka.Hash{},
        Compression:  compress.Snappy,
        BatchSize:    batchSize,
        BatchTimeout: 10 * time.Millisecond,
        MaxAttempts:  5,
        RequiredAcks: kafka.RequireOne,
    }
    defer writer.Close()

    // Produce events at high rate
    ctx := context.Background()
    eventCount := 0 // events successfully written so far
    nextSeq := 0    // sequence number assigned to the next generated event

    ticker := time.NewTicker(time.Millisecond)
    defer ticker.Stop()

    batch := make([]kafka.Message, 0, batchSize)

    for range ticker.C {
        // Create event. The sequence advances per event (not per written
        // batch), so events inside one batch never share an ID or key and a
        // failed write does not cause IDs to be reused.
        seq := nextSeq
        nextSeq++

        event := Event{
            EventID:   fmt.Sprintf("evt_%d", seq),
            EventType: "user-activity",
            Timestamp: time.Now(),
            UserID:    fmt.Sprintf("user_%d", seq%1000),
            Data: map[string]interface{}{
                "action": "click",
                "page":   "/home",
            },
        }

        payload, err := json.Marshal(event)
        if err != nil {
            log.Printf("Error marshaling event: %v", err)
            continue
        }

        // Add to batch
        batch = append(batch, kafka.Message{
            Key:   []byte(event.UserID),
            Value: payload,
            Headers: []kafka.Header{
                {Key: "event-type", Value: []byte(event.EventType)},
                {Key: "timestamp", Value: []byte(event.Timestamp.Format(time.RFC3339))},
            },
        })

        // Write batch when full
        if len(batch) >= batchSize {
            err := writer.WriteMessages(ctx, batch...)
            if err != nil {
                // The failed batch is dropped after logging (best effort).
                log.Printf("Error writing messages: %v", err)
            } else {
                eventCount += len(batch)
                if eventCount%1000 == 0 {
                    log.Printf("Produced %d events", eventCount)
                }
            }
            batch = batch[:0] // Clear batch, keeping the allocated capacity
        }
    }
}

Example 6: Event Sourcing Pattern

Implementing event sourcing with Kafka for maintaining aggregate state.
TypeScript
import { Kafka, EachMessagePayload } from 'kafkajs';

/**
 * An event in an order's lifecycle, as parsed from a message on the
 * order event stream.
 */
interface OrderEvent {
  /** Identifier of this event. */
  eventId: string;
  /** Lifecycle step this event records. */
  eventType: 'OrderCreated' | 'OrderPaid' | 'OrderShipped' | 'OrderDelivered';
  /** Identifier of the order the event belongs to. */
  orderId: string;
  /** Event time as a string (presumably ISO-8601 — TODO confirm with producers). */
  timestamp: string;
  /** Event-specific payload; `OrderCreated` events are read for `items` and `totalAmount`. */
  data: any;
}

/**
 * In-memory state of a single order, rebuilt by applying its events in order.
 *
 * A new aggregate starts as `pending` with no items. Timestamps are derived
 * from the applied events, so replaying the same events always produces the
 * same state.
 */
class OrderAggregate {
  orderId: string;
  status: string;
  items: any[];
  totalAmount: number;
  createdAt: string;
  updatedAt: string;

  /**
   * @param orderId - Identifier of the order this aggregate represents.
   */
  constructor(orderId: string) {
    this.orderId = orderId;
    this.status = 'pending';
    this.items = [];
    this.totalAmount = 0;
    // Placeholder until an OrderCreated event supplies the real creation time.
    this.createdAt = new Date().toISOString();
    this.updatedAt = new Date().toISOString();
  }

  /**
   * Applies one event to the aggregate, updating status (and, for
   * `OrderCreated`, items, total and creation time). `updatedAt` always
   * becomes the event's timestamp.
   *
   * @param event - The event to fold into the current state.
   */
  apply(event: OrderEvent) {
    switch (event.eventType) {
      case 'OrderCreated':
        this.items = event.data.items;
        this.totalAmount = event.data.totalAmount;
        this.status = 'created';
        // Take the creation time from the event rather than the wall clock at
        // construction, so a replay does not stamp orders with the replay time.
        this.createdAt = event.timestamp;
        break;
      case 'OrderPaid':
        this.status = 'paid';
        break;
      case 'OrderShipped':
        this.status = 'shipped';
        break;
      case 'OrderDelivered':
        this.status = 'delivered';
        break;
    }
    this.updatedAt = event.timestamp;
  }
}

/**
 * Read-side projection that rebuilds every order's current state by consuming
 * the `order-events` topic from the beginning and folding each event into an
 * in-memory `OrderAggregate`, keyed by order id.
 */
class OrderProjection {
  // Explicit type arguments: a bare `Map` annotation does not type-check.
  private orders: Map<string, OrderAggregate> = new Map();
  private kafka = new Kafka({ clientId: 'order-projection', brokers: ['localhost:9092'] });
  private consumer = this.kafka.consumer({ groupId: 'order-projection-group' });

  /**
   * Connects the consumer, subscribes from the beginning of `order-events`,
   * and starts applying events to the in-memory aggregates.
   */
  async start() {
    await this.consumer.connect();
    await this.consumer.subscribe({ topics: ['order-events'], fromBeginning: true });

    await this.consumer.run({
      eachMessage: async ({ message }: EachMessagePayload) => {
        // A message may carry a null value (e.g. a tombstone); there is
        // nothing to apply, so skip it instead of dereferencing null.
        if (message.value === null) {
          return;
        }

        const event: OrderEvent = JSON.parse(message.value.toString());

        // Get or create aggregate
        let order = this.orders.get(event.orderId);
        if (!order) {
          order = new OrderAggregate(event.orderId);
          this.orders.set(event.orderId, order);
        }

        // Apply event to rebuild state
        order.apply(event);

        console.log(`Order ${event.orderId} status: ${order.status}`);
      }
    });
  }

  /**
   * Returns the current state of an order.
   *
   * @param orderId - Identifier of the order to look up.
   * @returns The aggregate, or `undefined` if no event for it has been seen.
   */
  getOrder(orderId: string): OrderAggregate | undefined {
    return this.orders.get(orderId);
  }
}

// Usage
const projection = new OrderProjection();
// start() is async: attach a rejection handler so connection or consumer
// failures are reported instead of becoming an unhandled promise rejection.
projection.start().catch(console.error);