19
19
#include "network.h"
20
20
#include "logging.h"
21
21
22
+ #ifdef HAVE_ZMQ
22
23
#include <zmq.h>
24
+ #endif
23
25
24
26
#define VERSION_ID "$Rev$"
25
27
#define DEFAULT_TTL 16
@@ -41,18 +43,38 @@ char *ip = DEFAULT_HOST;
41
43
static void usage (char * name ) {
42
44
fprintf (
43
45
stderr ,
44
- "%s udp unicast/multicast tester .\n"
46
+ "%s udp unicast/multicast EDI-to-ETI coverter .\n"
45
47
"This is UDP unicast/multicast EDI-stream receiver that converts received stream to ETI-format %s.\n"
46
48
"Usage: %s [options] [address]:[port]\n"
47
- "-v, --verbose : Show more info (default: no)\n"
48
- "-q, --quiet : Minimum info (default: no)\n"
49
- "-I, --interval <value> : Application udp reception interval\n"
50
- "-o, --output <file|url> : Output file path or zeromq url (default: stdout)\n"
51
- "-a, --activity : Dislay activity cursor (default: no)\n"
52
- "-h, --help : Show this help\n"
49
+ "-v, --verbose : Show more info (default: no)\n"
50
+ "-q, --quiet : Minimum info (default: no)\n"
51
+ "-I, --interval <value> : Application udp reception time in seconds. Exit after that. (default:run forever)\n"
52
+ #ifdef HAVE_ZMQ
53
+ "-o, --output <file|zmq-url> : Output file path or zeromq url (default: stdout)\n"
54
+ "-L, --no-align : Disable eti-packets alignment for ZeroMQ packing\n"
55
+ #else
56
+ "-o, --output <file> : Output file path (default: stdout)\n"
57
+ #endif
58
+ "-a, --activity : Dislay activity cursor (default: no)\n"
59
+ "-h, --help : Show this help\n"
53
60
"\n" , name , VERSION_ID , name );
54
61
}
55
62
63
+ static void build_info (char * name ) {
64
+ fprintf (stderr , "ZeroMQ:%s, FEC:%s\n" ,
65
+ #ifdef HAVE_ZMQ
66
+ "enabled" ,
67
+ #else
68
+ "disabled" ,
69
+ #endif
70
+ #ifdef HAVE_FEC
71
+ "enabled"
72
+ #else
73
+ "disabled"
74
+ #endif
75
+ );
76
+ }
77
+
56
78
static void signal_handler (int signum ) {
57
79
// if (signum != SIGPIPE) {
58
80
if (signum != SIGPIPE ) {
@@ -80,6 +102,7 @@ void write_file(void *privData, void *etiData, int etiLen)
80
102
81
103
82
104
105
+ #ifdef HAVE_ZMQ
83
106
84
107
#define NUM_FRAMES_PER_ZMQ_MESSAGE 4
85
108
@@ -98,13 +121,17 @@ typedef struct {
98
121
int is_initial = 1 ;
99
122
zmq_dab_message_t zmq_msg ;
100
123
int zmq_msg_ix ;
124
+ int zmq_align = 1 ;
101
125
102
126
void write_zmq (void * privData , void * etiData , int etiLen )
103
127
{
104
- int i ;
105
- if (verbosity > 1 )
106
- msg_Log ("write:%u bytes to zmq part: %d" , etiLen , zmq_msg_ix );
107
- int offset = 0 ;
128
+ int i ;
129
+ int offset = 0 ;
130
+ uint8_t * etiPkt = (uint8_t * ) etiData ;
131
+
132
+ if (verbosity > 1 )
133
+ msg_Log ("write:%u bytes to zmq part: %d" , etiLen , zmq_msg_ix );
134
+
108
135
// Increment the offset by the accumulated frame offsets
109
136
for (i = 0 ; i < zmq_msg_ix ; i ++ ) {
110
137
offset += zmq_msg .buflen [i ];
@@ -115,8 +142,18 @@ void write_zmq(void *privData, void *etiData, int etiLen)
115
142
return ;
116
143
}
117
144
145
+ if (etiLen < 16 || (zmq_align && zmq_msg_ix != ((etiPkt [6 ] >> 5 ) & 0x03 ))) {
146
+ zmq_msg_ix = 0 ;
147
+ zmq_msg .buflen [0 ] = -1 ;
148
+ zmq_msg .buflen [1 ] = -1 ;
149
+ zmq_msg .buflen [2 ] = -1 ;
150
+ zmq_msg .buflen [3 ] = -1 ;
151
+ msg_Log ("ZMQ: skip non-aligned frames: %02x != %02x" , zmq_msg_ix , ((etiPkt [6 ] >> 5 ) & 0x03 ));
152
+ return ;
153
+ }
154
+
118
155
// Append the new frame to our message
119
- memcpy (zmq_msg .buf + offset , etiData , etiLen );
156
+ memcpy (zmq_msg .buf + offset , etiPkt , etiLen );
120
157
zmq_msg .buflen [zmq_msg_ix ] = etiLen ;
121
158
zmq_msg_ix ++ ;
122
159
@@ -142,7 +179,7 @@ void write_zmq(void *privData, void *etiData, int etiLen)
142
179
}
143
180
}
144
181
}
145
-
182
+ #endif
146
183
147
184
148
185
int main (int argc , char * * argv ) {
@@ -153,26 +190,30 @@ int main(int argc, char **argv) {
153
190
int port = DEFAULT_PORT ;
154
191
int activity = 0 ;
155
192
edi_handler_t * edi_p = NULL ;
193
+ #ifdef HAVE_ZMQ
156
194
void * zmq_context = NULL ;
157
195
void * zmq_publisher = NULL ;
196
+ #endif
158
197
159
198
/******************************************************
160
199
* Getopt
161
200
******************************************************/
162
- const char short_options [] = "vaqhI :o:" ;
201
+ const char short_options [] = "vLaqhI :o:" ;
163
202
const struct option long_options [] = {
164
203
{ "interval" , optional_argument , NULL , 'I' },
165
204
{ "verbose" , optional_argument , NULL , 'v' },
166
205
{ "quiet" , optional_argument , NULL , 'q' },
167
206
{ "output" , optional_argument , NULL , 'o' },
168
207
{ "activity" , optional_argument , NULL , 'a' },
208
+ { "no-align" , no_argument , NULL , 'L' },
169
209
{ "help" , no_argument , NULL , 'h' },
170
210
{ 0 , 0 , 0 , 0 }
171
211
};
172
212
int c , option_index = 0 ;
173
213
174
214
if (argc == 1 ) {
175
215
usage (argv [0 ]);
216
+ build_info (argv [0 ]);
176
217
exit (-1 );
177
218
}
178
219
@@ -192,7 +233,11 @@ int main(int argc, char **argv) {
192
233
case 'a' :
193
234
activity ++ ;
194
235
break ;
195
-
236
+ #ifdef HAVE_ZMQ
237
+ case 'L' :
238
+ zmq_align = 0 ;
239
+ break ;
240
+ #endif
196
241
case 'I' :
197
242
timeout = atoi (optarg );
198
243
break ;
@@ -203,19 +248,25 @@ int main(int argc, char **argv) {
203
248
204
249
case 'h' :
205
250
usage (argv [0 ]);
251
+ build_info (argv [0 ]);
206
252
exit (0 );
207
253
break ;
208
254
}
209
255
}
210
256
257
+ if (verbosity > 0 )
258
+ build_info (argv [0 ]);
259
+
211
260
if (argc <= optind ) {
212
261
if (verbosity > 0 )
213
262
msg_Log ("need to specify ip:port a:%d, o:%d" , argc , optind );
214
263
exit (-1 );
215
264
}
216
265
266
+
217
267
if (outpath ) {
218
268
if (strncmp ("zmq+" , outpath , 4 ) == 0 && strlen (outpath ) > 10 ) {
269
+ #ifdef HAVE_ZMQ
219
270
zmq_context = zmq_ctx_new ();
220
271
zmq_publisher = zmq_socket (zmq_context , ZMQ_PUB );
221
272
zmq_bind (zmq_publisher , outpath + 4 );
@@ -226,8 +277,13 @@ int main(int argc, char **argv) {
226
277
zmq_msg .buflen [3 ] = -1 ;
227
278
zmq_msg .version = 1 ;
228
279
zmq_msg_ix = 0 ;
280
+ #else
281
+ msg_Log ("ZEROMQ is disabled! Can't stream to specified destination: %s" , outpath );
282
+ exit (-1 );
283
+ #endif
229
284
230
- } else {
285
+ } else
286
+ {
231
287
out_fh = fopen (outpath , "wb" );
232
288
edi_p = initEDIHandle (ETI_FMT_RAW , write_file , out_fh );
233
289
}
@@ -322,10 +378,12 @@ int main(int argc, char **argv) {
322
378
sock = 0 ;
323
379
free (buff );
324
380
381
+ #ifdef HAVE_ZMQ
325
382
if (zmq_publisher )
326
383
zmq_close (zmq_publisher );
327
384
if (zmq_context )
328
385
zmq_term (zmq_context );
386
+ #endif
329
387
330
388
return 1 ;
331
389
}
0 commit comments