Skip to content

Commit

Permalink
traceucast: better handling of client disconnection
Browse files Browse the repository at this point in the history
Instead of exiting with SIGPIPE, traceucast will now attempt
to continually reconnect to the client if they disappear for
any reason.
  • Loading branch information
salcock committed Nov 24, 2023
1 parent 3698268 commit 2b3e302
Showing 1 changed file with 146 additions and 73 deletions.
219 changes: 146 additions & 73 deletions tools/tracemcast/traceucast.c
Original file line number Diff line number Diff line change
Expand Up @@ -255,54 +255,132 @@ static void *init_reader_thread(libtrace_t *trace,
return rdata;
}

static int send_ndag_packet(read_thread_data_t *rdata) {
static int connect_stream_fd(read_thread_data_t *rdata,
struct global_params *gparams) {

int s;
int fd;
uint8_t block;

if (rdata->livesource) {
block = 0;
} else {
block = 1;
}
fd = create_stream_socket(rdata->streamport,
gparams->clientaddr, &(rdata->target), block);

if (fd == 0) {
return 0;
}

if (fd == -1) {
if (errno != ECONNREFUSED) {
fprintf(stderr, "traceucast: failed to create TCP socket for reader thread %d\n", rdata->threadid);
return -1;
} else {
return 0;
}
} else if (rdata->target == NULL) {
fprintf(stderr, "traceucast: failed to get addrinfo for reader socket %d\n", rdata->threadid);
close(rdata->streamfd);
rdata->streamfd = -1;
return -1;
}
rdata->streamfd = fd;
return fd;
}

#define HANDLE_SEND_ERROR \
if (s < 0) { \
if ((errno == EAGAIN || errno == EWOULDBLOCK) && attempts < 20) { \
attempts ++; \
usleep(backoff); \
backoff = backoff * 2; \
if (backoff > 1000000) { \
backoff = 1000000; \
} \
continue; \
} \
fprintf(stderr, "traceucast: thread %d failed to send streamed ERF packet: %s\n", \
rdata->threadid, strerror(errno)); \
close(rdata->streamfd); \
rdata->streamfd = -1; \
usleep(200000); \
continue; \
}

static int send_ndag_packet(read_thread_data_t *rdata,
struct global_params *gparams) {

int s, r;
int rem = (rdata->writeptr - rdata->pbuffer);
int sentsofar = 0;
int ret = 0;
int attempts = 0;
int backoff = 5000;

int firstsend = 0;
int fs_amount = 0;

rdata->encaphdr->recordcount = ntohs(rdata->reccount);

while (rem > 0) {
s = send(rdata->streamfd, rdata->pbuffer + sentsofar, rem, MSG_DONTWAIT);

if (s < 0) {
if ((errno == EAGAIN || errno == EWOULDBLOCK) && attempts < 20) {
attempts ++;
usleep(backoff);
backoff = backoff * 2;
if (backoff > 1000000) {
backoff = 1000000;
if (rdata->streamfd == -1) {
if ((r = connect_stream_fd(rdata, gparams)) < 0) {
rdata->failed = 1;
trace_interrupt();
return -1;
}
if (r == 0) {
if (rdata->livesource) {
return 0;
}
sleep(1);
continue;
}
fprintf(stderr, "traceucast: thread %d failed to send streamed ERF packet: %s\n",
rdata->threadid, strerror(errno));
fprintf(stderr, "%u\n", rdata->seqno);
ret = -1;
break;
fprintf(stderr,
"traceucast: streaming thread %d established connection\n",
rdata->threadid);
}

sentsofar += s;
rem -= s;
if (firstsend == 0 && rem > 8) {
/* try to detect a broken pipe by attempting a "canary" send
* of 8 bytes so that the main send is more likely to trigger
* EPIPE
*/
s = send(rdata->streamfd, rdata->pbuffer, 8,
MSG_DONTWAIT | MSG_NOSIGNAL);
HANDLE_SEND_ERROR
fs_amount = s;

s = send(rdata->streamfd, rdata->pbuffer + fs_amount,
rem - fs_amount, MSG_DONTWAIT | MSG_NOSIGNAL);
HANDLE_SEND_ERROR
sentsofar += (s + fs_amount);
rem -= (s + fs_amount);
firstsend = 1;
} else {
s = send(rdata->streamfd, rdata->pbuffer + sentsofar,
rem, MSG_DONTWAIT | MSG_NOSIGNAL);
HANDLE_SEND_ERROR
sentsofar += s;
rem -= s;
}
}

rdata->writeptr = rdata->pbuffer;
rdata->encaphdr = NULL;
rdata->reccount = 0;
return ret;
return sentsofar;
}

static void halt_reader_thread(libtrace_t *trace UNUSED,
libtrace_thread_t *t UNUSED, void *global UNUSED, void *tls) {
libtrace_thread_t *t UNUSED, void *global, void *tls) {

read_thread_data_t *rdata = (read_thread_data_t *)tls;
struct global_params *gparams = (struct global_params *)global;

if (rdata->writeptr > rdata->pbuffer) {
send_ndag_packet(rdata);
send_ndag_packet(rdata, gparams);
}

if (rdata->pbuffer) {
Expand Down Expand Up @@ -354,15 +432,16 @@ static uint16_t construct_erf_header(read_thread_data_t *rdata,
}

static void tick_reader_thread(libtrace_t *trace UNUSED,
libtrace_thread_t *t UNUSED, void *global UNUSED, void *tls,
libtrace_thread_t *t UNUSED, void *global, void *tls,
uint64_t order) {

read_thread_data_t *rdata = (read_thread_data_t *)tls;
struct global_params *gparams = (struct global_params *)global;

if (rdata->writeptr > rdata->pbuffer &&
(order >> 32) >= rdata->lastsend + 3) {

if (send_ndag_packet(rdata) < 0) {
if (send_ndag_packet(rdata, gparams) < 0) {
rdata->failed = 1;
}
rdata->lastsend = (order >> 32);
Expand All @@ -379,6 +458,7 @@ static libtrace_packet_t *packet_reader_thread(libtrace_t *trace UNUSED,
uint32_t rem;
void *l2;
uint64_t erfts;
int r;

if (rdata->failed) {
trace_interrupt();
Expand All @@ -389,36 +469,6 @@ static libtrace_packet_t *packet_reader_thread(libtrace_t *trace UNUSED,
return packet;
}

if (rdata->streamfd == -1) {
int fd;
uint8_t block;

if (rdata->livesource) {
block = 0;
} else {
block = 1;
}
fd = create_stream_socket(rdata->streamport,
gparams->clientaddr, &(rdata->target), block);

if (fd == 0) {
return packet;
}
if (fd == -1) {
fprintf(stderr, "traceucast: failed to create TCP socket for reader thread %d\n", rdata->threadid);
trace_interrupt();
return packet;

} else if (rdata->target == NULL) {
fprintf(stderr, "traceucast: failed to get addrinfo for reader socket %d\n", rdata->threadid);
close(rdata->streamfd);
rdata->streamfd = -1;
trace_interrupt();
return packet;
}
rdata->streamfd = fd;
}

/* first, check if there is going to be space in the buffer for this
* packet + an ERF header */
l2 = trace_get_layer2(packet, &ltype, &rem);
Expand All @@ -432,9 +482,12 @@ static libtrace_packet_t *packet_reader_thread(libtrace_t *trace UNUSED,
*/
if (rdata->writeptr > rdata->pbuffer + sizeof(ndag_common_t) +
sizeof(ndag_encap_t)) {

if (send_ndag_packet(rdata) < 0) {
if ((r = send_ndag_packet(rdata, gparams)) < 0) {
rdata->failed = 1;
close(rdata->streamfd);
rdata->streamfd = -1;
return packet;
} else if (r == 0) {
return packet;
}
rdata->lastsend = (erfts >> 32);
Expand Down Expand Up @@ -471,10 +524,11 @@ static libtrace_packet_t *packet_reader_thread(libtrace_t *trace UNUSED,
/* if the buffer is close to full, just send the buffer anyway */
if (MAX_PACKET_SIZE - (rdata->writeptr - rdata->pbuffer) -
(dag_record_size + 2) < 64) {
if (send_ndag_packet(rdata) < 0) {
if ((r = send_ndag_packet(rdata, gparams)) < 0) {
rdata->failed = 1;
} else if (r != 0) {
rdata->lastsend = (erfts >> 32);
}
rdata->lastsend = (erfts >> 32);
}

return packet;
Expand Down Expand Up @@ -574,33 +628,52 @@ static uint32_t form_beacon(char **buffer, struct beacon_params *bparams) {
static void *beaconer_thread(void *tdata) {

struct beacon_params *bparams = (struct beacon_params *)tdata;
int sock;
int sock = -1;
char *beaconpacket = NULL;
uint32_t beaconsize;
struct addrinfo *targetinfo = NULL;

sock = create_stream_socket(bparams->beaconport,
bparams->gparams->clientaddr, &targetinfo, 1);

if (sock == -1) {
fprintf(stderr, "traceucast: failed to create TCP socket for beaconer thread\n");
halted = 1;
} else if (targetinfo == NULL) {
fprintf(stderr, "traceucast: failed to get addrinfo for beaconer socket\n");
halted = 1;
}

beaconsize = form_beacon(&beaconpacket, bparams);

if (beaconsize <= 0 || beaconpacket == NULL) {
halted = 1;
}

while (!halted) {
if (send(sock, beaconpacket, beaconsize, 0) != beaconsize) {
if (sock == -1) {
sock = create_stream_socket(bparams->beaconport,
bparams->gparams->clientaddr, &targetinfo, 1);
}

if (sock == 0) {
sleep(1);
continue;
}

if (sock == -1) {
if (errno != ECONNREFUSED) {
fprintf(stderr,
"traceucast: failed to create TCP socket for beacon thread\n");
halted = 1;
break;
} else {
sleep(1);
continue;
}
} else if (targetinfo == NULL) {
fprintf(stderr,
"traceucast: failed to get addrinfo for beaconer socket\n");
halted = 1;
break;
}

if (send(sock, beaconpacket, beaconsize, MSG_NOSIGNAL) != beaconsize) {
fprintf(stderr, "traceucast: failed to send a beacon packet: %s\n",
strerror(errno));
break;
close(sock);
sock = -1;
usleep(200000);
continue;
}
usleep(1000 * bparams->frequency);
}
Expand All @@ -611,7 +684,7 @@ static void *beaconer_thread(void *tdata) {
if (targetinfo) {
free(targetinfo);
}
if (sock >= 0) {
if (sock > 0) {
close(sock);
}

Expand Down

0 comments on commit 2b3e302

Please sign in to comment.