⚙️ API Reference

Stream Producer API

APIs for producing events to streaming platforms.

POST /api/v1/streams/{streamName}/events

Publish a single event to a stream.

Path Parameters

Parameter    Type     Description
streamName   string   Required. Name of the target stream.

Request Body

{
  "key": "user-123",
  "value": {
    "eventType": "user-action",
    "timestamp": "2025-01-15T10:30:00Z",
    "data": {
      "action": "click",
      "element": "checkout-button"
    }
  },
  "headers": {
    "correlationId": "abc-123",
    "source": "web-app"
  },
  "partitionKey": "user-123"
}

Response

{
  "eventId": "evt_1234567890",
  "partition": 3,
  "offset": 1234567,
  "timestamp": "2025-01-15T10:30:00.123Z",
  "status": "published"
}
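
As an illustration, the request above could be assembled with Python's standard library roughly as follows. The host `https://streams.example.com`, the helper name `build_publish_request`, and the `Content-Type: application/json` header are assumptions made for this sketch; authentication is not described in this reference and is left out. The request is built but not sent.

```python
import json
import urllib.request
from urllib.parse import quote

BASE_URL = "https://streams.example.com"  # hypothetical host, not from this reference


def build_publish_request(stream_name: str, event: dict) -> urllib.request.Request:
    """Build (but do not send) a POST request that publishes one event."""
    # Percent-encode the stream name so it is safe to place in the path.
    url = f"{BASE_URL}/api/v1/streams/{quote(stream_name, safe='')}/events"
    body = json.dumps(event).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},  # assumed content type
    )


event = {
    "key": "user-123",
    "value": {
        "eventType": "user-action",
        "timestamp": "2025-01-15T10:30:00Z",
        "data": {"action": "click", "element": "checkout-button"},
    },
    "headers": {"correlationId": "abc-123", "source": "web-app"},
    "partitionKey": "user-123",
}

request = build_publish_request("user-events", event)
# To send it: urllib.request.urlopen(request), then json.load() the response.
```

Building the request separately from sending it keeps the payload easy to inspect and unit-test.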

POST /api/v1/streams/{streamName}/events/batch

Publish multiple events in a single batch.

Request Body

{
  "events": [
    {
      "key": "user-123",
      "value": { "eventType": "login", "userId": "123" }
    },
    {
      "key": "user-456",
      "value": { "eventType": "purchase", "amount": 99.99 }
    }
  ],
  "options": {
    "compression": "gzip",
    "timeout": 5000
  }
}

Response

{
  "batchId": "batch_abc123",
  "totalEvents": 2,
  "successCount": 2,
  "failureCount": 0,
  "results": [
    { "eventId": "evt_001", "partition": 0, "offset": 100 },
    { "eventId": "evt_002", "partition": 1, "offset": 200 }
  ]
}
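
A client with many events to send might split them into several batch requests and check the counters in each response. The sketch below assumes a client-side cap of 500 events per batch (this reference does not state a server limit); `build_batches` and `batch_fully_published` are hypothetical helper names.

```python
from typing import Iterator


def build_batches(events: list[dict], max_batch_size: int = 500,
                  timeout_ms: int = 5000) -> Iterator[dict]:
    """Yield batch request bodies, each holding at most max_batch_size events."""
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be at least 1")
    for start in range(0, len(events), max_batch_size):
        yield {
            "events": events[start:start + max_batch_size],
            # Same option names as in the request body above.
            "options": {"compression": "gzip", "timeout": timeout_ms},
        }


def batch_fully_published(response: dict) -> bool:
    """True when the response reports every event in the batch as published."""
    return (response["failureCount"] == 0
            and response["successCount"] == response["totalEvents"])


events = [{"key": f"user-{i}", "value": {"eventType": "login"}} for i in range(5)]
batches = list(build_batches(events, max_batch_size=2))  # batches of 2, 2, and 1 events
```

How individual failures appear in `results` is not documented here, so the check relies only on the aggregate counters.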

Stream Consumer API

APIs for consuming events from streaming platforms.

GET /api/v1/streams/{streamName}/events

Consume events from a stream with optional filtering.

Query Parameters

Parameter      Type     Description
consumerGroup  string   Required. Consumer group identifier.
offset         string   Optional. Starting offset: earliest, latest, or a numeric offset.
limit          integer  Optional. Maximum number of events to return (default: 100).
timeout        integer  Optional. Poll timeout in milliseconds (default: 1000).

Response

{
  "events": [
    {
      "eventId": "evt_001",
      "partition": 0,
      "offset": 1234,
      "timestamp": "2025-01-15T10:30:00Z",
      "key": "user-123",
      "value": {
        "eventType": "user-action",
        "data": { "action": "click" }
      }
    }
  ],
  "metadata": {
    "consumerGroup": "analytics-group",
    "lag": 0,
    "nextOffset": 1235
  }
}
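
The query parameters and the `nextOffset` field suggest a simple polling pattern, sketched below with the standard library. The host is hypothetical, and passing `nextOffset` back as the `offset` parameter is an assumption about how the two fields relate; this reference does not say how `nextOffset` behaves when a stream has several partitions.

```python
from typing import Optional, Union
from urllib.parse import quote, urlencode

BASE_URL = "https://streams.example.com"  # hypothetical host, not from this reference


def build_consume_url(stream_name: str, consumer_group: str,
                      offset: Optional[Union[str, int]] = None,
                      limit: int = 100, timeout_ms: int = 1000) -> str:
    """Build the GET URL for one poll; offset is omitted unless given."""
    params = {"consumerGroup": consumer_group, "limit": limit, "timeout": timeout_ms}
    if offset is not None:
        params["offset"] = offset  # "earliest", "latest", or a numeric offset
    path = f"/api/v1/streams/{quote(stream_name, safe='')}/events"
    return f"{BASE_URL}{path}?{urlencode(params)}"


first_url = build_consume_url("user-events", "analytics-group", offset="earliest")

# A response shaped like the sample above (trimmed to the fields used here).
sample_response = {
    "events": [{"eventId": "evt_001", "partition": 0, "offset": 1234}],
    "metadata": {"consumerGroup": "analytics-group", "lag": 0, "nextOffset": 1235},
}

# Assumed usage: resume the next poll from the offset the service reported.
next_url = build_consume_url(
    "user-events", "analytics-group",
    offset=sample_response["metadata"]["nextOffset"],
)
```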

POST /api/v1/consumers/{consumerGroup}/commit

Manually commit offsets for a consumer group.

Request Body

{
  "offsets": [
    {
      "stream": "user-events",
      "partition": 0,
      "offset": 1234
    },
    {
      "stream": "user-events",
      "partition": 1,
      "offset": 5678
    }
  ]
}

Response

{
  "status": "committed",
  "committedOffsets": [
    { "stream": "user-events", "partition": 0, "offset": 1234 },
    { "stream": "user-events", "partition": 1, "offset": 5678 }
  ]
}
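
A commit body can be derived from the events a consumer has finished processing. The sketch below keeps the highest offset seen per partition; whether the service expects the last processed offset or the next offset to read is not stated in this reference, so treat that choice as an assumption. `build_commit_body` is a hypothetical helper name.

```python
def build_commit_body(stream: str, events: list[dict]) -> dict:
    """Build a commit request body from processed events.

    Each event needs "partition" and "offset" fields, as in the
    consume response. The highest offset per partition is kept.
    """
    highest: dict[int, int] = {}
    for event in events:
        partition = event["partition"]
        highest[partition] = max(highest.get(partition, -1), event["offset"])
    return {
        "offsets": [
            {"stream": stream, "partition": partition, "offset": offset}
            for partition, offset in sorted(highest.items())
        ]
    }


processed = [
    {"partition": 0, "offset": 1233},
    {"partition": 1, "offset": 5678},
    {"partition": 0, "offset": 1234},
]
commit_body = build_commit_body("user-events", processed)
```

Committing only after processing succeeds means a crash leads to reprocessing rather than data loss, at the cost of possible duplicates.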

Stream Management API

APIs for managing streams and topics.

POST /api/v1/streams

Create a new stream.

Request Body

{
  "name": "user-events",
  "partitions": 10,
  "replicationFactor": 3,
  "config": {
    "retentionMs": 604800000,
    "compressionType": "gzip",
    "maxMessageBytes": 1048576
  }
}
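
The raw numbers in this body are easier to review when derived from readable units: 604800000 ms is 7 days and 1048576 bytes is 1 MiB. A small sketch, with `build_create_stream_body` as a hypothetical helper name:

```python
from datetime import timedelta


def build_create_stream_body(name: str, partitions: int, replication_factor: int,
                             retention: timedelta, max_message_bytes: int,
                             compression_type: str = "gzip") -> dict:
    """Build the create-stream request body from human-readable inputs."""
    return {
        "name": name,
        "partitions": partitions,
        "replicationFactor": replication_factor,
        "config": {
            # Integer division of timedeltas gives an exact millisecond count.
            "retentionMs": retention // timedelta(milliseconds=1),
            "compressionType": compression_type,
            "maxMessageBytes": max_message_bytes,
        },
    }


create_body = build_create_stream_body(
    name="user-events",
    partitions=10,
    replication_factor=3,
    retention=timedelta(days=7),
    max_message_bytes=1024 * 1024,  # 1 MiB
)
```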

GET /api/v1/streams/{streamName}

Get stream metadata and statistics.

Response

{
  "name": "user-events",
  "partitions": 10,
  "replicationFactor": 3,
  "totalMessages": 1234567,
  "totalSize": 1048576000,
  "config": {
    "retentionMs": 604800000,
    "compressionType": "gzip"
  },
  "partitionDetails": [
    {
      "partition": 0,
      "leader": "broker-1",
      "replicas": ["broker-1", "broker-2", "broker-3"],
      "startOffset": 0,
      "endOffset": 123456
    }
  ]
}
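
The metadata can be summarised client-side, for example to see how offsets are spread across partitions. This sketch assumes `totalSize` is in bytes, which the reference does not state, and reports the span between `startOffset` and `endOffset` without assuming whether `endOffset` is inclusive. Both helper names are hypothetical.

```python
def partition_offset_spans(metadata: dict) -> dict[int, int]:
    """Map each partition number to endOffset - startOffset."""
    return {
        detail["partition"]: detail["endOffset"] - detail["startOffset"]
        for detail in metadata["partitionDetails"]
    }


def total_size_mib(metadata: dict) -> float:
    """Convert totalSize to MiB, assuming the field is a byte count."""
    return metadata["totalSize"] / (1024 * 1024)


# Trimmed copy of the sample response above.
metadata = {
    "name": "user-events",
    "totalSize": 1048576000,
    "partitionDetails": [
        {"partition": 0, "leader": "broker-1", "startOffset": 0, "endOffset": 123456},
    ],
}

spans = partition_offset_spans(metadata)
size_mib = total_size_mib(metadata)
```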

DELETE /api/v1/streams/{streamName}

Delete a stream and all its data.

Response

{
  "status": "deleted",
  "streamName": "user-events",
  "deletedAt": "2025-01-15T10:30:00Z"
}

Stream Processing API

APIs for stream processing operations.

POST /api/v1/processors

Create a stream processing pipeline.

Request Body

{
  "name": "fraud-detection",
  "source": "transactions",
  "operations": [
    {
      "type": "filter",
      "predicate": "value.amount > 1000"
    },
    {
      "type": "map",
      "function": "enrichWithUserData"
    },
    {
      "type": "window",
      "windowType": "tumbling",
      "duration": 300000
    },
    {
      "type": "aggregate",
      "aggregation": "count",
      "groupBy": "userId"
    }
  ],
  "sink": "fraud-alerts",
  "config": {
    "parallelism": 4,
    "checkpointInterval": 60000
  }
}

Response

{
  "processorId": "proc_abc123",
  "status": "running",
  "createdAt": "2025-01-15T10:30:00Z",
  "metrics": {
    "eventsProcessed": 0,
    "throughput": 0,
    "lag": 0
  }
}
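
To make the four operations in the request concrete, here is a local, in-memory sketch of what such a pipeline computes: filter on amount, map, assign to 5-minute tumbling windows, and count per user. It is not the service's implementation. The event shape (`timestampMs`, `value.userId`, `value.amount`) and the body of `enrich_with_user_data` are assumptions, since the reference only names the `enrichWithUserData` function.

```python
from collections import Counter

WINDOW_MS = 300_000  # "duration" of the tumbling window in the request above


def enrich_with_user_data(event: dict) -> dict:
    """Stand-in for "enrichWithUserData"; its real behaviour is not documented."""
    return {**event, "enriched": True}


def run_pipeline(events: list[dict]) -> dict[tuple[int, str], int]:
    """Count events with value.amount > 1000 per (window start, userId)."""
    counts: Counter = Counter()
    for event in events:
        if not event["value"]["amount"] > 1000:  # filter
            continue
        event = enrich_with_user_data(event)  # map
        # Tumbling windows are fixed and non-overlapping, so each timestamp
        # falls into exactly one window, identified by its start time.
        window_start = event["timestampMs"] - event["timestampMs"] % WINDOW_MS
        counts[(window_start, event["value"]["userId"])] += 1  # aggregate
    return dict(counts)


transactions = [
    {"timestampMs": 0, "value": {"userId": "u1", "amount": 1500}},
    {"timestampMs": 120_000, "value": {"userId": "u1", "amount": 2500}},
    {"timestampMs": 200_000, "value": {"userId": "u2", "amount": 50}},
    {"timestampMs": 310_000, "value": {"userId": "u1", "amount": 1200}},
]
window_counts = run_pipeline(transactions)
# window_counts == {(0, "u1"): 2, (300000, "u1"): 1}
```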

GET /api/v1/processors/{processorId}/metrics

Get real-time metrics for a stream processor.

Response

{
  "processorId": "proc_abc123",
  "status": "running",
  "metrics": {
    "eventsProcessed": 1234567,
    "throughput": 5000,
    "avgLatency": 45,
    "lag": 100,
    "errorRate": 0.001
  },
  "windows": [
    {
      "windowStart": "2025-01-15T10:00:00Z",
      "windowEnd": "2025-01-15T10:05:00Z",
      "eventsCount": 5000
    }
  ]
}
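
A monitoring script could turn this response into alerts. The thresholds below are arbitrary placeholders, and `processor_alerts` is a hypothetical helper; the units of `throughput`, `avgLatency`, and `lag` are not given in this reference, so the sketch compares raw values only.

```python
def processor_alerts(response: dict, max_lag: int = 1000,
                     max_error_rate: float = 0.01) -> list[str]:
    """Return human-readable alerts for a processor metrics response."""
    alerts = []
    if response["status"] != "running":
        alerts.append(f"status is {response['status']}")
    metrics = response["metrics"]
    if metrics["lag"] > max_lag:
        alerts.append(f"lag {metrics['lag']} exceeds {max_lag}")
    if metrics["errorRate"] > max_error_rate:
        alerts.append(f"errorRate {metrics['errorRate']} exceeds {max_error_rate}")
    return alerts


# Trimmed copy of the sample response above.
sample = {
    "processorId": "proc_abc123",
    "status": "running",
    "metrics": {"eventsProcessed": 1234567, "throughput": 5000,
                "avgLatency": 45, "lag": 100, "errorRate": 0.001},
}

default_alerts = processor_alerts(sample)             # no thresholds crossed
strict_alerts = processor_alerts(sample, max_lag=50)  # lag threshold crossed
```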

Error Codes

Code  Message                   Description
1001  STREAM_NOT_FOUND          The specified stream does not exist
1002  INVALID_OFFSET            The specified offset is invalid or out of range
1003  CONSUMER_GROUP_NOT_FOUND  The specified consumer group does not exist
1004  SERIALIZATION_ERROR       Failed to serialize/deserialize event data
1005  QUOTA_EXCEEDED            Request rate or throughput quota exceeded
1006  TIMEOUT                   Operation timed out
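
Client code can mirror these codes in an enum and decide which errors are worth retrying. Treating `QUOTA_EXCEEDED` and `TIMEOUT` as retryable is an assumption of this sketch, not something the reference states, and the shape of an error response is not documented here, so the helper works on the numeric code alone.

```python
from enum import IntEnum


class StreamErrorCode(IntEnum):
    STREAM_NOT_FOUND = 1001
    INVALID_OFFSET = 1002
    CONSUMER_GROUP_NOT_FOUND = 1003
    SERIALIZATION_ERROR = 1004
    QUOTA_EXCEEDED = 1005
    TIMEOUT = 1006


# Assumed policy: only transient conditions are retried.
RETRYABLE = frozenset({StreamErrorCode.QUOTA_EXCEEDED, StreamErrorCode.TIMEOUT})


def is_retryable(code: int) -> bool:
    """True if the code is known and considered transient by this sketch."""
    try:
        return StreamErrorCode(code) in RETRYABLE
    except ValueError:
        return False  # unknown codes are not retried
```

For example, `is_retryable(1005)` is true while `is_retryable(1001)` is false; retrying a missing stream would only repeat the same failure.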