Skip to content

iamazy/kafkas

Folders and files

NameName
Last commit message
Last commit date

Latest commit

e650773 · Mar 22, 2024

History

11 Commits
Mar 21, 2024
Feb 24, 2023
Mar 22, 2024
Mar 22, 2024
Dec 10, 2022
Dec 10, 2022
Feb 24, 2023
Mar 21, 2024
Dec 10, 2022
Feb 24, 2023
Mar 21, 2024
Mar 21, 2024

Repository files navigation

kafkas

Async Kafka client in pure Rust.

Features

  • Multiple async runtimes (tokio, async-std, etc.)
  • All versions of Kafka are supported
  • Compression (gzip, snappy, lz4)

APIs

  • Producer
  • Consumer
  • Streams
  • Connect
  • Admin client

Usage

[dependencies]
kafkas = { git = "https://github.com/iamazy/kafkas", branch = "main" }

To get started using kafkas:

  • Producer
// Producer example: sends records to the "kafka" topic and reports delivery
// failures from a background task.
// NOTE(review): `Error` is presumably the crate's own error type — confirm the import.
#[tokio::main]
async fn main() -> Result<(), Box<Error>> {
    let client = Kafka::new("127.0.0.1:9092", KafkaOptions::default(), TokioExecutor).await?;

    let producer = Producer::new(client, ProducerOptions::default()).await?;

    // `producer.send` yields a future per record; those futures are handed to a
    // background task over this channel so the send loop never waits on them.
    let (mut tx, mut rx) = futures::channel::mpsc::unbounded();
    tokio::task::spawn(Box::pin(async move {
        while let Some(fut) = rx.next().await {
            // Await each pending send and log any failure.
            if let Err(e) = fut.await {
                error!("{e}");
            }
        }
    }));

    let topic = topic_name("kafka");
    for _ in 0..100_000_000 {
        let record = TestData::new("hello kafka");
        let ret = producer.send(&topic, record).await?;
        // Best effort: a failure to enqueue the future is deliberately ignored.
        let _ = tx.send(ret).await;
    }

    // `main` returns a `Result`, so the success path must end with `Ok(())`.
    Ok(())
}
  • Consumer
// Consumer example: subscribes to the "kafka" topic, prints every record value
// with its offset, and commits manually after each batch.
// NOTE(review): `Error` is presumably the crate's own error type — confirm the import.
#[tokio::main]
async fn main() -> Result<(), Box<Error>> {
    let client = Kafka::new("127.0.0.1:9092", KafkaOptions::default(), TokioExecutor).await?;

    // Auto-commit is switched off, so offsets are committed explicitly below.
    let mut consumer_options = ConsumerOptions::new("default");
    consumer_options.auto_commit_enabled = false;

    let mut consumer = Consumer::new(client, consumer_options).await?;

    let consume_stream = consumer.subscribe::<&str, ConsumerRecord>(vec!["kafka"]).await?;
    pin_mut!(consume_stream);

    while let Some(records) = consume_stream.next().await {
        for record in records {
            // Records without a value are skipped.
            if let Some(value) = record.value {
                println!("{:?} - {}", String::from_utf8(value.to_vec())?, record.offset);
            }
        }
        // needed only when `auto_commit_enabled` is false
        consumer.commit_async().await?;
    }

    // `main` returns a `Result`, so the success path must end with `Ok(())`.
    Ok(())
}

Examples

Examples can be found in the `examples` directory.

Flame graph

flamegraph

Rust version requirements

The Rust version used for kafkas development is 1.65.

Acknowledgments

Releases

No releases published

Packages

No packages published

Languages