Skip to content

gabrielmbmb/kaflow

Repository files navigation

kaflow

Kafka streams topic processing in Python.

Test


kaflow is a simple framework that allows you to build Kafka streams processing aplications in Python with ease.

Some of the features offered by kaflow:

  • Dependency Injection system inspired by FastAPI and xpresso, and backed by di.
  • Automatic deserialization of incoming messages and serialization of outgoing messages. Supports popular formats like JSON, Avro or Protobuf.
  • Message validation thanks to pydantic.

Requirements

Python 3.8+

Installation

pip install kaflow

Example

from kaflow import (
    FromHeader,
    FromKey,
    FromValue,
    Json,
    Kaflow,
    Message,
    MessageOffset,
    MessagePartition,
    MessageTimestamp,
    String,
)
from pydantic import BaseModel


class UserClick(BaseModel):
    user_id: int
    url: str
    timestamp: int


class Key(BaseModel):
    environment: str


app = Kaflow(name="AwesomeKakfaApp", brokers="localhost:9092")


@app.consume(topic="user_clicks", sink_topics=["user_clicks_json"])
async def consume_user_clicks(
    message: FromValue[Json[UserClick]],
    key: FromKey[Json[Key]],
    x_correlation_id: FromHeader[String[str]],
    x_request_id: FromHeader[String[str]],
    partition: MessagePartition,
    offset: MessageOffset,
    timestamp: MessageTimestamp,
) -> Message:
    # Do something with the message
    ...

    # Publish to another topic
    return Message(value=b'{"user_clicked": "true"}')


app.run()