Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCP/PROTO: Handle request status correctly and refactor #10473

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ucp/am/eager_multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucp_am_eager_multi_bcopy_send_func(
ucp_am_eager_multi_bcopy_pack_args_first,
&pack_ctx, 0);
status = ucp_proto_bcopy_send_func_status(packed_size);
status = ucp_proto_am_handle_user_header_send_status(req, status);
status = ucp_am_handle_user_header_send_status_or_release(req, status);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

= align

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is after formatting with clang...
I can force alignment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The = are clearly not aligned. Why clang suggests this?
Anyway, better force IMO

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? Code style - "Up to 80 columns"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, let's keep.

} else {
pack_ctx.max_payload = ucp_proto_multi_max_payload(
req, lpriv, UCP_AM_MID_FRAG_META_LEN);
Expand Down
6 changes: 3 additions & 3 deletions src/ucp/am/eager_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ ucp_am_eager_short_proto_progress_common(uct_pending_req_t *self, int is_reply)
status = uct_ep_am_short_iov(ucp_ep_get_fast_lane(req->send.ep,
spriv->super.lane),
am_id, iov, iov_cnt);
status = ucp_proto_am_handle_user_header_send_status(req, status);
if (ucs_unlikely(status == UCS_ERR_NO_RESOURCE)) {
req->send.lane = spriv->super.lane; /* for pending add */
status = ucp_am_handle_user_header_send_status_or_abort(req, status);
return status;
}

Expand Down Expand Up @@ -215,7 +215,7 @@ ucp_am_eager_single_bcopy_proto_progress(uct_pending_req_t *self)
req, UCP_AM_ID_AM_SINGLE, spriv->super.lane,
ucp_am_eager_single_bcopy_pack, req, SIZE_MAX,
ucp_proto_request_am_bcopy_complete_success, 1);
return ucp_proto_am_handle_user_header_send_status(req, status);
return ucp_am_handle_user_header_send_status_or_abort(req, status);
}

static void ucp_am_eager_single_bcopy_probe_common(
Expand Down Expand Up @@ -294,7 +294,7 @@ ucp_am_eager_single_bcopy_reply_proto_progress(uct_pending_req_t *self)
req, UCP_AM_ID_AM_SINGLE_REPLY, spriv->super.lane,
ucp_am_eager_single_bcopy_reply_pack, req, SIZE_MAX,
ucp_proto_request_am_bcopy_complete_success, 1);
return ucp_proto_am_handle_user_header_send_status(req, status);
return ucp_am_handle_user_header_send_status_or_abort(req, status);
}

ucp_proto_t ucp_am_eager_single_bcopy_reply_proto = {
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/am/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_am_rndv_proto_progress, (self),
UCP_AM_ID_RNDV_RTS, rpriv->lane,
ucp_am_rndv_rts_pack, req, max_rts_size,
ucp_am_rndv_rts_complete, 0);
return ucp_proto_am_handle_user_header_send_status(req, status);
return ucp_am_handle_user_header_send_status_or_abort(req, status);
}

static void ucp_am_rndv_rts_probe(const ucp_proto_init_params_t *init_params)
Expand Down
10 changes: 5 additions & 5 deletions src/ucp/core/ucp_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ static ucs_status_t ucp_am_contig_short(uct_pending_req_t *self)
req->send.msg_proto.am.header.ptr,
req->send.msg_proto.am.header.length,
req->send.buffer, req->send.length, 0);
status = ucp_am_handle_user_header_send_status(req, status);
status = ucp_am_handle_user_header_send_status_or_release(req, status);
return ucp_am_short_handle_status_from_pending(req, status);
}

Expand All @@ -542,7 +542,7 @@ static ucs_status_t ucp_am_contig_short_reply(uct_pending_req_t *self)
req->send.msg_proto.am.header.ptr,
req->send.msg_proto.am.header.length,
req->send.buffer, req->send.length, 1);
status = ucp_am_handle_user_header_send_status(req, status);
status = ucp_am_handle_user_header_send_status_or_release(req, status);
return ucp_am_short_handle_status_from_pending(req, status);
}

