Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for kafka headers #119

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 76 additions & 17 deletions src/kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -491,31 +491,60 @@ void
kafka_producer_produce(Producer p, Message msg)
{
char *buf = (char *) message_get_data(msg);
rd_kafka_headers_t *hdrs = (rd_kafka_headers_t *) message_get_headers(msg);
size_t len = message_get_len(msg);
rd_kafka_t *rk = ((Meta) p->meta)->rk;
rd_kafka_topic_t *rkt = ((Meta)p->meta)->rkt;
retry:
if (rd_kafka_produce(
rkt,
RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY,
buf, len,
NULL, 0,
if (hdrs) {
rd_kafka_headers_t *hdrs_copy = rd_kafka_headers_copy(hdrs);
if(rd_kafka_producev(
rk,
RD_KAFKA_V_RKT(rkt),
RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_VALUE(buf, len),
RD_KAFKA_V_HEADERS(hdrs_copy),
NULL) == -1)
{
if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL)
{
rd_kafka_poll(rk, 10*1000);
goto retry;
if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL)
{
rd_kafka_poll(rk, 10*1000);
goto retry;
}
else
{
logger_log(
"%s %d Failed to produce to topic %s: %s\n",
__FILE__, __LINE__,
rd_kafka_topic_name(rkt),
rd_kafka_err2str(rd_kafka_last_error())
);
}
}
else
} else {
if (rd_kafka_produce(
rkt,
RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY,
buf, len,
NULL, 0,
NULL) == -1)
{
logger_log(
"%s %d Failed to produce to topic %s: %s\n",
__FILE__, __LINE__,
rd_kafka_topic_name(rkt),
rd_kafka_err2str(rd_kafka_last_error())
);
if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL)
{
rd_kafka_poll(rk, 10*1000);
goto retry;
}
else
{
logger_log(
"%s %d Failed to produce to topic %s: %s\n",
__FILE__, __LINE__,
rd_kafka_topic_name(rkt),
rd_kafka_err2str(rd_kafka_last_error())
);
}
}
}
rd_kafka_poll(rk, 0);
Expand Down Expand Up @@ -595,6 +624,7 @@ kafka_simple_consumer_consume(Consumer c, Message msg)
{
rd_kafka_queue_t *rkqu = ((Meta) c->meta)->rkqu;
rd_kafka_message_t *rkmessage;
rd_kafka_headers_t *hdrs;

rkmessage = rd_kafka_consume_queue(rkqu, 10000);

Expand All @@ -621,6 +651,15 @@ kafka_simple_consumer_consume(Consumer c, Message msg)
return 0;
}

// Here we don't detach the headers so the memory gets cleared when
// the message is destroyed.
if (!rd_kafka_message_headers(rkmessage, &hdrs)) {
if (hdrs){
rd_kafka_headers_t *hdrs_copy = rd_kafka_headers_copy(hdrs);
message_set_headers(msg, hdrs_copy);
}
}

char *cpy = SCALLOC((int)rkmessage->len + 1, sizeof(*cpy));
memcpy(cpy, (char *)rkmessage->payload, (size_t)rkmessage->len);
message_set_data(msg, cpy);
Expand All @@ -635,6 +674,7 @@ kafka_transactional_consumer_consume(Consumer c, Message msg)
{
rd_kafka_t *rk = ((Meta) c->meta)->rk;
rd_kafka_message_t *rkmessage;
rd_kafka_headers_t *hdrs;

rkmessage = rd_kafka_consumer_poll(rk, 10000);

Expand All @@ -661,6 +701,15 @@ kafka_transactional_consumer_consume(Consumer c, Message msg)
return 0;
}

// Here we don't detach the headers so the memory gets cleared when
// the message is destroyed.
if (!rd_kafka_message_headers(rkmessage, &hdrs)) {
if (hdrs){
rd_kafka_headers_t *hdrs_copy = rd_kafka_headers_copy(hdrs);
message_set_headers(msg, hdrs_copy);
}
}

message_set_data(msg, rkmessage->payload);
message_set_len(msg, (size_t)rkmessage->len);

Expand Down Expand Up @@ -702,6 +751,7 @@ kafka_consumer_consume(Consumer c, Message msg)
{
rd_kafka_t *rk = ((Meta) c->meta)->rk;
rd_kafka_message_t *rkmessage;
rd_kafka_headers_t *hdrs;

rkmessage = rd_kafka_consumer_poll(rk, 10000);

Expand All @@ -728,6 +778,15 @@ kafka_consumer_consume(Consumer c, Message msg)
return 0;
}

// Here we don't detach the headers so the memory gets cleared when
// the message is destroyed.
if (!rd_kafka_message_headers(rkmessage, &hdrs)) {
if (hdrs){
rd_kafka_headers_t *hdrs_copy = rd_kafka_headers_copy(hdrs);
message_set_headers(msg, hdrs_copy);
}
}

char *cpy = SCALLOC((int)rkmessage->len + 1, sizeof(*cpy));
memcpy(cpy, (char *)rkmessage->payload, (size_t)rkmessage->len);
message_set_data(msg, cpy);
Expand Down
4 changes: 3 additions & 1 deletion src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ consume(void *config)
continue;

//todo: check result of queue_add()
queue_add(q, message_get_data(msg), message_get_len(msg),
queue_add(q, message_get_data(msg), message_get_len(msg), message_get_headers(msg),
message_get_xmark(msg),message_get_metadata(msg));
//give up ownership
message_set_data(msg, NULL);
Expand Down Expand Up @@ -201,7 +201,9 @@ produce(void *config)
metadata_callback_run(m,msg);
//message was handled: free it
free(message_get_data(msg));
free(message_get_headers(msg));
message_set_data(msg, NULL);
message_set_headers(msg, NULL);
metadata_free(message_get_metadata(msg));
}
}
Expand Down
21 changes: 20 additions & 1 deletion src/queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ typedef struct Message
{
void *data;
size_t datalen;
void *headers;
int64_t xmark;
Metadata metadata;
} *Message;
Expand Down Expand Up @@ -68,6 +69,22 @@ message_get_len(Message msg)
return msg->datalen;
}

