- support asynchronous consume kafka message and ensure message not lost.
- support consumption speed-limiting.
- First you need implement the OffsetPersist interface which is defined in offset_persist.go, the create_table sql see example/create_table.sql.
type OffsetPersist interface {
Find(groupId, topic string, partition int) (int64, error)
Save(groupId, topic string, partition int, offset int64) error
}
- Then you need implement the message Handler which is defined in method_info.go#L30
type BizHandler func(msg *sarama.ConsumerMessage) error
- Create a props for dms consumer, there are several modes of props, async and sync, you also can specify how to commit offset, interval or quantitative by set CommitInterval or CommitSize.
- async: consume messages asynchronous
- sync: consume messages synchronous
- Create a dms consumer to consume kafka messages.
See details in package example.
- when using async mode, the pool size should be larger than topic*partition numbers.