import { Kafka, CompressionTypes } from 'kafkajs';
// Initialize Kafka client
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092', 'localhost:9093'],
retry: {
retries: 5,
initialRetryTime: 300
}
});
// Create producer
const producer = kafka.producer({
allowAutoTopicCreation: true,
transactionalId: 'my-transactional-producer',
  maxInFlightRequests: 1, // idempotent delivery requires at most one in-flight request
  idempotent: true
});
async function publishEvents() {
await producer.connect();
try {
// Single event
await producer.send({
topic: 'user-events',
messages: [
{
key: 'user-123',
value: JSON.stringify({
eventType: 'login',
userId: 123,
timestamp: new Date().toISOString(),
metadata: { source: 'web-app', version: '1.0' }
}),
headers: {
'correlation-id': 'abc-123',
'trace-id': 'xyz-789'
}
}
]
});
// Batch events
const messages = Array.from({ length: 100 }, (_, i) => ({
key: `user-${i}`,
value: JSON.stringify({
eventType: 'page-view',
userId: i,
page: '/home',
timestamp: new Date().toISOString()
})
}));
await producer.send({
topic: 'user-events',
messages,
      compression: CompressionTypes.GZIP
});
console.log('Events published successfully');
} finally {
await producer.disconnect();
}
}
publishEvents().catch(console.error);
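// The producer above sets a transactionalId but only uses plain send(). A minimal
// sketch of an atomic, transactional write with the same producer; the
// publishAtomically helper and its argument shape are illustrative, not part of the original.
async function publishAtomically(messages: Array<{ key: string; value: string }>) {
  await producer.connect();
  const transaction = await producer.transaction();
  try {
    // Either every message in this batch becomes visible, or none do
    await transaction.send({ topic: 'user-events', messages });
    await transaction.commit();
  } catch (error) {
    await transaction.abort();
    throw error;
  } finally {
    await producer.disconnect();
  }
}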
import { Kafka, EachMessagePayload } from 'kafkajs';
const kafka = new Kafka({
clientId: 'analytics-service',
brokers: ['localhost:9092']
});
const consumer = kafka.consumer({
groupId: 'analytics-group',
sessionTimeout: 30000,
heartbeatInterval: 3000,
maxBytesPerPartition: 1048576
});
async function processEvents() {
await consumer.connect();
await consumer.subscribe({
topics: ['user-events', 'transaction-events'],
fromBeginning: false
});
await consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
const key = message.key?.toString();
const value = message.value?.toString();
try {
const event = JSON.parse(value || '{}');
// Process event based on type
switch (event.eventType) {
case 'login':
await handleLogin(event);
break;
case 'purchase':
await handlePurchase(event);
break;
          case 'page-view':
            await handlePageView(event);
            break;
          default:
            console.warn(`Unknown event type: ${event.eventType}`);
        }
// Manually commit offset after successful processing
await consumer.commitOffsets([
{
topic,
partition,
offset: (parseInt(message.offset) + 1).toString()
}
]);
console.log(`Processed event: ${event.eventType} at offset ${message.offset}`);
} catch (error) {
console.error('Error processing message:', error);
        // Route the failed message to a dead-letter topic (see the sendToDeadLetterQueue sketch below)
}
}
});
}
async function handleLogin(event: any) {
console.log(`User ${event.userId} logged in at ${event.timestamp}`);
// Update analytics dashboard, user session tracking, etc.
}
async function handlePurchase(event: any) {
console.log(`User ${event.userId} purchased ${event.amount}`);
// Update revenue metrics, inventory, recommendations, etc.
}
async function handlePageView(event: any) {
console.log(`User ${event.userId} viewed ${event.page}`);
// Update page analytics, user behavior tracking, etc.
}
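// A minimal dead-letter sketch for the catch block above, assuming failed messages
// go to a '<topic>.dlq' topic; the topic naming convention and the
// sendToDeadLetterQueue helper are illustrative, not part of the original.
const dlqProducer = kafka.producer();

async function sendToDeadLetterQueue(
  topic: string,
  partition: number,
  message: { key: Buffer | null; value: Buffer | null; offset: string }
) {
  await dlqProducer.connect();
  try {
    await dlqProducer.send({
      topic: `${topic}.dlq`,
      messages: [
        {
          key: message.key,
          value: message.value,
          headers: {
            'original-topic': topic,
            'original-partition': partition.toString(),
            'original-offset': message.offset
          }
        }
      ]
    });
  } finally {
    await dlqProducer.disconnect();
  }
}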
processEvents().catch(console.error);
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
public class FraudDetectionJob {
public static void main(String[] args) throws Exception {
// Set up execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // Checkpoint every 60 seconds
// Configure Kafka source
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "fraud-detection");
        FlinkKafkaConsumer<Transaction> consumer = new FlinkKafkaConsumer<>(
"transactions",
new TransactionDeserializationSchema(),
kafkaProps
);
// Create data stream
        DataStream<Transaction> transactions = env.addSource(consumer);
// Process stream with windowing and aggregation
        DataStream<FraudAlert> fraudAlerts = transactions
.filter(tx -> tx.getAmount() > 1000) // High-value transactions
.keyBy(Transaction::getUserId)
.timeWindow(Time.minutes(5)) // 5-minute tumbling window
.aggregate(new TransactionAggregator())
.filter(agg -> agg.getTransactionCount() > 10) // More than 10 transactions
.map(agg -> new FraudAlert(
agg.getUserId(),
agg.getTransactionCount(),
agg.getTotalAmount(),
"Suspicious activity detected"
));
// Write results to Kafka
        FlinkKafkaProducer<FraudAlert> producer = new FlinkKafkaProducer<>(
"fraud-alerts",
new FraudAlertSerializationSchema(),
kafkaProps,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);
fraudAlerts.addSink(producer);
// Execute job
env.execute("Fraud Detection Job");
}
}
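// The job above references TransactionDeserializationSchema, FraudAlert and
// TransactionAggregator without defining them. A minimal sketch of the aggregator;
// Transaction's getUserId()/getAmount() are the getters the job already uses, while
// TransactionStats is a hypothetical result type whose getters mirror what the
// filter/map steps read. Names are illustrative, not the original classes.
import org.apache.flink.api.common.functions.AggregateFunction;

class TransactionStats {
    private final String userId;
    private final long transactionCount;
    private final double totalAmount;

    TransactionStats(String userId, long transactionCount, double totalAmount) {
        this.userId = userId;
        this.transactionCount = transactionCount;
        this.totalAmount = totalAmount;
    }

    public String getUserId() { return userId; }
    public long getTransactionCount() { return transactionCount; }
    public double getTotalAmount() { return totalAmount; }
}

class TransactionAggregator implements AggregateFunction<Transaction, TransactionStats, TransactionStats> {
    @Override
    public TransactionStats createAccumulator() {
        return new TransactionStats(null, 0, 0.0);
    }

    @Override
    public TransactionStats add(Transaction tx, TransactionStats acc) {
        // Fold one transaction into the running per-user totals
        return new TransactionStats(tx.getUserId(),
                acc.getTransactionCount() + 1,
                acc.getTotalAmount() + tx.getAmount());
    }

    @Override
    public TransactionStats getResult(TransactionStats acc) {
        return acc;
    }

    @Override
    public TransactionStats merge(TransactionStats a, TransactionStats b) {
        return new TransactionStats(a.getUserId() != null ? a.getUserId() : b.getUserId(),
                a.getTransactionCount() + b.getTransactionCount(),
                a.getTotalAmount() + b.getTotalAmount());
    }
}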
from kafka import KafkaConsumer, KafkaProducer
import json
import pandas as pd
from datetime import datetime, timedelta, timezone
# Initialize consumer
consumer = KafkaConsumer(
'sensor-data',
bootstrap_servers=['localhost:9092'],
group_id='iot-analytics',
auto_offset_reset='latest',
enable_auto_commit=False,
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
# Sliding window buffer
window_size = timedelta(minutes=5)
event_buffer = []
def process_sensor_data():
for message in consumer:
event = message.value
        # toISOString()-style timestamps end in 'Z'; normalize so fromisoformat() accepts them
        timestamp = datetime.fromisoformat(event['timestamp'].replace('Z', '+00:00'))
# Add to buffer
event_buffer.append({
'timestamp': timestamp,
'sensor_id': event['sensorId'],
'temperature': event['temperature'],
'humidity': event['humidity'],
'pressure': event['pressure']
})
# Remove old events outside window
        # assumes sensor timestamps are UTC ISO-8601 with an offset, matching the parse above
        cutoff_time = datetime.now(timezone.utc) - window_size
event_buffer[:] = [e for e in event_buffer if e['timestamp'] > cutoff_time]
# Analyze with pandas
if len(event_buffer) >= 100:
df = pd.DataFrame(event_buffer)
# Compute statistics
stats = {
'avg_temperature': df['temperature'].mean(),
'max_temperature': df['temperature'].max(),
'min_temperature': df['temperature'].min(),
'std_temperature': df['temperature'].std(),
'sensor_count': df['sensor_id'].nunique(),
'event_count': len(df)
}
# Detect anomalies
temp_threshold = stats['avg_temperature'] + (2 * stats['std_temperature'])
anomalies = df[df['temperature'] > temp_threshold]
if not anomalies.empty:
                print(f"⚠️ Anomaly detected: {len(anomalies)} readings exceeding threshold")
print(f" Avg: {stats['avg_temperature']:.2f}°C, Threshold: {temp_threshold:.2f}°C")
# Publish alert
for _, row in anomalies.iterrows():
publish_alert(row['sensor_id'], row['temperature'])
# Commit offset
consumer.commit()
# Alert producer; the 'sensor-alerts' topic name is an assumption, not from the original
alert_producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                               value_serializer=lambda v: json.dumps(v).encode('utf-8'))

def publish_alert(sensor_id, temperature):
    # Publish to alert topic and keep the console trace
    alert_producer.send('sensor-alerts', {'sensorId': str(sensor_id), 'temperature': float(temperature)})
    print(f"  Alert: Sensor {sensor_id} at {temperature:.2f}°C")
if __name__ == '__main__':
try:
process_sensor_data()
except KeyboardInterrupt:
print("Shutting down consumer...")
finally:
consumer.close()
package main
import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)
type Event struct {
EventID string `json:"eventId"`
EventType string `json:"eventType"`
Timestamp time.Time `json:"timestamp"`
UserID string `json:"userId"`
Data map[string]interface{} `json:"data"`
}
func main() {
// Configure writer with batching and compression
writer := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "high-throughput-events",
Balancer: &kafka.Hash{},
        Compression:  kafka.Snappy,
BatchSize: 100,
BatchTimeout: 10 * time.Millisecond,
MaxAttempts: 5,
RequiredAcks: kafka.RequireOne,
}
defer writer.Close()
// Produce events at high rate
ctx := context.Background()
eventCount := 0
ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()
batch := make([]kafka.Message, 0, 100)
for range ticker.C {
// Create event
event := Event{
EventID: fmt.Sprintf("evt_%d", eventCount),
EventType: "user-activity",
Timestamp: time.Now(),
UserID: fmt.Sprintf("user_%d", eventCount%1000),
Data: map[string]interface{}{
"action": "click",
"page": "/home",
},
        }
        eventCount++ // advance the sequence so each event gets a unique ID and key
payload, err := json.Marshal(event)
if err != nil {
log.Printf("Error marshaling event: %v", err)
continue
}
// Add to batch
batch = append(batch, kafka.Message{
Key: []byte(event.UserID),
Value: payload,
Headers: []kafka.Header{
{Key: "event-type", Value: []byte(event.EventType)},
{Key: "timestamp", Value: []byte(event.Timestamp.Format(time.RFC3339))},
},
})
// Write batch when full
if len(batch) >= 100 {
err := writer.WriteMessages(ctx, batch...)
if err != nil {
log.Printf("Error writing messages: %v", err)
            } else if eventCount%1000 == 0 {
                log.Printf("Produced %d events", eventCount)
            }
batch = batch[:0] // Clear batch
}
}
}
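// A minimal sketch of consuming the events produced above with kafka-go's Reader,
// assuming the same topic and a hypothetical 'high-throughput-consumer' group;
// offsets are committed explicitly after each message is handled.
package main

import (
    "context"
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  []string{"localhost:9092"},
        GroupID:  "high-throughput-consumer",
        Topic:    "high-throughput-events",
        MinBytes: 1,
        MaxBytes: 10e6, // fetch up to ~10MB per request
    })
    defer reader.Close()

    ctx := context.Background()
    for {
        // FetchMessage does not auto-commit; commit only after handling succeeds
        msg, err := reader.FetchMessage(ctx)
        if err != nil {
            log.Printf("fetch error: %v", err)
            break
        }
        log.Printf("key=%s offset=%d", string(msg.Key), msg.Offset)
        if err := reader.CommitMessages(ctx, msg); err != nil {
            log.Printf("commit error: %v", err)
        }
    }
}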
import { Kafka, EachMessagePayload } from 'kafkajs';
interface OrderEvent {
eventId: string;
eventType: 'OrderCreated' | 'OrderPaid' | 'OrderShipped' | 'OrderDelivered';
orderId: string;
timestamp: string;
data: any;
}
class OrderAggregate {
orderId: string;
status: string;
items: any[];
totalAmount: number;
createdAt: string;
updatedAt: string;
constructor(orderId: string) {
this.orderId = orderId;
this.status = 'pending';
this.items = [];
this.totalAmount = 0;
this.createdAt = new Date().toISOString();
this.updatedAt = new Date().toISOString();
}
apply(event: OrderEvent) {
switch (event.eventType) {
case 'OrderCreated':
this.items = event.data.items;
this.totalAmount = event.data.totalAmount;
this.status = 'created';
break;
case 'OrderPaid':
this.status = 'paid';
break;
case 'OrderShipped':
this.status = 'shipped';
break;
case 'OrderDelivered':
this.status = 'delivered';
break;
}
this.updatedAt = event.timestamp;
}
}
class OrderProjection {
  private orders: Map<string, OrderAggregate> = new Map();
private kafka = new Kafka({ clientId: 'order-projection', brokers: ['localhost:9092'] });
private consumer = this.kafka.consumer({ groupId: 'order-projection-group' });
async start() {
await this.consumer.connect();
await this.consumer.subscribe({ topic: 'order-events', fromBeginning: true });
await this.consumer.run({
eachMessage: async ({ message }: EachMessagePayload) => {
const event: OrderEvent = JSON.parse(message.value!.toString());
// Get or create aggregate
let order = this.orders.get(event.orderId);
if (!order) {
order = new OrderAggregate(event.orderId);
this.orders.set(event.orderId, order);
}
// Apply event to rebuild state
order.apply(event);
console.log(`Order ${event.orderId} status: ${order.status}`);
}
});
}
getOrder(orderId: string): OrderAggregate | undefined {
return this.orders.get(orderId);
}
}
// Usage
const projection = new OrderProjection();
projection.start().catch(console.error);
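// The projection above only reads 'order-events'. A minimal sketch of the write
// side, assuming events are appended to the same topic; keying by orderId keeps
// every event for an order on one partition, so the projection replays them in
// order. The orderProducer and appendOrderEvent names are illustrative.
const orderProducer = new Kafka({ clientId: 'order-service', brokers: ['localhost:9092'] }).producer();

async function appendOrderEvent(event: OrderEvent) {
  await orderProducer.connect();
  try {
    await orderProducer.send({
      topic: 'order-events',
      messages: [
        {
          key: event.orderId, // partition key = aggregate id, preserves per-order ordering
          value: JSON.stringify(event)
        }
      ]
    });
  } finally {
    await orderProducer.disconnect();
  }
}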