void
message_set_headers(Message msg, void *headers)
{
if (msg)
msg->headers = headers;
}

void *
message_get_headers(Message msg)
{
if (msg == NULL){
return NULL;
}
return msg->headers;
}

int64_t
message_get_xmark(Message msg)
{
Expand Down Expand Up @@ -237,7 +254,7 @@ _xmark_find(Queue q, uint32_t mark)
}

int
queue_add(Queue q, void *data, size_t datalen, int64_t xmark, Metadata *md)
queue_add(Queue q, void *data, size_t datalen, void *headers, int64_t xmark, Metadata *md)
{
MessageList newmsg;
/* We can afford to allocate the message before
Expand All @@ -252,6 +269,7 @@ queue_add(Queue q, void *data, size_t datalen, int64_t xmark, Metadata *md)
}
newmsg->msg->datalen = datalen;
newmsg->msg->data = data;
newmsg->msg->headers = headers;
newmsg->msg->xmark = xmark;
newmsg->msg->metadata = *md;
newmsg->next = NULL;
Expand Down Expand Up @@ -414,6 +432,7 @@ queue_get(Queue q, Message msg)
msg->data = firstrec->msg->data;
msg->datalen = firstrec->msg->datalen;
msg->metadata = firstrec->msg->metadata;
msg->headers = firstrec->msg->headers;

/* this line can cause an unfinishable queue
* consumers do not need xmark anylonger
Expand Down
6 changes: 4 additions & 2 deletions src/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@ void message_set_metadata(Message msg, Metadata md);
void *message_get_data(Message msg);
void message_set_data(Message msg, void *data);
size_t message_get_len(Message msg);
void message_set_len(Message msg, size_t len);
void *message_get_headers(Message msg);
void message_set_headers(Message msg, void *data);
int64_t message_get_xmark(Message msg);
void message_set_xmark(Message msg, int64_t xmark);
void message_set_len(Message msg, size_t len);
void message_free(Message *msg);

typedef struct Queue *Queue;

Queue queue_init(config_setting_t *config);
int queue_add(Queue q, void *data, size_t datalen, int64_t msgtype, Metadata *md);
int queue_add(Queue q, void *data, size_t datalen, void *headers, int64_t msgtype, Metadata *md);
int queue_get(Queue q, Message msg);
long queue_length(Queue q);
long queue_added(Queue q);
Expand Down
4 changes: 2 additions & 2 deletions t/queue_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ main(void)
Metadata *md = message_get_metadata(msg);
message_set_xmark(msg,1);
char *data = "moep";
queue_add(q, data, strlen(data), 1, md);
queue_add(q, data, strlen(data), NULL, 1, md);

Message msg2 = message_init();
md = message_get_metadata(msg2);
message_set_xmark(msg2,65535);
char *data2 = "huuuurz";
queue_add(q, data2, strlen(data2), 65535, md);
queue_add(q, data2, strlen(data2), NULL, 65535, md);

queue_get(q, msg2);
queue_get(q, msg);
Expand Down