
Kafka Streaming

Stream conversation events from tapes to Kafka for downstream processing.

When to Use Kafka

tapes stores conversations locally. Kafka publishing adds a parallel event stream for external systems. Publishing is best-effort: a publish failure never blocks or fails a storage write.

Use Kafka when you need:

  • Real-time event streaming: feed conversation data to analytics pipelines
  • Event-driven architectures: trigger workflows from conversation events
  • Data warehousing: route events to ClickHouse, Snowflake, or other sinks

Publishing is best-effort

If Kafka is unreachable, tapes continues storing conversations normally. No data is lost from storage — only the event stream is interrupted.
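The best-effort behavior described above can be pictured with a small sketch. This is not tapes' actual implementation; the function name store_then_publish and the store and publish callables are hypothetical stand-ins for the database write and the Kafka producer. It only shows the ordering: store first, then try to publish, and swallow publish errors.

```python
import logging
from typing import Callable

logger = logging.getLogger("best_effort_demo")


def store_then_publish(
    event: dict,
    store: Callable[[dict], None],
    publish: Callable[[dict], None],
) -> bool:
    """Store the event, then attempt to publish it.

    Returns True if the publish succeeded and False if it failed.
    A storage error still propagates, because storage is the source of truth.
    """
    store(event)  # errors here are raised to the caller
    try:
        publish(event)
    except Exception as exc:  # e.g. broker unreachable or a timeout
        logger.warning("publish failed, event is still stored: %s", exc)
        return False
    return True


# Demo with in-memory stand-ins for the database and the broker.
stored: list[dict] = []


def unreachable_broker(event: dict) -> None:
    """Hypothetical publisher that always fails, simulating a Kafka outage."""
    raise ConnectionError("broker unreachable")


published = store_then_publish({"hash": "abc"}, stored.append, unreachable_broker)
```

In this demo the event ends up in stored even though the publish step raised, which mirrors the guarantee that only the event stream is interrupted.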

Prerequisites

Start a Kafka broker:

docker run --rm --name kafka -p 9092:9092 \
  -e KAFKA_ADVERTISED_LISTENERS='PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:9092' \
  confluentinc/confluent-local:latest

Create a topic:

docker exec kafka kafka-topics \
  --bootstrap-server localhost:29092 \
  --create --if-not-exists \
  --topic tapes.nodes.v1 \
  --partitions 3 \
  --replication-factor 1

Quick Start

Pass Kafka flags to the proxy server:

tapes serve proxy \
  --sqlite ./tapes.sqlite \
  --kafka-brokers localhost:9092 \
  --kafka-topic tapes.nodes.v1

Events are published after each conversation turn is stored. All events in a conversation share the same partition key (root_hash).
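With key-based partitioning, records that share a key are routed to the same partition, and Kafka preserves order within a partition. The sketch below illustrates why a shared root_hash keeps a conversation together. It uses CRC32 as a stand-in hash, which is an assumption for illustration only: real Kafka clients use their own hash functions (murmur2 in the Java client), so actual partition numbers will differ. The conv-a and conv-b keys are made-up placeholders for real root hashes.

```python
import zlib

NUM_PARTITIONS = 3  # matches --partitions 3 used when creating the topic


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a key to a partition index using a stand-in hash (CRC32).

    This is NOT the hash a Kafka client uses; it only demonstrates that
    a deterministic hash sends equal keys to the same partition.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Hypothetical events from two conversations, interleaved in time.
events = [
    {"root_hash": "conv-a", "turn": 1},
    {"root_hash": "conv-b", "turn": 1},
    {"root_hash": "conv-a", "turn": 2},
    {"root_hash": "conv-b", "turn": 2},
]

# Collect the set of partitions each conversation would be written to.
assignments: dict[str, set[int]] = {}
for event in events:
    key = event["root_hash"]
    assignments.setdefault(key, set()).add(partition_for(key))
```

Each conversation maps to exactly one partition, so a consumer reading that partition sees its turns in publish order. Note that changing the partition count of an existing topic changes the mapping for future events.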

CLI Flags

Flag               Description
--kafka-brokers    Comma-separated broker addresses (e.g., localhost:9092). Required for Kafka publishing.
--kafka-topic      Topic name for conversation events. Required for Kafka publishing.
--kafka-client-id  Optional client identifier sent to the broker.

Kafka flags are only available on tapes serve proxy, not the combined tapes serve command.

Event Schema

Each published event follows the tapes.node.v1 schema:

{
  "schema": "tapes.node.v1",
  "root_hash": "339de8b9a547...efd73f",
  "occurred_at": "2026-03-03T16:54:29.217197-07:00",
  "node": {
    "hash": "339de8b9a547...efd73f",
    "parent_hash": null,
    "bucket": {
      "type": "message",
      "role": "user",
      "content": [{"type": "text", "text": "Hello"}],
      "model": "gemma3:latest",
      "provider": "ollama"
    },
    "project": "my-app"
  }
}

  • schema: Event version identifier (tapes.node.v1)
  • root_hash: Conversation root hash, used as the Kafka partition key
  • occurred_at: Timestamp when the event was published
  • node: The Merkle DAG node payload (same structure as stored nodes)
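A downstream consumer will usually decode each event and check the schema field before using it. The following is a minimal sketch using only the standard library. The parse_event helper and the abc123 hash are hypothetical, and treating a null parent_hash as "first node of the conversation" is an inference from the example above rather than documented behavior.

```python
import json
from datetime import datetime

EXPECTED_SCHEMA = "tapes.node.v1"


def parse_event(raw: str) -> dict:
    """Decode one event and return a flat summary of selected fields."""
    event = json.loads(raw)
    if event.get("schema") != EXPECTED_SCHEMA:
        raise ValueError(f"unexpected schema: {event.get('schema')!r}")
    node = event["node"]
    bucket = node["bucket"]
    # Join the text parts of the content list; other part types are skipped.
    text = "".join(
        part["text"] for part in bucket["content"] if part.get("type") == "text"
    )
    return {
        "root_hash": event["root_hash"],
        # occurred_at carries a UTC offset, so the result is timezone-aware.
        "occurred_at": datetime.fromisoformat(event["occurred_at"]),
        # Assumption: a null parent_hash marks the root node.
        "is_root": node["parent_hash"] is None,
        "role": bucket["role"],
        "text": text,
    }


# Sample event shaped like the one above, with a made-up hash.
sample = json.dumps({
    "schema": "tapes.node.v1",
    "root_hash": "abc123",
    "occurred_at": "2026-03-03T16:54:29.217197-07:00",
    "node": {
        "hash": "abc123",
        "parent_hash": None,
        "bucket": {
            "type": "message",
            "role": "user",
            "content": [{"type": "text", "text": "Hello"}],
            "model": "gemma3:latest",
            "provider": "ollama",
        },
        "project": "my-app",
    },
})
summary = parse_event(sample)
```

Rejecting unknown schema values up front keeps a consumer from silently misreading events if a later schema version changes the payload.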

Verify Events

Consume events from your topic to confirm publishing works:

docker exec kafka kafka-console-consumer \
  --bootstrap-server localhost:29092 \
  --topic tapes.nodes.v1 \
  --from-beginning

If you have kaf installed, you can also run kaf consume --brokers localhost:9092 tapes.nodes.v1.
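For a quick sanity check beyond eyeballing the output, the consumer's output can be piped into a short script that counts events per conversation. The sketch below assumes each event arrives as one compact JSON object per line, which is how the console consumer prints message values by default but is not confirmed here for tapes' serialization. The count_by_conversation helper and the conv-a and conv-b keys are hypothetical.

```python
import json
from collections import Counter
from typing import Iterable


def count_by_conversation(lines: Iterable[str]) -> Counter:
    """Count events per root_hash from line-delimited JSON."""
    counts: Counter = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip anything that is not a JSON event
        if isinstance(event, dict) and "root_hash" in event:
            counts[event["root_hash"]] += 1
    return counts


# Demo with made-up lines standing in for consumer output.
demo_lines = [
    '{"schema": "tapes.node.v1", "root_hash": "conv-a"}',
    '{"schema": "tapes.node.v1", "root_hash": "conv-b"}',
    "not json",
    '{"schema": "tapes.node.v1", "root_hash": "conv-a"}',
]
counts = count_by_conversation(demo_lines)
```

To use it on live output, pass sys.stdin instead of demo_lines and pipe the consumer command into the script.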

Config File

Save Kafka settings in .tapes/config.toml:

[publisher.kafka]
brokers = "localhost:9092"
topic = "tapes.nodes.v1"
client_id = "my-app"

CLI flags override config file values.

What's Next

A dedicated guide to downstream processing with Apache Flink (routing events to ClickHouse, Snowflake, or other sinks) is coming soon.