Expand All @@ -553,7 +553,7 @@ static ucs_status_t ucp_am_bcopy_single(uct_pending_req_t *self)

status = ucp_do_am_bcopy_single(self, UCP_AM_ID_AM_SINGLE,
ucp_am_bcopy_pack_args_single);
status = ucp_am_handle_user_header_send_status(req, status);
status = ucp_am_handle_user_header_send_status_or_release(req, status);
return ucp_am_bcopy_handle_status_from_pending(self, 0, 0, status);
}

Expand All @@ -564,7 +564,7 @@ static ucs_status_t ucp_am_bcopy_single_reply(uct_pending_req_t *self)

status = ucp_do_am_bcopy_single(self, UCP_AM_ID_AM_SINGLE_REPLY,
ucp_am_bcopy_pack_args_single_reply);
status = ucp_am_handle_user_header_send_status(req, status);
status = ucp_am_handle_user_header_send_status_or_release(req, status);
return ucp_am_bcopy_handle_status_from_pending(self, 0, 0, status);
}

Expand Down Expand Up @@ -722,7 +722,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_proto_progress_am_rndv_rts, (self),
status = ucp_rndv_send_rts(sreq, ucp_am_rndv_rts_pack,
sizeof(ucp_rndv_rts_hdr_t) +
sreq->send.msg_proto.am.header.length);
status = ucp_am_handle_user_header_send_status(sreq, status);
status = ucp_am_handle_user_header_send_status_or_release(sreq, status);
return ucp_rndv_send_handle_status_from_pending(sreq, status);
}

Expand Down
25 changes: 0 additions & 25 deletions src/ucp/proto/proto_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,28 +114,3 @@ void ucp_proto_am_zcopy_completion(uct_completion_t *self)

ucp_proto_am_zcopy_req_complete(req, self->status);
}

ucs_status_t ucp_proto_am_req_copy_header(ucp_request_t *req)
{
void *user_header;

if ((req->flags & UCP_REQUEST_FLAG_USER_HEADER_COPIED) ||
(req->send.msg_proto.am.header.length == 0)) {
return UCS_OK;
}

ucs_assert(req->send.msg_proto.am.flags & UCP_AM_SEND_FLAG_COPY_HEADER);
user_header = ucs_mpool_set_get_inline(&req->send.ep->worker->am_mps,
req->send.msg_proto.am.header.length);
if (ucs_unlikely(user_header == NULL)) {
ucs_error("failed to allocate active message user header copy");
return UCS_ERR_NO_MEMORY;
}

memcpy(user_header, req->send.msg_proto.am.header.ptr,
req->send.msg_proto.am.header.length);
req->flags |= UCP_REQUEST_FLAG_USER_HEADER_COPIED;
req->send.msg_proto.am.header.ptr = user_header;

return UCS_OK;
}
2 changes: 0 additions & 2 deletions src/ucp/proto/proto_am.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,4 @@ void ucp_proto_am_zcopy_completion(uct_completion_t *self);

void ucp_proto_am_zcopy_req_complete(ucp_request_t *req, ucs_status_t status);

ucs_status_t ucp_proto_am_req_copy_header(ucp_request_t *req);

