multi_wait improvements #13150

Closed · 3 commits
129 changes: 73 additions & 56 deletions lib/multi.c
@@ -1289,6 +1289,29 @@ static void reset_socket_fdwrite(curl_socket_t s)
}
#endif

+static CURLMcode ufds_increase(struct pollfd **pfds, unsigned int *pfds_len,
+                               unsigned int inc, bool *is_malloced)
+{
+  struct pollfd *new_fds, *old_fds = *pfds;
+  unsigned int new_len = *pfds_len + inc;
+
+  new_fds = calloc(new_len, sizeof(struct pollfd));
+  if(!new_fds) {
+    if(*is_malloced)
+      free(old_fds);
+    *pfds = NULL;
+    *pfds_len = 0;
+    return CURLM_OUT_OF_MEMORY;
+  }
+  memcpy(new_fds, old_fds, (*pfds_len) * sizeof(struct pollfd));
+  if(*is_malloced)
+    free(old_fds);
+  *pfds = new_fds;
+  *pfds_len = new_len;
+  *is_malloced = TRUE;
+  return CURLM_OK;
+}

#define NUM_POLLS_ON_STACK 10

static CURLMcode multi_wait(struct Curl_multi *multi,
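For orientation, here is a minimal sketch (not part of the patch) of the grow-on-demand pattern the new helper enables; variable names mirror the diff and sockfd is a hypothetical placeholder. The helper deliberately uses calloc() plus memcpy() instead of realloc(), because the initial buffer may be the caller's stack array, which must never be handed to realloc() or free():

  struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK];
  struct pollfd *ufds = &a_few_on_stack[0];  /* starts on the stack */
  unsigned int ufds_len = NUM_POLLS_ON_STACK;
  unsigned int nfds = 0;
  bool ufds_malloc = FALSE;
  curl_socket_t sockfd = CURL_SOCKET_BAD;    /* placeholder socket */

  if(nfds >= ufds_len) {
    /* grow by 100 entries; on failure the helper has already freed
       any heap buffer, so the caller can return directly */
    if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc))
      return CURLM_OUT_OF_MEMORY;
  }
  ufds[nfds].fd = sockfd;
  ufds[nfds].events = POLLIN;
  ++nfds;

  /* ... wait on ufds[0..nfds-1] ... */

  if(ufds_malloc)  /* free only if ufds_increase() moved us to the heap */
    free(ufds);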
@@ -1302,12 +1325,12 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
  struct Curl_easy *data;
  struct easy_pollset ps;
  size_t i;
-  unsigned int nfds = 0;
-  unsigned int curlfds;
  long timeout_internal;
  int retcode = 0;
  struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK];
  struct pollfd *ufds = &a_few_on_stack[0];
+  unsigned int ufds_len = NUM_POLLS_ON_STACK;
+  unsigned int nfds = 0, curl_nfds = 0; /* how many ufds are in use */
  bool ufds_malloc = FALSE;
#ifdef USE_WINSOCK
  WSANETWORKEVENTS wsa_events;
@@ -1326,84 +1349,68 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
  if(timeout_ms < 0)
    return CURLM_BAD_FUNCTION_ARGUMENT;

-  /* Count up how many fds we have from the multi handle */
-  memset(&ps, 0, sizeof(ps));
-  for(data = multi->easyp; data; data = data->next) {
-    multi_getsock(data, &ps);
-    nfds += ps.num;
-  }
-
  /* If the internally desired timeout is actually shorter than requested from
     the outside, then use the shorter time! But only if the internal timer
     is actually larger than -1! */
  (void)multi_timeout(multi, &timeout_internal);
  if((timeout_internal >= 0) && (timeout_internal < (long)timeout_ms))
    timeout_ms = (int)timeout_internal;

-  curlfds = nfds; /* number of internal file descriptors */
-  nfds += extra_nfds; /* add the externally provided ones */
-
-#ifdef ENABLE_WAKEUP
-#ifdef USE_WINSOCK
-  if(use_wakeup) {
-#else
-  if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
-#endif
-    ++nfds;
-  }
-#endif
-
-  if(nfds > NUM_POLLS_ON_STACK) {
-    /* 'nfds' is a 32 bit value and 'struct pollfd' is typically 8 bytes
-       big, so at 2^29 sockets this value might wrap. When a process gets
-       the capability to actually handle over 500 million sockets this
-       calculation needs an integer overflow check. */
-    ufds = malloc(nfds * sizeof(struct pollfd));
-    if(!ufds)
-      return CURLM_OUT_OF_MEMORY;
-    ufds_malloc = TRUE;
-  }
-  nfds = 0;
+  memset(ufds, 0, ufds_len * sizeof(struct pollfd));
+  memset(&ps, 0, sizeof(ps));

-  /* only do the second loop if we found descriptors in the first stage run
-     above */
-
-  if(curlfds) {
-    /* Add the curl handles to our pollfds first */
-    for(data = multi->easyp; data; data = data->next) {
-      multi_getsock(data, &ps);
-
-      for(i = 0; i < ps.num; i++) {
-        struct pollfd *ufd = &ufds[nfds++];
+  /* Add the curl handles to our pollfds first */
+  for(data = multi->easyp; data; data = data->next) {
+    multi_getsock(data, &ps);
+
+    for(i = 0; i < ps.num; i++) {
+      short events = 0;
#ifdef USE_WINSOCK
-        long mask = 0;
+      long mask = 0;
#endif
-        ufd->fd = ps.sockets[i];
-        ufd->events = 0;
-        if(ps.actions[i] & CURL_POLL_IN) {
+      if(ps.actions[i] & CURL_POLL_IN) {
#ifdef USE_WINSOCK
-          mask |= FD_READ|FD_ACCEPT|FD_CLOSE;
+        mask |= FD_READ|FD_ACCEPT|FD_CLOSE;
#endif
-          ufd->events |= POLLIN;
-        }
-        if(ps.actions[i] & CURL_POLL_OUT) {
+        events |= POLLIN;
+      }
+      if(ps.actions[i] & CURL_POLL_OUT) {
#ifdef USE_WINSOCK
-          mask |= FD_WRITE|FD_CONNECT|FD_CLOSE;
-          reset_socket_fdwrite(ps.sockets[i]);
+        mask |= FD_WRITE|FD_CONNECT|FD_CLOSE;
+        reset_socket_fdwrite(ps.sockets[i]);
#endif
-          ufd->events |= POLLOUT;
-        }
+        events |= POLLOUT;
+      }
+      if(events) {
+        if(nfds && ps.sockets[i] == ufds[nfds-1].fd) {
+          ufds[nfds-1].events |= events;
+        }
+        else {
+          if(nfds >= ufds_len) {
+            if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc))
+              return CURLM_OUT_OF_MEMORY;
+          }
+          DEBUGASSERT(nfds < ufds_len);
+          ufds[nfds].fd = ps.sockets[i];
+          ufds[nfds].events = events;
+          ++nfds;
+        }
+      }
#ifdef USE_WINSOCK
+      if(mask) {
        if(WSAEventSelect(ps.sockets[i], multi->wsa_event, mask) != 0) {
Review thread on the WSAEventSelect() call above:

Member: @mback2k is WSAEventSelect supposed to be called even when mask is 0?

Contributor Author: I think mask == 0 is used to remove a socket again, see line 1475 after the poll.

The check I added is for the case where a socket is returned but neither POLLIN nor POLLOUT is set. That should not happen, but it needs to be handled: calling WSAEventSelect() with 0 would then remove the socket, which is not correct in that situation, since the socket might have been added before by another transfer.

Contributor Author: Thinking about this, I am not even sure this works correctly as-is for parallel transfers. Think about this case: transfers A and B on the same connection socket X.

  • A wants to POLLIN on X
  • B wants to POLLOUT on X

Does calling WSAEventSelect() on X not override the previous mask?

Member: I recall there was some issue with WSA events, where Marc had added it to purposely reset the flags.

> Does calling WSAEventSelect() on X not override the previous mask?

It does override, so if you want to wait on read and write you have to do it together. I hope Marc will have time to review this PR; he has been busy, so give it a few days.

FWIW, I used a build of this PR yesterday with many parallel HTTP/2 httpbin drip replies and didn't see any difference in behavior.
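To make those semantics concrete, here is a small sketch (not from this PR) of the documented WSAEventSelect() behavior, assuming an already-connected SOCKET s:

  /* Each WSAEventSelect() call REPLACES the mask registered for the
     (socket, event) pair; it does not add to it. */
  WSAEVENT ev = WSACreateEvent();
  WSAEventSelect(s, ev, FD_READ|FD_CLOSE);
  WSAEventSelect(s, ev, FD_WRITE|FD_CLOSE);  /* FD_READ interest is now gone */

  /* so read and write interest must be OR'ed into a single call: */
  WSAEventSelect(s, ev, FD_READ|FD_WRITE|FD_CLOSE);

  /* and a zero mask cancels the association entirely, which is why the
     patch only makes the call when mask is non-zero: */
  WSAEventSelect(s, ev, 0);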

Member: @jay you are right regarding the resetting of flags needing to happen, but this comes later in the existing code (curl/lib/multi.c, line 1428 in a41cd15):

  WSAEventSelect(s, multi->wsa_event, 0);

That is why, a little bit above that line (lines 1405 to 1407 in a41cd15), I left this comment:

  /* With WinSock, we have to run the following section unconditionally
     to call WSAEventSelect(fd, event, 0) on all the sockets */
  {

> Does calling WSAEventSelect() on X not override the previous mask?

Yes, it will overwrite the previous mask for the same multi->wsa_event. Anyway, I think your new check referenced by this review thread is fine, as mask == 0 should not happen at this point.

Member (mback2k, Mar 30, 2024):

> Thinking about this, I am not even sure this works correctly as is for parallel transfers.

I am not into the multi-handle details here: would each transfer have its own multi handle and therefore its own multi->wsa_event? If not, you are probably right, and the WSA event will need to be changed to be per-transfer if multi_wait is called individually per transfer.

Contributor Author: There can be many transfers on the same multi handle, and many transfers on the same connection (HTTP/2, HTTP/3). These report the connection socket with their individual needs.

When transfers A and B are on the same connection, A might request POLLIN and B POLLOUT. Without coalescing the two, the masks will overwrite each other.
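A sketch of the coalescing the author describes (not code from this PR): gather the union of every transfer's interest in the shared socket first, then register it in a single call. The transfer_a_wants_pollin/transfer_b_wants_pollout flags and the x/wsa_event variables are hypothetical placeholders:

  long mask = 0;
  if(transfer_a_wants_pollin)          /* A: POLLIN on X */
    mask |= FD_READ|FD_ACCEPT|FD_CLOSE;
  if(transfer_b_wants_pollout)         /* B: POLLOUT on X */
    mask |= FD_WRITE|FD_CONNECT|FD_CLOSE;
  /* one call with the union; two separate per-transfer calls would
     each overwrite the other's mask */
  if(WSAEventSelect(x, wsa_event, mask) != 0)
    return CURLM_INTERNAL_ERROR;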

Member: The main question is whether multi_wait will be executed in parallel for these transfers on one multi handle. Also, I am not sure what happens if the WSA event is changed while it is already being waited on in another thread, for example. Maybe that is undefined behavior or not supported after all?

Member:

> if multi_wait will be executed in parallel for these transfers on one multi handle

It cannot be executed in parallel for the same handle, no.

          if(ufds_malloc)
            free(ufds);
          return CURLM_INTERNAL_ERROR;
        }
-#endif
      }
+#endif
    }
  }

+  curl_nfds = nfds; /* what curl internally used in ufds */

  /* Add external file descriptions from poll-like struct curl_waitfd */
  for(i = 0; i < extra_nfds; i++) {
#ifdef USE_WINSOCK
@@ -1422,6 +1429,11 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
      return CURLM_INTERNAL_ERROR;
    }
#endif
+    if(nfds >= ufds_len) {
+      if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc))
+        return CURLM_OUT_OF_MEMORY;
+    }
+    DEBUGASSERT(nfds < ufds_len);
    ufds[nfds].fd = extra_fds[i].fd;
    ufds[nfds].events = 0;
    if(extra_fds[i].events & CURL_WAIT_POLLIN)
@@ -1436,6 +1448,11 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
#ifdef ENABLE_WAKEUP
#ifndef USE_WINSOCK
  if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
+    if(nfds >= ufds_len) {
+      if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc))
+        return CURLM_OUT_OF_MEMORY;
+    }
+    DEBUGASSERT(nfds < ufds_len);
    ufds[nfds].fd = multi->wakeup_pair[0];
    ufds[nfds].events = POLLIN;
    ++nfds;
@@ -1475,7 +1492,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
     struct, the bit values of the actual underlying poll() implementation
     may not be the same as the ones in the public libcurl API! */
  for(i = 0; i < extra_nfds; i++) {
-    unsigned r = ufds[curlfds + i].revents;
+    unsigned r = ufds[curl_nfds + i].revents;
    unsigned short mask = 0;
#ifdef USE_WINSOCK
    curl_socket_t s = extra_fds[i].fd;
@@ -1508,7 +1525,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
#ifdef USE_WINSOCK
  /* Count up all our own sockets that had activity,
     and remove them from the event. */
-  if(curlfds) {
+  if(curl_nfds) {

    for(data = multi->easyp; data; data = data->next) {
      multi_getsock(data, &ps);
@@ -1529,7 +1546,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
#else
#ifdef ENABLE_WAKEUP
  if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
-    if(ufds[curlfds + extra_nfds].revents & POLLIN) {
+    if(ufds[curl_nfds + extra_nfds].revents & POLLIN) {
      char buf[64];
      ssize_t nread;
      while(1) {
17 changes: 13 additions & 4 deletions tests/http/scorecard.py
@@ -49,13 +49,15 @@ def __init__(self, env: Env,
                 nghttpx: Optional[Nghttpx],
                 caddy: Optional[Caddy],
                 verbose: int,
-                 curl_verbose: int):
+                 curl_verbose: int,
+                 download_parallel: int = 0):
        self.verbose = verbose
        self.env = env
        self.httpd = httpd
        self.nghttpx = nghttpx
        self.caddy = caddy
        self._silent_curl = not curl_verbose
+        self._download_parallel = download_parallel

    def info(self, msg):
        if self.verbose > 0:
@@ -138,6 +140,7 @@ def transfer_single(self, url: str, proto: str, count: int):
        return {
            'count': count,
            'samples': sample_size,
+            'max-parallel': 1,
            'speed': mean(samples) if len(samples) else -1,
            'errors': errors,
            'stats': RunProfile.AverageStats(profiles),
@@ -164,6 +167,7 @@ def transfer_serial(self, url: str, proto: str, count: int):
        return {
            'count': count,
            'samples': sample_size,
+            'max-parallel': 1,
            'speed': mean(samples) if len(samples) else -1,
            'errors': errors,
            'stats': RunProfile.AverageStats(profiles),
@@ -174,6 +178,7 @@ def transfer_parallel(self, url: str, proto: str, count: int):
        samples = []
        errors = []
        profiles = []
+        max_parallel = self._download_parallel if self._download_parallel > 0 else count
        url = f'{url}?[0-{count - 1}]'
        self.info(f'parallel...')
        for i in range(sample_size):
@@ -182,7 +187,7 @@ def transfer_parallel(self, url: str, proto: str, count: int):
                                   with_headers=False,
                                   with_profile=True,
                                   extra_args=['--parallel',
-                                               '--parallel-max', str(count)])
+                                               '--parallel-max', str(max_parallel)])
            err = self._check_downloads(r, count)
            if err:
                errors.append(err)
@@ -193,6 +198,7 @@ def transfer_parallel(self, url: str, proto: str, count: int):
        return {
            'count': count,
            'samples': sample_size,
+            'max-parallel': max_parallel,
            'speed': mean(samples) if len(samples) else -1,
            'errors': errors,
            'stats': RunProfile.AverageStats(profiles),
@@ -436,7 +442,7 @@ def print_score(self, score):
            for mkey, mval in server_score[sskey].items():
                if mkey not in measures:
                    measures.append(mkey)
-                    m_names[mkey] = f'{mkey}({mval["count"]}x)'
+                    m_names[mkey] = f'{mkey}({mval["count"]}x{mval["max-parallel"]})'

        print('Downloads')
        print(f'  {"Server":<8} {"Size":>8}', end='')
@@ -543,6 +549,8 @@ def main():
                        default=None, help="evaluate download size")
    parser.add_argument("--download-count", action='store', type=int,
                        default=50, help="perform that many downloads")
+    parser.add_argument("--download-parallel", action='store', type=int,
+                        default=0, help="perform that many downloads in parallel (default all)")
    parser.add_argument("-r", "--requests", action='store_true',
                        default=False, help="evaluate requests")
    parser.add_argument("--request-count", action='store', type=int,
@@ -607,7 +615,8 @@ def main():
        assert caddy.start()

    card = ScoreCard(env=env, httpd=httpd, nghttpx=nghttpx, caddy=caddy,
-                     verbose=args.verbose, curl_verbose=args.curl_verbose)
+                     verbose=args.verbose, curl_verbose=args.curl_verbose,
+                     download_parallel=args.download_parallel)
    score = card.score_proto(proto=protocol,
                             handshakes=handshakes,
                             downloads=downloads,