Skip to content

Commit

Permalink
Mult transpose buff
Browse files Browse the repository at this point in the history
  • Loading branch information
Lior Paz committed Dec 30, 2020
1 parent 093c157 commit 09e63e4
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 43 deletions.
89 changes: 63 additions & 26 deletions src/team_lib/mhba/xccl_mhba_collective.c
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,13 @@ xccl_mhba_send_blocks_start_with_transpose(xccl_coll_task_t *task)
int index = SEQ_INDEX(request->seq_num);
int node_size = team->node.sbgp->group_size;
int net_size = team->net.sbgp->group_size;
struct ibv_mr* curr_transpose_buff_mr;
int op_msgsize = node_size * team->max_msg_size * team->size;
int node_msgsize = SQUARED(node_size) * len;
int block_size = request->block_size;
int col_msgsize = len * block_size * node_size;
int block_msgsize = SQUARED(block_size) * len;
int i, j, k, dest_rank, rank, n_compl, ret;
int i, j, k, dest_rank, rank, n_compl, ret, search_buff;
uint64_t src_addr, remote_addr;
struct ibv_wc transpose_completion[1];
xccl_status_t status;
Expand All @@ -349,12 +350,31 @@ xccl_mhba_send_blocks_start_with_transpose(xccl_coll_task_t *task)
remote_addr = (uintptr_t)(op_msgsize * index + node_msgsize * rank +
block_msgsize * j + col_msgsize * k);
prepost_dummy_recv(team->node.umr_qp, 1);
search_buff = 1;
while(search_buff){
for(k=0;k<NUM_OF_TRANSPOSE_BUFF;k++) {
if(request->transpose_buf_cqs[k] == -1){
curr_transpose_buff_mr = request->transpose_buf_mr[k];
request->transpose_buf_cqs[k] = i;
search_buff = 0;
break;
}
else {
if(ibv_poll_cq(team->net.cqs[request->transpose_buf_cqs[k]], 1, transpose_completion)){
curr_transpose_buff_mr = request->transpose_buf_mr[k];
request->transpose_buf_cqs[k] = i;
search_buff = 0;
break;
}
}
}
}
// SW Transpose
status = send_block_data(
team->node.umr_qp, src_addr, block_msgsize,
team->node.team_send_mkey->lkey,
(uintptr_t)request->transpose_buf_mr->addr,
request->transpose_buf_mr->rkey, IBV_SEND_SIGNALED, 1);
(uintptr_t)curr_transpose_buff_mr->addr,
curr_transpose_buff_mr->rkey, IBV_SEND_SIGNALED, 1);
if (status != XCCL_OK) {
xccl_mhba_error(
"Failed sending block to transpose buffer[%d,%d,%d]", i, j, k);
Expand All @@ -375,19 +395,19 @@ xccl_mhba_send_blocks_start_with_transpose(xccl_coll_task_t *task)
n_compl++;
}
}
transpose_square_mat(request->transpose_buf_mr->addr,
transpose_square_mat(curr_transpose_buff_mr->addr,
block_size, request->args.buffer_info.len,
request->tmp_transpose_buf);
status = send_block_data(
team->net.qps[i],
(uintptr_t)request->transpose_buf_mr->addr, block_msgsize,
request->transpose_buf_mr->lkey, remote_addr,
(uintptr_t)curr_transpose_buff_mr->addr, block_msgsize,
curr_transpose_buff_mr->lkey, remote_addr,
team->net.rkeys[i], IBV_SEND_SIGNALED, 0);
if (status != XCCL_OK) {
xccl_mhba_error("Failed sending block [%d,%d,%d]", i, j, k);
return status;
}
while (!ibv_poll_cq(team->net.cqs[i], 1, transpose_completion)) {}
// while (!ibv_poll_cq(team->net.cqs[i], 1, transpose_completion)) {}
}
}
status = send_atomic(team->net.qps[i],
Expand All @@ -399,6 +419,11 @@ xccl_mhba_send_blocks_start_with_transpose(xccl_coll_task_t *task)
return status;
}
}
for(k=0;k<NUM_OF_TRANSPOSE_BUFF;k++) {
if(request->transpose_buf_cqs[k] != -1){
while(!ibv_poll_cq(team->net.cqs[request->transpose_buf_cqs[k]], 1, transpose_completion)){}
}
}
xccl_task_enqueue(task->schedule->tl_ctx->pq, task);
return XCCL_OK;
}
Expand Down Expand Up @@ -501,8 +526,11 @@ xccl_status_t xccl_mhba_alltoall_init(xccl_coll_op_args_t *coll_args,
int is_asr = (team->node.sbgp->group_rank == team->node.asr_rank);
int n_tasks = (!is_asr) ? 2 : 4;
size_t len = coll_args->buffer_info.len;
void * transpose_buf = NULL;
int i, block_msgsize, block_size;
void * transpose_buf[NUM_OF_TRANSPOSE_BUFF];
int i, block_msgsize, block_size, k;
for(k=0;k<NUM_OF_TRANSPOSE_BUFF;k++) {
transpose_buf[k] = NULL;
}
xccl_status_t status;
request->started = 0;
if (len > team->max_msg_size) {
Expand Down Expand Up @@ -539,7 +567,9 @@ xccl_status_t xccl_mhba_alltoall_init(xccl_coll_op_args_t *coll_args,
return XCCL_ERR_NO_RESOURCE;
}
request->block_size = block_size;
request->transpose_buf_mr = NULL;
for(k=0;k<NUM_OF_TRANSPOSE_BUFF;k++) {
request->transpose_buf_mr[k] = NULL;
}
request->tmp_transpose_buf = NULL;
request->tasks =
(xccl_mhba_task_t *)malloc(sizeof(xccl_mhba_task_t) * n_tasks);
Expand Down Expand Up @@ -594,25 +624,32 @@ xccl_status_t xccl_mhba_alltoall_init(xccl_coll_op_args_t *coll_args,
request->tasks[3].super.progress = xccl_mhba_fanout_progress;

if (team->transpose) {
for(k=0;k<NUM_OF_TRANSPOSE_BUFF;k++) {
request->transpose_buf_cqs[k] = -1;
}
if (ctx->cfg.transpose_buf_size >= block_msgsize) {
request->transpose_buf_mr = team->transpose_buf_mr;
for(k=0;k<NUM_OF_TRANSPOSE_BUFF;k++) {
request->transpose_buf_mr[k] = team->transpose_buf_mr[k];
}
}
else {
transpose_buf = malloc(block_msgsize);
if (!transpose_buf) {
xccl_mhba_error("failed to allocate transpose buffer of %d bytes",
block_msgsize);
status = XCCL_ERR_NO_MEMORY;
goto tr_buf_error;
}
request->transpose_buf_mr = ibv_reg_mr(
team->node.shared_pd, transpose_buf, block_msgsize,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
if (!request->transpose_buf_mr) {
xccl_mhba_error(
"failed to register transpose buffer, errno %d", errno);
status = XCCL_ERR_NO_MESSAGE;
goto tr_buf_reg;
for(k=0;k<NUM_OF_TRANSPOSE_BUFF;k++) {
transpose_buf[k] = malloc(block_msgsize);
if (!transpose_buf[k]) {
xccl_mhba_error("failed to allocate transpose buffer of %d bytes",
block_msgsize);
status = XCCL_ERR_NO_MEMORY;
goto tr_buf_error;
}
request->transpose_buf_mr[k] = ibv_reg_mr(
team->node.shared_pd, transpose_buf[k], block_msgsize,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
if (!request->transpose_buf_mr[k]) {
xccl_mhba_error(
"failed to register transpose buffer, errno %d", errno);
status = XCCL_ERR_NO_MESSAGE;
goto tr_buf_reg;
}
}
}
request->tmp_transpose_buf = NULL;
Expand Down
3 changes: 2 additions & 1 deletion src/team_lib/mhba/xccl_mhba_collective.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ typedef struct xccl_mhba_coll_req {
int started;
xccl_mhba_reg_t *send_rcache_region_p;
xccl_mhba_reg_t *recv_rcache_region_p;
struct ibv_mr *transpose_buf_mr;
struct ibv_mr *transpose_buf_mr[NUM_OF_TRANSPOSE_BUFF];
int transpose_buf_cqs[NUM_OF_TRANSPOSE_BUFF];
void *tmp_transpose_buf;
} xccl_mhba_coll_req_t;

Expand Down
9 changes: 6 additions & 3 deletions src/team_lib/mhba/xccl_mhba_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,12 @@ static xccl_status_t xccl_mhba_collective_finalize(xccl_tl_coll_req_t *request)
ucs_rcache_region_put(team->context->rcache,req->recv_rcache_region_p->region);
if (team->transpose) {
free(req->tmp_transpose_buf);
if (req->transpose_buf_mr != team->transpose_buf_mr) {
ibv_dereg_mr(req->transpose_buf_mr);
free(req->transpose_buf_mr->addr);
if (req->transpose_buf_mr[0] != team->transpose_buf_mr[0]) {
int k;
for(k=0;k<NUM_OF_TRANSPOSE_BUFF;k++) {
ibv_dereg_mr(req->transpose_buf_mr[k]);
free(req->transpose_buf_mr[k]->addr);
}
}
}
free(req->tasks);
Expand Down
5 changes: 3 additions & 2 deletions src/team_lib/mhba/xccl_mhba_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ typedef struct xccl_mhba_node {
#define MAX_TRANSPOSE_SIZE 8000 // HW transpose unit is limited to matrix size
#define MAX_MSG_SIZE 128 // HW transpose unit is limited to element size
#define MAX_STRIDED_ENTRIES 55 // from limit of NIC memory - Sergey Gorenko's email
// Number of transpose buffers: sizes the team's transpose_buf[] and
// transpose_buf_mr[] arrays and the request's transpose_buf_mr[] and
// transpose_buf_cqs[] arrays.
#define NUM_OF_TRANSPOSE_BUFF 3

typedef struct xccl_mhba_reg {
struct ibv_mr *mr;
Expand Down Expand Up @@ -152,8 +153,8 @@ typedef struct xccl_mhba_team {
int requested_block_size;
struct ibv_mr *dummy_bf_mr;
struct ibv_wc *work_completion;
void *transpose_buf;
struct ibv_mr *transpose_buf_mr;
void *transpose_buf[NUM_OF_TRANSPOSE_BUFF];
struct ibv_mr *transpose_buf_mr[NUM_OF_TRANSPOSE_BUFF];
} xccl_mhba_team_t;

xccl_status_t xccl_mhba_team_create_post(xccl_tl_context_t *context,
Expand Down
34 changes: 23 additions & 11 deletions src/team_lib/mhba/xccl_mhba_team.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,6 @@ xccl_status_t xccl_mhba_team_create_post(xccl_tl_context_t *context,
mhba_team->net.ctrl_mr = NULL;
mhba_team->net.remote_ctrl = NULL;
mhba_team->net.rank_map = NULL;
mhba_team->transpose_buf_mr = NULL;
mhba_team->transpose_buf = NULL;

XCCL_TEAM_SUPER_INIT(mhba_team->super, context, params, base_team);

Expand Down Expand Up @@ -264,14 +262,22 @@ xccl_status_t xccl_mhba_team_create_post(xccl_tl_context_t *context,
mhba_team->requested_block_size = ctx->cfg.block_size;
if (mhba_team->node.asr_rank == node->group_rank) {
if (mhba_team->transpose) {
mhba_team->transpose_buf = malloc(ctx->cfg.transpose_buf_size); //todo malloc per operation for parallel
if (!mhba_team->transpose_buf) {
goto fail_after_shmat;
int k;
for(k=0;k<NUM_OF_TRANSPOSE_BUFF;k++) {
mhba_team->transpose_buf[k] = malloc(ctx->cfg.transpose_buf_size); //todo malloc per operation for parallel
if (!mhba_team->transpose_buf[k]) {
xccl_mhba_error("Failed to allocate transpose buff %d",k);
goto fail_after_shmat;
}
mhba_team->transpose_buf_mr[k] =
ibv_reg_mr(mhba_team->node.shared_pd, mhba_team->transpose_buf[k],
ctx->cfg.transpose_buf_size,
IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
if(!mhba_team->transpose_buf_mr[k]){
xccl_mhba_error("Failed to register transpose buff %d",k);
goto fail_after_shmat;
}
}
mhba_team->transpose_buf_mr =
ibv_reg_mr(mhba_team->node.shared_pd, mhba_team->transpose_buf,
ctx->cfg.transpose_buf_size,
IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
}
build_rank_map(mhba_team);
status = xccl_mhba_init_umr(ctx, &mhba_team->node);
Expand Down Expand Up @@ -460,6 +466,10 @@ xccl_status_t xccl_mhba_team_create_post(xccl_tl_context_t *context,
fail_after_transpose_reg:
ibv_dereg_mr(mhba_team->transpose_buf_mr);
free(mhba_team->transpose_buf);
fail_transpose_buff_mr_malloc:
free(mhba_team->transpose_buf_mr);
fail_transpose_buff_malloc:
free(mhba_team->transpose_buf);
fail_after_shmat:
if (-1 == shmdt(mhba_team->node.storage)) {
xccl_mhba_error("failed to shmdt %p, errno %d", mhba_team->node.storage,
Expand Down Expand Up @@ -524,8 +534,10 @@ xccl_status_t xccl_mhba_team_destroy(xccl_tl_team_t *team)
free(mhba_team->work_completion);
free(mhba_team->net.rank_map);
if (mhba_team->transpose) {
ibv_dereg_mr(mhba_team->transpose_buf_mr);
free(mhba_team->transpose_buf);
for(i=0;i<NUM_OF_TRANSPOSE_BUFF;i++) {
ibv_dereg_mr(mhba_team->transpose_buf_mr[i]);
free(mhba_team->transpose_buf[i]);
}
}
}
free(team);
Expand Down

0 comments on commit 09e63e4

Please sign in to comment.