Skip to content

Commit

Permalink
Merge pull request #1535 from 0chain/feat/global-listen
Browse files Browse the repository at this point in the history
Global listener chan wasm
  • Loading branch information
dabasov committed Jun 24, 2024
2 parents d39318a + de3b631 commit a7deec7
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 34 deletions.
2 changes: 1 addition & 1 deletion wasmsdk/demo/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ <h2>please download zcn.wasm from https://github.com/0chain/gosdk/releases/lates

let network = query.get('network')
if (!network || network == 'undefined') {
network = "mob.zus.network"
network = "dev.zus.network"
}

const blockWorker = 'https://' + network + '/dns';
Expand Down
120 changes: 113 additions & 7 deletions wasmsdk/jsbridge/webworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package jsbridge

import (
"context"
"errors"
"sync"

"github.com/google/uuid"
"github.com/hack-pad/go-webworkers/worker"
Expand All @@ -30,9 +32,18 @@ type WasmWebWorker struct {
// environment.
// If Env contains duplicate environment keys, only the last
// value in the slice for each duplicate key is used.
Env []string

Env []string
worker *worker.Worker

// For subscribing to events
ctx context.Context
cancelContext context.CancelFunc
subscribers map[string]chan worker.MessageEvent
numberOfSubs int
subMutex sync.Mutex

//isTerminated bool
isTerminated bool
}

var (
Expand All @@ -47,9 +58,10 @@ func NewWasmWebWorker(blobberID, blobberURL, clientID, publicKey, privateKey, mn
}

w := &WasmWebWorker{
Name: blobberURL,
Env: []string{"BLOBBER_URL=" + blobberURL, "CLIENT_ID=" + clientID, "PRIVATE_KEY=" + privateKey, "MODE=worker", "PUBLIC_KEY=" + publicKey, "MNEMONIC=" + mnemonic},
Path: "zcn.wasm",
Name: blobberURL,
Env: []string{"BLOBBER_URL=" + blobberURL, "CLIENT_ID=" + clientID, "PRIVATE_KEY=" + privateKey, "MODE=worker", "PUBLIC_KEY=" + publicKey, "MNEMONIC=" + mnemonic},
Path: "zcn.wasm",
subscribers: make(map[string]chan worker.MessageEvent),
}

if err := w.Start(); err != nil {
Expand All @@ -67,9 +79,103 @@ func GetWorker(blobberID string) *WasmWebWorker {
func RemoveWorker(blobberID string) {
worker, ok := workers[blobberID]
if ok {
worker.Terminate()
delete(workers, blobberID)
worker.subMutex.Lock()
if worker.numberOfSubs == 0 {
worker.Terminate()
delete(workers, blobberID)
worker.isTerminated = true
}
worker.subMutex.Unlock()
}
}

// pass a buffered channel to subscribe to events so that the caller is not blocked
func (ww *WasmWebWorker) SubscribeToEvents(remotePath string, ch chan worker.MessageEvent) error {
if ch == nil {
return errors.New("channel is nil")
}
ww.subMutex.Lock()
if ww.isTerminated {
ww.subMutex.Unlock()
return errors.New("worker is terminated")
}
ww.subscribers[remotePath] = ch
ww.numberOfSubs++
//start the worker listener if there are subscribers
if ww.numberOfSubs == 1 {
ctx, cancel := context.WithCancel(context.Background())
ww.ctx = ctx
ww.cancelContext = cancel
eventChan, err := ww.Listen(ctx)
if err != nil {
return err
}
go ww.ListenForEvents(eventChan)
}
ww.subMutex.Unlock()
return nil
}

func (ww *WasmWebWorker) UnsubscribeToEvents(remotePath string) {
ww.subMutex.Lock()
ch, ok := ww.subscribers[remotePath]
if ok {
close(ch)
delete(ww.subscribers, remotePath)
ww.numberOfSubs--
//stop the worker listener if there are no subscribers
if ww.numberOfSubs == 0 {
ww.cancelContext()
}
}
ww.subMutex.Unlock()
}

func (ww *WasmWebWorker) ListenForEvents(eventChan <-chan worker.MessageEvent) {
for {
select {
case <-ww.ctx.Done():
return
case event, ok := <-eventChan:
if !ok {
return
}
//get remote path from the event
data, err := event.Data()
// if above throws an error, pass it to all the subscribers
if err != nil {
ww.removeAllSubscribers()
return
}
remotePathObject, err := data.Get("remotePath")
if err != nil {
ww.removeAllSubscribers()
return
}
remotePath, _ := remotePathObject.String()
if remotePath == "" {
ww.removeAllSubscribers()
return
}
ww.subMutex.Lock()
ch, ok := ww.subscribers[remotePath]
if ok {
ch <- event
}
ww.subMutex.Unlock()
}
}
}

func (ww *WasmWebWorker) removeAllSubscribers() {
ww.subMutex.Lock()
for path, ch := range ww.subscribers {
close(ch)
delete(ww.subscribers, path)
ww.numberOfSubs--
}
ww.cancelContext()
ww.subMutex.Unlock()
}

func (ww *WasmWebWorker) Start() error {
Expand Down
2 changes: 1 addition & 1 deletion zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func SetMultiOpBatchSize(size int) {

func SetWasm() {
IsWasm = true
BatchSize = 1
BatchSize = 4
extraCount = 0
}

Expand Down
62 changes: 37 additions & 25 deletions zboxcore/sdk/chunked_upload_process_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ type FinalWorkerResult struct {
ThumbnailContentHash string
}

func (su *ChunkedUpload) listen(allEventChan []<-chan worker.MessageEvent, respChan chan error) {
func (su *ChunkedUpload) listen(allEventChan []chan worker.MessageEvent, respChan chan error) {
su.consensus.Reset()

var (
Expand Down Expand Up @@ -376,8 +376,12 @@ func (su *ChunkedUpload) listen(allEventChan []<-chan worker.MessageEvent, respC

func ProcessEventData(data safejs.Value) {
fileMeta, formInfo, fileShards, thumbnailChunkData, err := parseEventData(data)
var remotePath string
if fileMeta != nil {
remotePath = fileMeta.RemotePath
}
if err != nil {
selfPostMessage(false, false, err.Error(), 0, nil)
selfPostMessage(false, false, err.Error(), remotePath, 0, nil)
return
}
wp, ok := hasherMap[fileMeta.RemotePath]
Expand All @@ -398,7 +402,7 @@ func ProcessEventData(data safejs.Value) {
uploadData, err := formBuilder.Build(fileMeta, wp.hasher, formInfo.ConnectionID, formInfo.ChunkSize, formInfo.ChunkStartIndex, formInfo.ChunkEndIndex, formInfo.IsFinal, formInfo.EncryptedKey, formInfo.EncryptedKeyPoint,
fileShards, thumbnailChunkData, formInfo.ShardSize)
if err != nil {
selfPostMessage(false, false, err.Error(), formInfo.ChunkEndIndex, nil)
selfPostMessage(false, false, err.Error(), remotePath, formInfo.ChunkEndIndex, nil)
return
}
if formInfo.OnlyHash {
Expand All @@ -408,27 +412,27 @@ func ProcessEventData(data safejs.Value) {
ValidationRoot: uploadData.formData.ValidationRoot,
ThumbnailContentHash: uploadData.formData.ThumbnailContentHash,
}
selfPostMessage(true, true, "", formInfo.ChunkEndIndex, finalResult)
selfPostMessage(true, true, "", remotePath, formInfo.ChunkEndIndex, finalResult)
} else {
selfPostMessage(true, false, "", formInfo.ChunkEndIndex, nil)
selfPostMessage(true, false, "", remotePath, formInfo.ChunkEndIndex, nil)
}
return
}
blobberURL := os.Getenv("BLOBBER_URL")
if !formInfo.IsFinal {
wp.wg.Add(1)
}
go func(blobberData blobberData, wg *sync.WaitGroup) {
go func(blobberData blobberData, remotePath string, wg *sync.WaitGroup) {
if formInfo.IsFinal && len(blobberData.dataBuffers) > 1 {
err = sendUploadRequest(blobberData.dataBuffers[:len(blobberData.dataBuffers)-1], blobberData.contentSlice[:len(blobberData.contentSlice)-1], blobberURL, formInfo.AllocationID, formInfo.AllocationTx, formInfo.HttpMethod)
if err != nil {
selfPostMessage(false, true, err.Error(), formInfo.ChunkEndIndex, nil)
selfPostMessage(false, true, err.Error(), remotePath, formInfo.ChunkEndIndex, nil)
return
}
wg.Wait()
err = sendUploadRequest(blobberData.dataBuffers[len(blobberData.dataBuffers)-1:], blobberData.contentSlice[len(blobberData.contentSlice)-1:], blobberURL, formInfo.AllocationID, formInfo.AllocationTx, formInfo.HttpMethod)
if err != nil {
selfPostMessage(false, true, err.Error(), formInfo.ChunkEndIndex, nil)
selfPostMessage(false, true, err.Error(), remotePath, formInfo.ChunkEndIndex, nil)
return
}
} else {
Expand All @@ -439,7 +443,7 @@ func ProcessEventData(data safejs.Value) {
}
err = sendUploadRequest(blobberData.dataBuffers, blobberData.contentSlice, blobberURL, formInfo.AllocationID, formInfo.AllocationTx, formInfo.HttpMethod)
if err != nil {
selfPostMessage(false, formInfo.IsFinal, err.Error(), formInfo.ChunkEndIndex, nil)
selfPostMessage(false, formInfo.IsFinal, err.Error(), remotePath, formInfo.ChunkEndIndex, nil)
return
}
}
Expand All @@ -449,24 +453,25 @@ func ProcessEventData(data safejs.Value) {
ValidationRoot: blobberData.formData.ValidationRoot,
ThumbnailContentHash: blobberData.formData.ThumbnailContentHash,
}
selfPostMessage(true, true, "", formInfo.ChunkEndIndex, finalResult)
selfPostMessage(true, true, "", remotePath, formInfo.ChunkEndIndex, finalResult)
} else {
selfPostMessage(true, false, "", formInfo.ChunkEndIndex, nil)
selfPostMessage(true, false, "", remotePath, formInfo.ChunkEndIndex, nil)
}
}(uploadData, wp.wg)
}(uploadData, remotePath, wp.wg)

}

func InitHasherMap() {
hasherMap = make(map[string]workerProcess)
}

func selfPostMessage(success, isFinal bool, errMsg string, chunkEndIndex int, finalResult *FinalWorkerResult) {
func selfPostMessage(success, isFinal bool, errMsg, remotePath string, chunkEndIndex int, finalResult *FinalWorkerResult) {
obj := js.Global().Get("Object").New()
obj.Set("success", success)
obj.Set("error", errMsg)
obj.Set("isFinal", isFinal)
obj.Set("chunkEndIndex", chunkEndIndex)
obj.Set("remotePath", remotePath)
if finalResult != nil {
finalResultJSON, err := json.Marshal(finalResult)
if err != nil {
Expand Down Expand Up @@ -621,21 +626,28 @@ func sendUploadRequest(dataBuffers []*bytes.Buffer, contentSlice []string, blobb
func (su *ChunkedUpload) startProcessor() {
su.listenChan = make(chan struct{}, su.uploadWorkers)
su.processMap = make(map[int]int)
respChan := make(chan error, 1)
su.uploadWG.Add(1)
allEventChan := make([]<-chan worker.MessageEvent, len(su.blobbers))
var pos uint64
for i := su.uploadMask; !i.Equals64(0); i = i.And(zboxutil.NewUint128(1).Lsh(pos).Not()) {
pos = uint64(i.TrailingZeros())
blobber := su.blobbers[pos]
worker := jsbridge.GetWorker(blobber.blobber.ID)
if worker != nil {
eventChan, _ := worker.Listen(su.ctx)
allEventChan[pos] = eventChan
}
}

go func() {
respChan := make(chan error, 1)
allEventChan := make([]chan worker.MessageEvent, len(su.blobbers))
var pos uint64
for i := su.uploadMask; !i.Equals64(0); i = i.And(zboxutil.NewUint128(1).Lsh(pos).Not()) {
pos = uint64(i.TrailingZeros())
blobber := su.blobbers[pos]
webWorker := jsbridge.GetWorker(blobber.blobber.ID)
if webWorker != nil {
eventChan := make(chan worker.MessageEvent, su.uploadWorkers)
err := webWorker.SubscribeToEvents(su.fileMeta.RemotePath, eventChan)
if err != nil {
logger.Logger.Error("error subscribing to events: ", err)
su.ctxCncl(thrown.New("upload_failed", "Upload failed. Error subscribing to events"))
return
}
defer webWorker.UnsubscribeToEvents(su.fileMeta.RemotePath)
allEventChan[pos] = eventChan
}
}
defer su.uploadWG.Done()
for {
go su.listen(allEventChan, respChan)
Expand Down

0 comments on commit a7deec7

Please sign in to comment.