Skip to content

Commit ab6ce81

Browse files
committed
Fix piped producer error (Unable to read message: ..)
1 parent 96723aa commit ab6ce81

File tree

3 files changed

+23
-7
lines changed

3 files changed

+23
-7
lines changed

input.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ int inbuf_read_to_delimeter (struct inbuf *inbuf, FILE *fp,
328328
if (!inbuf->buf)
329329
return 0; /* Previous EOF encountered, see below. */
330330

331-
while (conf.run && 1) {
331+
while (conf.run) {
332332
ssize_t r;
333333
size_t dof;
334334
int delim_found;

kcat.c

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -416,12 +416,13 @@ static void producer_run (FILE *fp, char **paths, int pathcnt) {
416416
} else {
417417
struct inbuf inbuf;
418418
struct buf *b;
419+
int at_eof = 0;
419420

420421
inbuf_init(&inbuf, conf.msg_size, conf.delim, conf.delim_size);
421422

422423
/* Read messages from input, delimited by conf.delim */
423424
while (conf.run &&
424-
inbuf_read_to_delimeter(&inbuf, fp, &b)) {
425+
!(at_eof = !inbuf_read_to_delimeter(&inbuf, fp, &b))) {
425426
int msgflags = 0;
426427
char *buf = b->buf;
427428
char *key = NULL;
@@ -486,11 +487,9 @@ static void producer_run (FILE *fp, char **paths, int pathcnt) {
486487
conf.run = 0;
487488
}
488489

489-
if (conf.run) {
490-
if (!feof(fp))
491-
KC_FATAL("Unable to read message: %s",
492-
strerror(errno));
493-
}
490+
if (conf.run && !at_eof)
491+
KC_FATAL("Unable to read message: %s",
492+
strerror(errno));
494493
}
495494

496495
#if ENABLE_TXNS

tests/0004-piped_producer.sh

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#!/bin/bash
2+
#
3+
4+
set -e
5+
source helpers.sh
6+
7+
8+
#
9+
# Verify that piping messages to the producer works.
10+
#
11+
12+
13+
topic=$(make_topic_name)
14+
15+
16+
17+
echo "msg1" | $KCAT -P -t $topic

0 commit comments

Comments
 (0)