Skip to content

Commit 234018d

Browse files
committed
Add test demostrating that HWM applies to messages that have been already consumed.
1 parent 6e064f9 commit 234018d

File tree

1 file changed

+77
-0
lines changed

1 file changed

+77
-0
lines changed

tests/test_hwm_pubsub.cpp

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,81 @@ int test_blocking (int send_hwm, int msgCnt)
151151
return recv_count;
152152
}
153153

154+
// with hwm 11024: send 9999 msg, receive 9999, send 1100, receive 1100
155+
void test_reset_hwm ()
156+
{
157+
int first_count = 9999;
158+
int second_count = 1100;
159+
int hwm = 11024;
160+
161+
void *ctx = zmq_ctx_new ();
162+
assert (ctx);
163+
int rc;
164+
165+
// Set up bind socket
166+
void *pub_socket = zmq_socket (ctx, ZMQ_PUB);
167+
assert (pub_socket);
168+
rc = zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &hwm, sizeof (hwm));
169+
assert (rc == 0);
170+
rc = zmq_bind (pub_socket, "tcp://127.0.0.1:1234");
171+
assert (rc == 0);
172+
173+
// Set up connect socket
174+
void *sub_socket = zmq_socket (ctx, ZMQ_SUB);
175+
assert (sub_socket);
176+
rc = zmq_setsockopt (sub_socket, ZMQ_RCVHWM, &hwm, sizeof (hwm));
177+
assert (rc == 0);
178+
rc = zmq_connect (sub_socket, "tcp://127.0.0.1:1234");
179+
assert (rc == 0);
180+
rc = zmq_setsockopt( sub_socket, ZMQ_SUBSCRIBE, 0, 0);
181+
assert (rc == 0);
182+
183+
msleep (100);
184+
185+
// Send messages
186+
int send_count = 0;
187+
while (send_count < first_count && zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
188+
++send_count;
189+
assert (first_count == send_count);
190+
191+
msleep (100);
192+
193+
// Now receive all sent messages
194+
int recv_count = 0;
195+
while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT))
196+
{
197+
++recv_count;
198+
}
199+
assert (first_count == recv_count);
200+
201+
msleep (100);
202+
203+
// Send messages
204+
send_count = 0;
205+
while (send_count < second_count && zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
206+
++send_count;
207+
assert (second_count == send_count);
208+
209+
msleep (100);
210+
211+
// Now receive all sent messages
212+
recv_count = 0;
213+
while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT))
214+
{
215+
++recv_count;
216+
}
217+
assert (second_count == recv_count);
154218

219+
// Clean up
220+
rc = zmq_close (sub_socket);
221+
assert (rc == 0);
222+
223+
rc = zmq_close (pub_socket);
224+
assert (rc == 0);
225+
226+
rc = zmq_ctx_term (ctx);
227+
assert (rc == 0);
228+
}
155229

156230
int main (void)
157231
{
@@ -167,5 +241,8 @@ int main (void)
167241
count = test_blocking (2000,6000);
168242
assert (count == 6000);
169243

244+
// hwm should apply to the messages that have already been received
245+
test_reset_hwm ();
246+
170247
return 0;
171248
}

0 commit comments

Comments
 (0)