Skip to content

Commit

Permalink
simplify & optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Sep 11, 2024
1 parent 58a8519 commit 47c3025
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 33 deletions.
63 changes: 31 additions & 32 deletions src/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,12 @@ static void thread_disp_finalizer(SEXP xptr) {
if (NANO_PTR(xptr) == NULL) return;
nano_thread_disp *xp = (nano_thread_disp *) NANO_PTR(xptr);
nano_cv *ncv = xp->cv;
if (ncv != NULL) {
nng_mtx *mtx = ncv->mtx;
nng_cv *cv = ncv->cv;
nng_mtx_lock(mtx);
ncv->condition = -1;
nng_cv_wake(cv);
nng_mtx_unlock(mtx);
}
nng_mtx *mtx = ncv->mtx;
nng_cv *cv = ncv->cv;
nng_mtx_lock(mtx);
ncv->condition = -1;
nng_cv_wake(cv);
nng_mtx_unlock(mtx);
if (xp->tls != NULL) {
nng_tls_config_free(xp->tls);
}
Expand All @@ -266,6 +264,9 @@ static void thread_disp_finalizer(SEXP xptr) {
R_Free(xp->haio);
R_Free(xp->url);
R_Free(xp->online);
nng_cv_free(ncv->cv);
nng_mtx_free(ncv->mtx);
R_Free(xp->cv);
R_Free(xp);

}
Expand Down Expand Up @@ -547,7 +548,7 @@ static void rnng_dispatch_thread(void *args) {
if (nng_rep0_open(&hsock))
goto exitlevel1;

if (nng_dial(hsock, disp->host, &hdial, NNG_FLAG_NONBLOCK))
if (nng_dial(hsock, disp->host, &hdial, 0))
goto exitlevel2;

for (R_xlen_t i = 0; i < n; i++) {
Expand Down Expand Up @@ -675,12 +676,17 @@ SEXP rnng_dispatcher_socket(SEXP host, SEXP url, SEXP tls) {
Rf_error("'tls' is not a valid TLS Configuration");

int xc;
SEXP cv, cvt, xptr, sock, list;
SEXP xptr, sock, list;

nano_cv *ncv = R_Calloc(1, nano_cv);
if ((xc = nng_mtx_alloc(&ncv->mtx)))
goto exitlevel1;

if ((xc = nng_cv_alloc(&ncv->cv, ncv->mtx)))
goto exitlevel2;

PROTECT(cvt = rnng_cv_alloc());
nano_cv *tcv = (nano_cv *) NANO_PTR(cvt);
nano_thread_disp *disp = R_Calloc(1, nano_thread_disp);
disp->cv = tcv;
disp->cv = ncv;
disp->n = nd;
disp->tls = sec ? (nng_tls_config *) NANO_PTR(tls) : NULL;
if (sec) nng_tls_config_hold(disp->tls);
Expand All @@ -703,22 +709,18 @@ SEXP rnng_dispatcher_socket(SEXP host, SEXP url, SEXP tls) {
nano_listener *hl = R_Calloc(1, nano_listener);

if (nng_url_parse(&disp->up, disp->url[0]))
goto exitlevel1;
goto exitlevel3;

if ((xc = nng_req0_open(hsock)))
goto exitlevel2;
goto exitlevel4;

PROTECT(cv = rnng_cv_alloc());
nano_cv *ncv = (nano_cv *) NANO_PTR(cv);
if ((xc = nng_socket_set_ms(*hsock, "req:resend-time", 0)) ||
(xc = nng_pipe_notify(*hsock, NNG_PIPE_EV_ADD_POST, pipe_cb_signal, ncv)) ||
(xc = nng_listen(*hsock, disp->host, &hl->list, 0)) ||
(xc = nng_thread_create(&disp->thr, rnng_dispatch_thread, disp)))
goto exitlevel3;
goto exitlevel5;

PROTECT(sock = R_MakeExternalPtr(hsock, nano_SocketSymbol, R_NilValue));
R_RegisterCFinalizerEx(sock, socket_finalizer, TRUE);
Rf_setAttrib(sock, nano_CvSymbol, cvt);

xptr = R_MakeExternalPtr(disp, nano_SocketSymbol, R_NilValue);
Rf_setAttrib(sock, R_MissingArg, xptr);
Expand All @@ -728,21 +730,14 @@ SEXP rnng_dispatcher_socket(SEXP host, SEXP url, SEXP tls) {
Rf_setAttrib(sock, nano_ListenerSymbol, list);
R_RegisterCFinalizerEx(list, listener_finalizer, TRUE);

rnng_cv_wait(cv);
if ((xc = nng_pipe_notify(*hsock, NNG_PIPE_EV_ADD_POST, NULL, NULL)))
goto exitlevel4;

UNPROTECT(3);
UNPROTECT(1);
return sock;

exitlevel4:
UNPROTECT(1);
exitlevel3:
exitlevel5:
nng_close(*hsock);
UNPROTECT(1);
exitlevel2:
exitlevel4:
nng_url_free(disp->up);
exitlevel1:
exitlevel3:
R_Free(hl);
R_Free(hsock);
for (R_xlen_t i = 0; i < nd; i++) {
Expand All @@ -758,7 +753,11 @@ SEXP rnng_dispatcher_socket(SEXP host, SEXP url, SEXP tls) {
R_Free(disp->online);
if (sec) nng_tls_config_free(disp->tls);
R_Free(disp);
UNPROTECT(1);
nng_cv_free(ncv->cv);
exitlevel2:
nng_mtx_free(ncv->mtx);
R_Free(ncv);
exitlevel1:
ERROR_OUT(xc);

}
Expand Down
2 changes: 1 addition & 1 deletion tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ nanotestxp(disp <- .dispatcher(host = "inproc://hostdisp", url = "inproc://disp/
nanotestz(.online(disp))
nanotestn(.online("a"))
nanotestnano(s <- socket(protocol = "rep"))
nanotestz(dial(s, url = "inproc://disp/1", autostart = NA))
nanotestz(dial(s, url = "inproc://disp/1"))
nanotestz(send(disp, NULL, block = 500L))
nanotestn(recv(s, block = 500L))
nanotestz(send(s, TRUE, block = 500L))
Expand Down

0 comments on commit 47c3025

Please sign in to comment.