
kafka-beginners-course

Introduction to the concepts related to the use of Kafka

Kafka is a distributed system with a resilient, fault-tolerant architecture, capable of handling a high throughput of messages per second.

Use Cases

  • Messaging system
  • Activity tracking
  • Gathering metrics from many different locations
  • Application logs
  • Stream processing
  • Decoupling of system dependencies and microservices (Spark, Flink, Hadoop)
  • etc...

Topic

  • A particular stream of data; it's like a table in a database (without all the constraints)
  • You can have as many topics as you want
  • A topic is identified by its name
  • Kafka topics are immutable : once data is written to a partition, it cannot be changed. You can keep writing to the partition, but you cannot update or delete existing records
  • Data is kept only for a limited time (default is one week - configurable)
  • Order is guaranteed only within a partition (not across partitions)
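For illustration, a minimal sketch of creating such a topic programmatically with the Kafka AdminClient (the broker address localhost:9092 and the topic name demo_topic are assumptions, not part of the course code):

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    import java.util.List;
    import java.util.Properties;

    public class CreateTopicDemo {
        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();
            // assumption: a broker reachable on localhost:9092 (see the Docker setup below)
            properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(properties)) {
                // hypothetical topic: 3 partitions, replication factor 1
                NewTopic topic = new NewTopic("demo_topic", 3, (short) 1);
                admin.createTopics(List.of(topic)).all().get();
            }
        }
    }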

Producers

  • Producers write data to topics (which are made of partitions)
  • In case of kafka broker failures, producers will automatically recover
  • Producers can choose to send a key with the message (string, number, binary, etc.)
  • If key = null, data is sent round robin (partition 0, then 1, then 2, ...)
  • If key != null, then all messages for that key will always go to the same partition (hashing)
  • A key is typically sent if you need message ordering for a specific field
  • In the default Kafka partitioner, keys are hashed using the murmur2 algorithm, with the formula below for the curious:
    targetPartition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
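A minimal producer sketch (assuming a local broker and the hypothetical topic demo_topic); sending the same key always routes messages to the same partition:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class ProducerDemo {
        public static void main(String[] args) {
            Properties properties = new Properties();
            // assumption: broker on localhost:9092, topic demo_topic already exists
            properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
                // messages with the key "user_1" will always land on the same partition
                producer.send(new ProducerRecord<>("demo_topic", "user_1", "hello world"));
                producer.flush();
            }
        }
    }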

Consumers

  • Consumers read data from a topic (identified by name) - Kafka does not automatically push data to consumers; they have to request it (pull model)
  • Consumers automatically know which broker to read from
  • In case of broker failures, consumers know how to recover
  • Data is read in order from low to high offset within each partition
  • If a consumer dies, it will be able to read back from where it left off, thanks to the committed consumer offsets! (a minimal consumer sketch follows the Deserializer section below)

Deserializer

  • A deserializer indicates how to transform bytes into objects/data
  • They are used on the value and the key of the message
  • Common deserializers (JSON, Avro, protobuf, etc...)
  • The serialization/deserialization type must not change during a topic's lifecycle (create a new topic instead)
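Putting the consumer and deserializer notes together, a minimal consumer sketch (the topic demo_topic and group demo_group are assumptions):

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    public class ConsumerDemo {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // deserializers turn the raw bytes of the key and value back into Strings
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "demo_group");
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
                consumer.subscribe(List.of("demo_topic"));
                while (true) {
                    // pull model: the consumer asks the broker for data
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.key() + " : " + record.value() + " (offset " + record.offset() + ")");
                    }
                }
            }
        }
    }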

Brokers

  • A Kafka cluster is composed of multiple brokers (servers); they receive and send data
  • Each broker is identified with its ID (integer)
  • Each broker contains certain topic partitions
  • A client does not need to know every broker in the cluster; it only needs to know how to connect to one broker (a bootstrap broker), and it will automatically connect to the rest
  • Each broker knows about all brokers, topics and partitions (metadata)
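In practice this means a client configuration only needs one reachable address in bootstrap.servers; a sketch, assuming the local broker from the Docker setup below:

    Properties properties = new Properties();
    // one broker address is enough; the client discovers the rest of the cluster from metadata
    properties.setProperty("bootstrap.servers", "localhost:9092");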

Producer Acknowledgement

    Producers can choose to receive acknowledgement of data writes :
  • acks = 0 : Producer won't wait for acknowledgement (possible data loss)
  • acks = 1 : producer will wait for leader acknowledgement (limited data loss)
  • acks = all : leader + replicas acknowledgement (no data loss)
As a rule, for a replication factor of N, you can permanently lose up to N-1 brokers and still recover your data
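On the Java producer this is a single property (continuing the producer sketch above; the constant comes from ProducerConfig):

    // wait for the leader and all in-sync replicas to acknowledge each write
    properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");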

Producer Default Partition when key = null

  • Round Robin : for Kafka 2.3 and below
  • Sticky Partitioner : for Kafka 2.4 and above
  • The sticky partitioner improves producer performance, especially at high throughput, when the key is null

    Zookeeper

    • Zookeeper manages brokers (keeps a list of them)
    • Zookeeper helps in performing leader election for partitions
    • Zookeeper sends notifications to Kafka in case of changes (e.g. new topic, broker dies, broker comes up, delete topics, etc.)
    • Zookeeper by design operates with an odd number of servers (1, 3, 5, 7)
    • Zookeeper has a leader (handles writes); the rest of the servers are followers (handle reads)

    Start Zookeeper and kafka with Docker

    docker-compose.yml
        version: '3'
    
        services:
          zookeeper:
            image: wurstmeister/zookeeper
            container_name: zookeeper
            ports:
              - "2181:2181"
          kafka:
            image: wurstmeister/kafka
            container_name: kafka
            ports:
              - "9092:9092"
            environment:
              KAFKA_ADVERTISED_HOST_NAME: localhost
              KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

    CLI commands

    Start Zookeeper and kafka server

    zookeeper-server-start.sh ~/kafka_<version>/config/zookeeper.properties
    kafka-server-start.sh ~/kafka_<version>/config/server.properties

    Topics

  • List of topics
  • kafka-topics.sh --bootstrap-server localhost:9092 --list
  • Create a topic
  • kafka-topics.sh --bootstrap-server localhost:9092 --create --topic [topic_name] \
    --partitions 3 --replication-factor 1
  • Describe a topic
  • kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic [topic_name]

    Producer

  • create a producer
  • kafka-console-producer.sh --bootstrap-server localhost:9092 --topic [topic_name] \
    --producer-property acks=all --property parse.key=true --property key.separator=:

    Consumer

  • create a consumer
  • kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [topic_name] --from-beginning
  • display keys, values and timestamps ([base] = the command above)
  • [base] --formatter kafka.tools.DefaultMessageFormatter \
    --property print.timestamp=true --property print.key=true \
    --property print.value=true --from-beginning

    Consumer Group

  • List of Consumer groups
  • kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
  • Describe
  • kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group [group_name]
  • Reset
  • [base] --group [group_name] --reset-offsets --execute --to-earliest \
    [--all-topics | --topic {topic_name}]
  • Shift offsets back by N per partition
  • [base] --group [group_name] --reset-offsets --execute --shift-by -N \
    [--all-topics | --topic {topic_name}]

    Properties

    retry.backoff.ms (default 100 ms) : how long to wait before retrying
    delivery.timeout.ms (default 120 000 ms) : records will fail if they can't be acknowledged within this time
    max.in.flight.requests.per.connection (default 5) : controls how many produce requests can be made in parallel; set it to 1 if you need to ensure ordering (may impact throughput)

    Since kafka 3.0, the producer is "safe" by default :

  • acks=all (-1)
  • enable.idempotence=true
  • retries=MAX_INT
  • max.in.flight.requests.per.connection=5
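    On older clients these can be set explicitly; a minimal sketch using the ProducerConfig constants:

        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");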

    Message Compression at the Broker / Topic level

    compression.type=producer (default) : the broker takes the compressed batch from the producer client and writes it directly to the topic's log file without recompressing the data
    compression.type=none : all batches are decompressed by the broker
    compression.type=lz4 (for example) :
    1. if it matches the producer's compression setting, data is stored on disk as is
    2. if the compression setting is different, batches are decompressed by the broker and then recompressed using the specified compression algorithm
    linger.ms (default 0) : how long to wait until we send a batch. Adding a small number, for example 5 ms, helps add more messages to the batch at the expense of latency.
    batch.size (default 16KB) : if a batch is filled before linger.ms, increase the batch size. Increasing the batch size to something like 32KB or 64KB can help increase the compression, throughput, and efficiency of requests.
    properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
    properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024));
    max.block.ms=60000 : the time .send() will block until throwing an exception

    Exceptions are thrown when :

  • The producer has filled up its buffer
  • The broker is not accepting any new data
  • 60 seconds have elapsed

    Delivery Semantics

    1. At most once : offsets are committed as soon as the message batch is received. If the processing goes wrong, the message will be lost (it won't be read again)
    2. At least once : offsets are committed after the message is processed. If the processing goes wrong, the message will be read again. This can result in duplicate processing of messages. Make sure your processing is idempotent.
    3. Exactly once : can be achieved for Kafka => Kafka workflows using the transactional API (easy with the Kafka Streams API). For Kafka => sink workflows, use an idempotent consumer.
    For most applications you should use at least once processing (we'll see in practice how to do it) and ensure your transformations / processing are idempotent.

    Consumer Offset Commit Strategies

    enable.auto.commit=true & synchronous processing of batches
        while (true) {
            ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
            doSomethingSynchronous(batch);   // with auto-commit, offsets are committed regularly during poll() calls
        }
  • enable.auto.commit=false & synchronous processing of batches
        while (true) {
            ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
            if (isReady(batch)) {
                doSomethingSynchronous(batch);
                consumer.commitAsync();   // commit offsets manually once the batch has been processed
            }
        }
    heartbeat.interval.ms (default 3 seconds) : how often to send heartbeats.
    Usually set to 1/3rd of session.timeout.ms
    session.timeout.ms (default 45 seconds for Kafka 3.0+) : heartbeats are sent periodically to the broker; if no heartbeat is sent during that period, the consumer is considered dead
    max.poll.interval.ms (default 5 minutes) : maximum amount of time between two .poll() calls before declaring the consumer dead.
    max.poll.records (default 500) : controls how many records to receive per poll request; increase it if your messages are very small and you have a lot of available RAM; lower it if it takes you too much time to process records.
    fetch.min.bytes (default 1) : controls how much data you want to pull at least on each request; helps improve throughput and decrease the number of requests, at the cost of latency.
    fetch.max.wait.ms (default 500) : the maximum amount of time the Kafka broker will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.
    This means that until the fetch.min.bytes requirement is satisfied, you will have up to 500 ms of latency before the fetch returns data to the consumer (i.e. introducing a potential delay to be more efficient in requests)
    max.partition.fetch.bytes (default 1MB) : The maximum amount of data per partition the server will return.
    If you read from 100 partitions, you'll need a lot of memory (RAM)
    fetch.max.bytes (default 55MB) : Maximum data returned for each fetch request.
    If you have available memory, try increasing fetch.max.bytes to allow the consumer to read more data in each request.
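    A small sketch of adjusting these on the consumer (the values are illustrative assumptions, not recommendations):

        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");   // more records per poll for small messages
        properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");    // wait for at least 1 KB of data per fetch
        properties.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");   // but never block longer than 500 ms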

    Partitions and Segments

    log.segment.bytes (default 1GB) : the max size of a single segment in bytes.
    log.segment.ms (default 1 week) : the time kafka will wait before committing the segment if not full

    Log Cleanup Policies

    Policy 1 : log.cleanup.policy=delete (Kafka default for all user topics)
  • Delete based on age of data (default is a week)
  • Delete based on max size of log (default is -1 == infinite)

    Policy 2 : log.cleanup.policy=compact (Kafka default for the __consumer_offsets topic)

    Log Compaction

    Log compaction (log.cleanup.policy=compact) is impacted by :
    segment.ms (default 7 days) : max amount of time to wait to close the active segment
    segment.bytes (default 1GB) : max size of a segment
    min.compaction.lag.ms (default 0) : how long to wait before a message can be compacted
    delete.retention.ms (default 24 hours) : how long to wait before deleting delete markers (tombstones)
    min.cleanable.dirty.ratio (default 0.5) : higher => less frequent but more efficient cleaning; lower => more frequent, less efficient cleaning

    Add Config

    kafka-configs --bootstrap-server localhost:9092 --entity-type [topics | users] \
    --entity-name [topic_name] --alter --add-config min.insync.replicas=2

    summarized by Josue Lubaki

    certification