#endif
88 changes: 59 additions & 29 deletions src/ucp/proto/proto_am.inl
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,6 @@ enum ucp_request_am_internal_flags {

typedef void (*ucp_req_complete_func_t)(ucp_request_t *req, ucs_status_t status);

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_proto_am_handle_user_header_send_status(ucp_request_t *req,
ucs_status_t send_status)
{
ucs_status_t status;

if (ucs_unlikely(send_status == UCS_ERR_NO_RESOURCE) &&
(req->send.msg_proto.am.flags & UCP_AM_SEND_FLAG_COPY_HEADER)) {
status = ucp_proto_am_req_copy_header(req);
if (ucs_unlikely(status != UCS_OK)) {
return status;
}
}

return send_status;
}

static UCS_F_ALWAYS_INLINE int
ucp_proto_am_is_first_fragment(const ucp_request_t *req)
{
Expand Down Expand Up @@ -125,22 +108,68 @@ ucp_do_am_bcopy_single(uct_pending_req_t *self, uint8_t am_id,
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_am_handle_user_header_send_status(ucp_request_t *req,
ucs_status_t send_status)
ucp_proto_am_req_copy_header(ucp_request_t *req)
{
ucs_status_t status;
void *user_header;

if (ucs_unlikely(send_status == UCS_ERR_NO_RESOURCE) &&
(req->send.msg_proto.am.flags & UCP_AM_SEND_FLAG_COPY_HEADER)) {
status = ucp_proto_am_req_copy_header(req);
if (ucs_unlikely(status != UCS_OK)) {
return status;
}
} else {
if (ucs_likely(!(req->send.msg_proto.am.flags &
UCP_AM_SEND_FLAG_COPY_HEADER)) ||
(req->flags & UCP_REQUEST_FLAG_USER_HEADER_COPIED) ||
(req->send.msg_proto.am.header.length == 0)) {
return UCS_OK;
}

user_header = ucs_mpool_set_get_inline(&req->send.ep->worker->am_mps,
req->send.msg_proto.am.header.length);
if (ucs_unlikely(user_header == NULL)) {
ucs_error("failed to allocate active message user header copy");
return UCS_ERR_NO_MEMORY;
}

memcpy(user_header, req->send.msg_proto.am.header.ptr,
req->send.msg_proto.am.header.length);
req->flags |= UCP_REQUEST_FLAG_USER_HEADER_COPIED;
req->send.msg_proto.am.header.ptr = user_header;

return UCS_OK;
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_am_handle_user_header_send_status_or_abort(ucp_request_t *req,
ucs_status_t status)
{
ucs_status_t copy_status;

if (ucs_likely(status != UCS_ERR_NO_RESOURCE)) {
return status;
}

copy_status = ucp_proto_am_req_copy_header(req);
if (ucs_unlikely(copy_status != UCS_OK)) {
ucp_proto_request_abort(req, copy_status);
return UCS_OK;
}

return status;
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_am_handle_user_header_send_status_or_release(ucp_request_t *req,
ucs_status_t status)
{
ucs_status_t copy_status;

if (ucs_likely(status != UCS_ERR_NO_RESOURCE)) {
ucp_am_release_user_header(req);
return status;
}

copy_status = ucp_proto_am_req_copy_header(req);
if (ucs_unlikely(copy_status != UCS_OK)) {
return copy_status;
}

return send_status;
return status;
}

static UCS_F_ALWAYS_INLINE
Expand Down Expand Up @@ -170,7 +199,8 @@ ucs_status_t ucp_do_am_bcopy_multi(uct_pending_req_t *self, uint8_t am_id_first,
UCS_PROFILE_REQUEST_EVENT_CHECK_STATUS(req, "am_bcopy_first",
packed_len, packed_len);
if (handle_user_hdr) {
status = ucp_am_handle_user_header_send_status(req, packed_len);
status = ucp_am_handle_user_header_send_status_or_release(
req, packed_len);
if (ucs_unlikely(status == UCS_ERR_NO_MEMORY)) {
return status;
}
Expand Down
3 changes: 1 addition & 2 deletions src/ucp/proto/proto_reconfig.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ static ucs_status_t ucp_proto_reconfig_progress(uct_pending_req_t *self)
return UCS_OK;
}

if (ucp_proto_config_is_am(req->send.proto_config) &&
(req->send.msg_proto.am.flags & UCP_AM_SEND_FLAG_COPY_HEADER)) {
if (ucp_proto_config_is_am(req->send.proto_config)) {
status = ucp_proto_am_req_copy_header(req);
if (status != UCS_OK) {
ucp_proto_request_abort(req, status);
Expand Down
Loading