From ec31644faddd2f76eaca534f47a94d8716f7c90c Mon Sep 17 00:00:00 2001 From: James Osborn Date: Fri, 10 Jan 2025 14:59:32 -0600 Subject: [PATCH] make shifts handle longer displacements --- src/base/threading.nim | 17 ++ src/comms/gather.nim | 17 -- src/layout/layoutTypes.nim | 27 ++- src/layout/qshifts.nim | 402 +++++++++++++++++++++++-------------- src/layout/shiftX.nim | 15 +- src/layout/shifts.nim | 187 +++++++++++++++-- tests/base/tshift.nim | 3 +- tests/examples/test1.nim | 2 +- 8 files changed, 469 insertions(+), 201 deletions(-) diff --git a/src/base/threading.nim b/src/base/threading.nim index 1e5faba2..753c3945 100644 --- a/src/base/threading.nim +++ b/src/base/threading.nim @@ -178,6 +178,23 @@ macro tFor*(index: untyped; slice: Slice; body: untyped): untyped = i1 = slice[2] result = tForX(index, i0, i1, body) +proc adjust[T](i: int, x: openarray[T], b: int): int = + result = i + if i > 0: + let n = x.len + while result 0: - let n = x.len - while result 0: - var sbuf: pointer = aalloc(sbs) + var sbuf = aalloc(sbs) #printf("sbuf: %p\n", sbuf); - var i: cint = 0 - while i < n: + for i in 0.. 0: - sb[i].sqmpmem = QMP_declare_msgmem(sbuf, sbs.csize_t) - sb[i].smsg = QMP_declare_send_to(sb[i].sqmpmem, si[i].sendRanks[0], 0) - #echo &"->{si[i].sendRanks[0]}: {sbs}" - inc(i) + sb[i].sqmpmem = cast[ptr cArray[QMP_msgmem_t]](allocShared(si[i].nSendRanks*sizeof(QMP_msgmem_t))) + sb[i].smsg = cast[ptr cArray[QMP_msghandle_t]](allocShared(si[i].nSendRanks*sizeof(QMP_msghandle_t))) + for j in 0..{si[i].sendRanks[0]}: {sbs}" if rbs > 0: - var rbuf: pointer = aalloc(rbs) - var i: cint = 0 - while i < n: + var rbuf = aalloc(rbs) + for i in 0.. 0: - sb[i].rqmpmem = QMP_declare_msgmem(rbuf, rbs.csize_t) - sb[i].rmsg = QMP_declare_receive_from(sb[i].rqmpmem, - si[i].recvRanks[0], 0) + sb[i].rqmpmem = cast[ptr cArray[QMP_msgmem_t]](allocShared(si[i].nRecvRanks*sizeof(QMP_msgmem_t))) + sb[i].rmsg = cast[ptr cArray[QMP_msghandle_t]](allocShared(si[i].nRecvRanks*sizeof(QMP_msghandle_t))) + for j in 0.. 0: pairmsg = QMP_declare_send_recv_pairs(p[0].addr, nn) - i=0 - while i < n: + for i in 0..rmsg,sb[i]->smsg,sb[i]->pairmsg); - inc(i) #fflush(stdout); proc prepareShiftBufQ*(sb: ptr ShiftBufQ; si: ptr ShiftIndicesQ; esize: cint) = @@ -104,25 +103,26 @@ proc startSendBufQ*(sb: ptr ShiftBufQ) = #echo "QMP_start" discard QMP_start(sb.pairmsg) else: - if not isEmpty sb.smsg: - discard QMP_start(sb.smsg) + for i in 0..rbuf)); proc doneRecvBufQ*(sb: ptr ShiftBufQ) = @@ -132,46 +132,40 @@ proc doneRecvBufQ*(sb: ptr ShiftBufQ) = proc freeShiftBufsQ*(sb: openArray[ptr ShiftBufQ]) = let n = sb.len - var i: cint = 0 - while i < n: - dealloc(sb[i].offr) - dealloc(sb[i].lenr) - dealloc(sb[i].nthreads) - inc(i) + for i in 0.. 0: afree(sb[i].sbuf) if sb[i].first!=0 and sb[i].rbufSize > 0: afree(sb[i].rbuf) - inc(i) proc freeShiftBufQ*(sb: ptr ShiftBufQ) = freeShiftBufsQ([sb]) @@ -246,8 +240,8 @@ proc makeGDFromShiftSubs*(gd: ptr GatherDescription; l: ptr LayoutQ; var nndi = ndisps * myndi var args: mapargs args.l = l - var sidx: ptr cArray[cint] = cast[ptr cArray[cint]](alloc(nndi * sizeof(cint))) - var srank: ptr cArray[cint] = cast[ptr cArray[cint]](alloc(nndi * sizeof(cint))) + var sidx: ptr cArray[cint] = cast[ptr cArray[cint]](allocShared(nndi * sizeof(cint))) + var srank: ptr cArray[cint] = cast[ptr cArray[cint]](allocShared(nndi * sizeof(cint))) # find shift sources var nRecvDests: cint = 0 var n: cint = 0 @@ -336,7 +330,7 @@ proc makeGDFromShiftSubs*(gd: ptr GatherDescription; l: ptr LayoutQ; inc(n) gd.nSendIndices = cint sendSrcIndices.len template ARRAY_CLONE(x,y: typed) = - x = cast[type(x)](alloc(y.len*sizeof(type(x[0])))) + x = cast[type(x)](allocShared(y.len*sizeof(type(x[0])))) for i in 0..nranks,l->nranks,l->nSites*ndisp,l->myrank); - var gd = cast[ptr GatherDescription](alloc(sizeof((GatherDescription)))) + var gd = cast[ptr GatherDescription](allocShared(sizeof((GatherDescription)))) makeGDFromShiftSubs(gd, l, disp, subs, ndisp) #makeGDFromShiftSubs(gd, l, # cast[ptr carray[ptr carray[cint]]](disp[0].unsafeaddr), # cast[ptr carray[cstring]](subs[0].unsafeaddr), ndisp) makeGatherFromGD(gi, gd) - #int si0 = 0; - var si0: cint = ndisp-1 + var si0 = ndisp-1 # si index for send structures var - vvs: cint = 0 perm: cint = 0 pack: cint = 0 - packs = [0,0] - packbits = [0,0] - #sendSites = newSeq[int32]() + packs = [0,0,0,0] + packbits = [0,0,0,0] + lsrcloc = newSeq[int32](l.nSites) si[si0].sendSites.newSeq(0) - # calculate pack, vvs (nSendSites), sendSites + si[si0].sbufcount.newSeq(0) + si[si0].lbufcount.newSeq(0) if gi.nSendIndices > 0: + for i in 0..0 and (ssi != ssi0 or i==gi.nSendIndices-1): if ssi1!=ssi0: ssi0 = ssi1 - #si[si0].sendSites[vvs] = ssi - inc vvs - if vvs > vvol: - echo "vvs(",vvs,")>vvol(",vvol,")" - if myRank == 0: - for i in 0..= packs.len: + echo "pck ", pck, " not in packs ", packs + qexError() + if pck == packs[imask]: break + if packs[imask] == 0: + packs[imask] = pck + packbits[imask] = pckbits + break + inc imask + si[si0].sendSites.add SendSite(maskidx:uint32 imask, site:uint32 ssi) + si[si0].sbufcount.add int32 scount + si[si0].lbufcount.add int32 lcount + scount += pckbits + #lcount += l.nSitesInner - pckbits + for iv in 0.. 0: si[si0].sendRanks = gi.sendRanks - si[si0].sendRankSizes = cast[ptr cArray[cint]](alloc(si[si0].nSendRanks*sizeof(cint))) + #si[si0].sendRankSizes = cast[ptr cArray[int]](allocShared(gi.nSendRanks*sizeof(int))) si[si0].sendRankSizes1 = gi.sendRankSizes - si[si0].sendRankOffsets = cast[ptr cArray[cint]](alloc(si[si0].nSendRanks*sizeof(cint))) + #si[si0].sendRankOffsets = cast[ptr cArray[cint]](allocShared(gi.nSendRanks*sizeof(cint))) si[si0].sendRankOffsets1 = gi.sendRankOffsets - si[si0].sendRankSizes[0] = vvs - si[si0].sendRankOffsets[0] = 0 + #si[si0].sendRankSizes[0] = si[si0].sendSites.len + #si[si0].sendRankOffsets[0] = 0 + #if gi.sendRankSizes[0] != gi.nSendIndices: + # echo &"gi.sendRankSizes[0] {gi.sendRankSizes[0]} != gi.nSendIndices {gi.nSendIndices}" + # qexError() + #if gi.sendRankOffsets[0] != 0: + # echo &"gi.sendRankOffsets[0] {gi.sendRankOffsets[0]} != 0" + # qexError() + var nrsites: cint = 0 nrdests = newSeq[cint](ndisp) @@ -486,9 +486,7 @@ proc makeShiftMultiSubQ*(si: openArray[ptr ShiftIndicesQ]; var p = gi.srcIndices[k0] mod l.nSitesInner if p != 0: perm = p - #si->sidx[i] = -vvs-1; si[dd].pidx[ix] = - (si[dd].pidx[ix]) - 2 - #vvs++; else: rbi = - (rbi + 2) rbi = (2 * rbi) div l.nSitesInner @@ -502,22 +500,28 @@ proc makeShiftMultiSubQ*(si: openArray[ptr ShiftIndicesQ]; si[i].nRecvRanks = 0 si[i].nRecvSites1 = 0 si[0].nRecvRanks = gi.nRecvRanks - si[0].nRecvSites = nrsites + #si[0].nRecvSites = nrsites si[0].nRecvSites1 = gi.recvSize if gi.nRecvRanks > 0: si[0].recvRanks = gi.recvRanks - si[0].recvRankSizes = cast[ptr cArray[cint]](alloc(si[0].nRecvRanks*sizeof(cint))) + #si[0].recvRankSizes = cast[ptr cArray[cint]](allocShared(si[0].nRecvRanks*sizeof(cint))) si[0].recvRankSizes1 = gi.recvRankSizes - si[0].recvRankOffsets = cast[ptr cArray[cint]](alloc(si[0].nRecvRanks * sizeof(cint))) + #si[0].recvRankOffsets = cast[ptr cArray[cint]](allocShared(si[0].nRecvRanks * sizeof(cint))) si[0].recvRankOffsets1 = gi.recvRankOffsets - si[0].recvRankSizes[0] = nrsites - si[0].recvRankOffsets[0] = 0 + #si[0].recvRankSizes[0] = nrsites + #si[0].recvRankOffsets[0] = 0 + #if gi.recvRankSizes[0] != gi.recvSize: + # echo &"gi.recvRankSizes[0] {gi.recvRankSizes[0]} != gi.recvSize {gi.recvSize}" + # qexError() + #if gi.recvRankOffsets[0] != 0: + # echo &"gi.recvRankOffsets[0] {gi.recvRankOffsets[0]} != 0" + # qexError() for n in 0.. 0: - si[n].recvDests = cast[ptr cArray[cint]](alloc(nrdests[n]*sizeof(cint))) - si[n].recvLocalSrcs = cast[ptr cArray[cint]](alloc(nrdests[n]*sizeof(cint))) - si[n].recvRemoteSrcs = cast[ptr cArray[cint]](alloc(nrdests[n]*sizeof(cint))) + si[n].recvDests = cast[ptr cArray[cint]](allocShared(nrdests[n]*sizeof(cint))) + si[n].recvLocalSrcs = cast[ptr cArray[cint]](allocShared(nrdests[n]*sizeof(cint))) + si[n].recvRemoteSrcs = cast[ptr cArray[cint]](allocShared(nrdests[n]*sizeof(cint))) var j = 0 for i in 0..0: + si[n].recvLocalSrcs[nrecv] = lidx[0] + # if si[n].recvLocalSrcs[nrecv] != lidx[0]: + # echo &"si[n].recvLocalSrcs[nrecv] {si[n].recvLocalSrcs[nrecv]} != lidx[0] {lidx[0]}" + # qexError() + #if (rmask and si[n].packmasks[0]) != 0: + # echo &"rmask {rmask} and si[n].packmasks[0] {si[n].packmasks[0]} != 0" + # qexError() + var imask = 0 + while true: + if imask >= si[n].recvmasks.len: + echo &"si[n].recvmasks[{imask}] {si[n].recvmasks[imask]} != rmask {rmask}" + qexError() + if rmask == si[n].recvmasks[imask]: break + if si[n].recvmasks[imask] < 0: + si[n].recvmasks[imask] = rmask + si[n].recvbits[imask] = rbits + break + inc imask + #if (-2-si[n].sidx[io]) != nrecv: # reuse sidx for now, replace later + # echo &"-2-si[n].sidx[io] {-2-si[n].sidx[io]} != nrecv {nrecv}" + # qexError() + #si[n].sidx[io] = int32 -2 - nrecv + si[n].recvIndex[io] = RecvIdx(maskidx: uint32 imask, idx: uint32 nrecv) + inc nrecv proc makeShiftMultiQ*(si: openArray[ptr ShiftIndicesQ]; l: ptr LayoutQ; disp: openArray[ptr cArray[cint]]; ndisp: cint) = diff --git a/src/layout/shiftX.nim b/src/layout/shiftX.nim index b1007883..6d95a027 100644 --- a/src/layout/shiftX.nim +++ b/src/layout/shiftX.nim @@ -5,10 +5,12 @@ import strformat type ShiftBufQ* = object - sqmpmem*: QMP_msgmem_t - smsg*: QMP_msghandle_t - rqmpmem*: QMP_msgmem_t - rmsg*: QMP_msghandle_t + nsend*: int32 + nrecv*: int32 + sqmpmem*: ptr cArray[QMP_msgmem_t] + smsg*: ptr cArray[QMP_msghandle_t] + rqmpmem*: ptr cArray[QMP_msgmem_t] + rmsg*: ptr cArray[QMP_msghandle_t] pairmsg*: QMP_msghandle_t sbuf*: ptr cArray[char] rbuf*: ptr cArray[char] @@ -94,8 +96,9 @@ proc makeShift*(l:var Layout; dir,len:int; sub:string="all") = si.comm = l.comm proc getShift*(l:var Layout; dir,len:int; sub:string="all"):ShiftIndices = #if nRanks>1 and len>l.outerGeom[dir]: # current limitation - if l.rankGeom[dir]>1 and l.innerGeom[dir]>1 and len>l.outerGeom[dir]: - qexError(&"unsupported shift dir: {dir} len: {len} ranks: {nRanks} og: {l.outerGeom}") + #if l.rankGeom[dir]>1 and l.innerGeom[dir]>1 and len>l.outerGeom[dir]: + if len>l.localGeom[dir]: + qexError(&"unsupported shift dir: {dir} len: {len} ranks: {nRanks} og: {l.localGeom}") let key = makeShiftKey(dir, len, sub) if not hasKey(l.shifts, key): makeShift(l, dir, len, sub) diff --git a/src/layout/shifts.nim b/src/layout/shifts.nim index 78d8e102..585f028a 100644 --- a/src/layout/shifts.nim +++ b/src/layout/shifts.nim @@ -13,6 +13,7 @@ export field #import future #import strUtils #import metaUtils +import bitops, strformat getOptimPragmas() type ShiftB*[T] = object @@ -23,16 +24,14 @@ type ShiftB*[T] = object template shiftBType*(x:SomeField):untyped = ShiftB[evalType(x[0])] -template initShiftB*(s:ShiftB; l:Layout; t:typedesc; - dir,len:int; sub="all"):untyped = +template initShiftB*(s:ShiftB; l:Layout; t:typedesc; dir,len:int; sub="all") = if threadNum==0: s.subset.layoutSubset(l, sub) s.si = l.getShift(dir, len, sub) s.size = sizeOf(t) div l.nSitesInner prepareShiftBuf(s.sb, s.si, s.size) -template initShiftB*(s:ShiftB; x:SomeField; dir,len:int; sub="all"):untyped = +template initShiftB*(s:ShiftB; x:SomeField; dir,len:int; sub="all") = if threadNum==0: - #template l:untyped = x.l s.subset.layoutSubset(x.l, sub) s.si = x.l.getShift(dir, len, sub) s.size = sizeOf(x[0]) div x.l.nSitesInner @@ -63,7 +62,7 @@ proc createShiftBufs*(x:auto; ln=1; sub="all"):auto = #proc init*(s:var ShiftB; ; # dir,len:int; sub="all") = - +#[ template startSB*(sb0: ShiftB; e: untyped) = mixin assign, `[]`, numberType if threadNum == 0: @@ -92,11 +91,50 @@ template startSB*(sb0: ShiftB; e: untyped) = if threadNum == 0: #echoRank "send: ", cast[ptr float32](sb0.sb.sq.sbuf)[] startSendBuf(sb0.sb) +]# + +#proc startSB0*(sb0: ShiftB; e: auto) {.alwaysInline.} = +template startSB*(sb0: ShiftB; e: untyped) = + mixin assign, `[]`, numberType + if threadNum == 0: + if sb0.si.nRecvRanks > 0: + startRecvBuf(sb0.sb) + if sb0.si.nSendRanks > 0: + type F = numberType(sb0.T) + let nv = sizeof(sb0.T) div sb0.size + let ne = sb0.size div sizeof(F) + let b = cast[ptr cArray[F]](sb0.sb.sq.sbuf) + let l = cast[ptr cArray[F]](sb0.sb.lbuf) + tFor i, 0.. 0: let rr = cast[ptr cArray[s.T]](s.sb.sq.rbuf) if s.si.blend == 0: let i = -2 - s.si.sq.sidx[ir] - let k2 = s.si.sq.recvRemoteSrcs[i] + #let k2 = s.si.sq.recvRemoteSrcs[i] + let k2 = s.si.sq.recvRemoteSrcs[i] * s.size div sizeof(s.T) #echo "blend0: ", i, " ir: ", ir, " k2: ", k2 subst(it,rr[k2]): e else: let stride = sizeof(s.T) div 2 let i = -2 - s.si.sq.sidx[ir] - let k2 = s.si.sq.recvRemoteSrcs[i] + #let k2 = s.si.sq.recvRemoteSrcs[i] + let k2 = s.si.sq.recvRemoteSrcs[i] * s.size * 2 div sizeof(s.T) #echo "blendb: ", irr, " sidx: ", s.si.sq.sidx[irr].int var itt{.noInit.}: s.T # should be load1(s.T)? blend(itt, s.sb.lbuf[stride*i].addr, s.sb.sq.rbuf[stride*k2].addr, s.si.blend) subst(it,itt): e +]# +proc boundaryGetSBP*[T](s: ShiftB, ir: int, p: ptr T): ptr T {.alwaysInline.} = + result = p + if s.si.nRecvDests > 0: + type F = numberType(s.T) + var itp = cast[ptr cArray[F]](p) + let rb = cast[ptr cArray[F]](s.sb.sq.rbuf) + let lb = cast[ptr cArray[F]](s.sb.lbuf) + let nv = sizeof(s.T) div s.size + let ne = s.size div sizeof(F) + #let ri = -2 - s.si.sq.sidx[ir] + let ri = s.si.sq.recvIndex[ir].idx + let mi = s.si.sq.recvIndex[ir].maskidx + #echo &"ri {ri}" + let rsrc = s.si.sq.recvRemoteSrcs[ri] + let lsrc = s.si.sq.recvLocalSrcs[ri] + var rk = 0 + var lk = 0 + for k in 0.. 0: if threadNum == 0: waitSendBuf(s.sb) +]# +template boundarySB*[T](s:ShiftB[T]; e:untyped) = + var needBoundary = false + boundaryWaitSB(s): needBoundary = true + if needBoundary: + boundarySyncSB() + #echoAll myrank, ": nb" + if s.si.nRecvDests > 0: + #echo "nrd" + if s.sb.sq.nthreads[threadNum] != numThreads: boundaryOffsetSB(s) + let ti0 = s.sb.sq.offr[threadNum] + let ti1 = s.sb.sq.lenr[threadNum] + for i in ti0.. 0: + if threadNum == 0: + doneRecvBuf(s.sb) + if s.si.nSendRanks > 0: + if threadNum == 0: + waitSendBuf(s.sb) -template boundarySB2*[T](s: ShiftB[T]; f: untyped): untyped = +#[ +template boundarySB2*[T](s: ShiftB[T]; f: untyped) = var needBoundary = false boundaryWaitSB(s): needBoundary = true if needBoundary: @@ -264,14 +371,16 @@ template boundarySB2*[T](s: ShiftB[T]; f: untyped): untyped = let rr = cast[ptr cArray[s.T]](s.sb.sq.rbuf) for i in ti0.. 0: if threadNum == 0: waitSendBuf(s.sb) +]# +template boundarySB2*[T](s: ShiftB[T]; f: untyped) = + var needBoundary = false + boundaryWaitSB(s): needBoundary = true + if needBoundary: + boundarySyncSB() + if s.si.nRecvDests > 0: + if s.sb.sq.nthreads[threadNum] != numThreads: boundaryOffsetSB(s) + let ti0 = s.sb.sq.offr[threadNum] + let ti1 = s.sb.sq.lenr[threadNum] + for i in ti0.. 0: + if threadNum == 0: + doneRecvBuf(s.sb) + if s.si.nSendRanks > 0: + if threadNum == 0: + waitSendBuf(s.sb) + template shiftExpr*(sb: ShiftB, er, es) {.dirty.} = startSB(sb, es) @@ -291,7 +421,7 @@ template shiftExpr*(sb: ShiftB, er, es) {.dirty.} = boundarySB(sb, er) - +#[ type Shift*[V: static[int]; T] = object src*: Field[V,T] dest*: Field[V,T] @@ -436,7 +566,9 @@ proc boundary*(s:var Shift) = threadBarrier() if threadNum == 0: freeShiftBuf(sb) +]# +#[ proc shift*(dest:var Field; dir,len:int; sub:string; src:Field) = const v = dest.V var s{.global.}:Shift[v,dest.T] @@ -451,6 +583,7 @@ proc shift*(dest:var Field; dir,len:int; sub:string; src:Field) = threadBarrier() # wait for everyone proc shift*(dest:var Field; dir,len:int; src:Field) = shift(dest, dir, len, "all", src) +]# type Transporter*[U,F,T] = object @@ -506,6 +639,17 @@ proc newShifter*[F](f: F, dir,len: int, sub="all"): auto = r.len = len r +proc initShifterDest*(s: var Shifter, dest: auto, dir,len: int, sub="all") = + if threadNum==0: + s.field = dest + s.sb.initShiftB(dest, dir, len, sub) + s.len = len + +proc newShifterDest*[F](dest: F, dir,len: int, sub="all"): auto = + var r: Shifter[F,evalType(dest[0])] + r.initShifterDest(dest, dir, len, sub) + r + proc newShifters*[F](f: F, len: int, sub="all"): auto = var r: seq[Transporter[void,F,evalType(f[0])]] let nd = f.l.nDim @@ -569,6 +713,15 @@ proc `^*!`*(x: Transporter, y: auto): auto = toc("apply") #template `()`*(x: Transporter, y: untyped): untyped = x ^* y +proc shift*(dest:var Field; dir,len:int; sub:string; src:Field) = + var s{.global.}: type(newShifterDest(dest, dir, len, sub)) + s.initShifterDest(dest, dir, len, sub) + threadBarrier() + discard s.transporterApply(src) + threadBarrier() +proc shift*(dest:var Field; dir,len:int; src:Field) = + shift(dest, dir, len, "all", src) + when isMainModule: import qex import physics/qcdTypes diff --git a/tests/base/tshift.nim b/tests/base/tshift.nim index 7ad89738..481137a1 100644 --- a/tests/base/tshift.nim +++ b/tests/base/tshift.nim @@ -71,7 +71,8 @@ proc test2[N,T](Smd: typedesc, lat: array[N,T]): float = var dmax = lat[mu] #if nRanks>1: dmax = lo.outerGeom[mu] #if lo.rankGeom[mu]>1 and lo.innerGeom[mu]>1: dmax = lo.outerGeom[mu] - if lo.rankGeom[mu]>1: dmax = lo.outerGeom[mu] + #if lo.rankGeom[mu]>1: dmax = lo.outerGeom[mu] + if lo.rankGeom[mu]>1: dmax = lo.localGeom[mu] for d in 1..dmax: result += testf(x,y,z, mu, d) result += testfb(x,y,z, mu, d) diff --git a/tests/examples/test1.nim b/tests/examples/test1.nim index 4d4624b3..d66bb5ee 100644 --- a/tests/examples/test1.nim +++ b/tests/examples/test1.nim @@ -9,7 +9,7 @@ proc test() = var v2 = lo.ColorVector() var m1 = lo.ColorMatrix() var v3 = lo.ColorVector() - template T0(x:v3.type):untyped = + template T0(x: v3.type): auto = shift(v3, 3,1, x) v3