From ca8420a18d66f3b2824389dc50b9b27a07ffe712 Mon Sep 17 00:00:00 2001 From: hitenjain14 Date: Mon, 17 Jun 2024 17:45:55 +0530 Subject: [PATCH 1/2] add sub to web worker --- wasmsdk/jsbridge/webworker.go | 94 +++++++++++++++++++++-- zboxcore/sdk/chunked_upload_process_js.go | 16 ++-- 2 files changed, 100 insertions(+), 10 deletions(-) diff --git a/wasmsdk/jsbridge/webworker.go b/wasmsdk/jsbridge/webworker.go index 6d18fc895..1c81bd7ec 100644 --- a/wasmsdk/jsbridge/webworker.go +++ b/wasmsdk/jsbridge/webworker.go @@ -5,6 +5,8 @@ package jsbridge import ( "context" + "errors" + "sync" "github.com/google/uuid" "github.com/hack-pad/go-webworkers/worker" @@ -30,9 +32,15 @@ type WasmWebWorker struct { // environment. // If Env contains duplicate environment keys, only the last // value in the slice for each duplicate key is used. - Env []string - + Env []string worker *worker.Worker + + // For subscribing to events + ctx context.Context + cancelContext context.CancelFunc + subscribers map[string]chan worker.MessageEvent + numberOfSubs int + subMutex sync.Mutex } var ( @@ -46,9 +54,10 @@ func NewWasmWebWorker(blobberID, blobberURL, clientID, publicKey, privateKey, mn } w := &WasmWebWorker{ - Name: blobberURL, - Env: []string{"BLOBBER_URL=" + blobberURL, "CLIENT_ID=" + clientID, "PRIVATE_KEY=" + privateKey, "MODE=worker", "PUBLIC_KEY=" + publicKey, "MNEMONIC=" + mnemonic}, - Path: "zcn.wasm", + Name: blobberURL, + Env: []string{"BLOBBER_URL=" + blobberURL, "CLIENT_ID=" + clientID, "PRIVATE_KEY=" + privateKey, "MODE=worker", "PUBLIC_KEY=" + publicKey, "MNEMONIC=" + mnemonic}, + Path: "zcn.wasm", + subscribers: make(map[string]chan worker.MessageEvent), } if err := w.Start(); err != nil { @@ -71,6 +80,81 @@ func RemoveWorker(blobberID string) { } } +// pass a buffered channel to subscribe to events so that the caller is not blocked +func (ww *WasmWebWorker) SubscribeToEvents(remotePath string, ch chan worker.MessageEvent) error { + if ch == nil { + return errors.New("channel is nil") + } + ww.subMutex.Lock() + ww.subscribers[remotePath] = ch + ww.numberOfSubs++ + //start the worker listener if there are subscribers + if ww.numberOfSubs == 1 { + ctx, cancel := context.WithCancel(context.Background()) + ww.ctx = ctx + ww.cancelContext = cancel + eventChan, err := ww.Listen(ctx) + if err != nil { + return err + } + go ww.ListenForEvents(eventChan) + } + ww.subMutex.Unlock() + return nil +} + +func (ww *WasmWebWorker) UnsubscribeToEvents(remotePath string) { + ww.subMutex.Lock() + ch, ok := ww.subscribers[remotePath] + if ok { + close(ch) + delete(ww.subscribers, remotePath) + ww.numberOfSubs-- + //stop the worker listener if there are no subscribers + if ww.numberOfSubs == 0 { + ww.cancelContext() + } + } + ww.subMutex.Unlock() +} + +func (ww *WasmWebWorker) ListenForEvents(eventChan <-chan worker.MessageEvent) { + for { + select { + case <-ww.ctx.Done(): + return + case event := <-eventChan: + //get remote path from the event + data, err := event.Data() + // if above throws an error, pass it to all the subscribers + if err != nil { + ww.sendEventToAllSubscribers(event) + return + } + remotePathObject, err := data.Get("remotePath") + if err != nil { + ww.sendEventToAllSubscribers(event) + return + } + remotePath, _ := remotePathObject.String() + ww.subMutex.Lock() + ch, ok := ww.subscribers[remotePath] + if ok { + ch <- event + } + ww.subMutex.Unlock() + } + } +} + +func (ww *WasmWebWorker) sendEventToAllSubscribers(event worker.MessageEvent) { + ww.subMutex.Lock() + for _, ch := range ww.subscribers { + ch <- event + } + ww.subMutex.Unlock() +} + func (ww *WasmWebWorker) Start() error { workerJS, err := buildWorkerJS(ww.Args, ww.Env, ww.Path) if err != nil { diff --git a/zboxcore/sdk/chunked_upload_process_js.go b/zboxcore/sdk/chunked_upload_process_js.go index cfbd2e0fe..25e25b8cb 100644 --- a/zboxcore/sdk/chunked_upload_process_js.go +++ b/zboxcore/sdk/chunked_upload_process_js.go @@ -217,7 +217,7 @@ type FinalWorkerResult struct { ThumbnailContentHash string } -func (su *ChunkedUpload) listen(allEventChan []<-chan worker.MessageEvent, respChan chan error) { +func (su *ChunkedUpload) listen(allEventChan []chan worker.MessageEvent, respChan chan error) { su.consensus.Reset() var ( @@ -623,14 +623,20 @@ func (su *ChunkedUpload) startProcessor() { su.processMap = make(map[int]int) respChan := make(chan error, 1) su.uploadWG.Add(1) - allEventChan := make([]<-chan worker.MessageEvent, len(su.blobbers)) + allEventChan := make([]chan worker.MessageEvent, len(su.blobbers)) var pos uint64 for i := su.uploadMask; !i.Equals64(0); i = i.And(zboxutil.NewUint128(1).Lsh(pos).Not()) { pos = uint64(i.TrailingZeros()) blobber := su.blobbers[pos] - worker := jsbridge.GetWorker(blobber.blobber.ID) - if worker != nil { - eventChan, _ := worker.Listen(su.ctx) + webWorker := jsbridge.GetWorker(blobber.blobber.ID) + if webWorker != nil { + eventChan := make(chan worker.MessageEvent, su.uploadWorkers) + err := webWorker.SubscribeToEvents(su.fileMeta.Path, eventChan) + if err != nil { + logger.Logger.Error("error subscribing to events: ", err) + su.ctxCncl(thrown.New("upload_failed", "Upload failed. Error subscribing to events")) + return + } allEventChan[pos] = eventChan } } From 81b050f742d3f61bc2256cc7b0d52264119e6d11 Mon Sep 17 00:00:00 2001 From: hitenjain14 Date: Mon, 17 Jun 2024 19:16:31 +0530 Subject: [PATCH 2/2] global listener --- wasmsdk/demo/index.html | 2 +- wasmsdk/jsbridge/webworker.go | 38 ++++++++++--- zboxcore/sdk/allocation.go | 2 +- zboxcore/sdk/chunked_upload_process_js.go | 66 ++++++++++++----------- 4 files changed, 68 insertions(+), 40 deletions(-) diff --git a/wasmsdk/demo/index.html b/wasmsdk/demo/index.html index 88f90bd81..bc4f1fb98 100644 --- a/wasmsdk/demo/index.html +++ b/wasmsdk/demo/index.html @@ -211,7 +211,7 @@

please download zcn.wasm from https://github.com/0chain/gosdk/releases/lates let network = query.get('network') if (!network || network == 'undefined') { - network = "mob.zus.network" + network = "dev.zus.network" } const blockWorker = 'https://' + network + '/dns'; diff --git a/wasmsdk/jsbridge/webworker.go b/wasmsdk/jsbridge/webworker.go index 1c81bd7ec..c7980fc87 100644 --- a/wasmsdk/jsbridge/webworker.go +++ b/wasmsdk/jsbridge/webworker.go @@ -41,6 +41,9 @@ type WasmWebWorker struct { subscribers map[string]chan worker.MessageEvent numberOfSubs int subMutex sync.Mutex + + //isTerminated bool + isTerminated bool } var ( @@ -75,8 +78,13 @@ func GetWorker(blobberID string) *WasmWebWorker { func RemoveWorker(blobberID string) { worker, ok := workers[blobberID] if ok { - worker.Terminate() - delete(workers, blobberID) + worker.subMutex.Lock() + if worker.numberOfSubs == 0 { + worker.Terminate() + delete(workers, blobberID) + worker.isTerminated = true + } + worker.subMutex.Unlock() } } @@ -86,6 +94,10 @@ func (ww *WasmWebWorker) SubscribeToEvents(remotePath string, ch chan worker.Mes return errors.New("channel is nil") } ww.subMutex.Lock() + if ww.isTerminated { + ww.subMutex.Unlock() + return errors.New("worker is terminated") + } ww.subscribers[remotePath] = ch ww.numberOfSubs++ //start the worker listener if there are subscribers @@ -123,20 +135,27 @@ func (ww *WasmWebWorker) ListenForEvents(eventChan <-chan worker.MessageEvent) { select { case <-ww.ctx.Done(): return - case event := <-eventChan: + case event, ok := <-eventChan: + if !ok { + return + } //get remote path from the event data, err := event.Data() // if above throws an error, pass it to all the subscribers if err != nil { - ww.sendEventToAllSubscribers(event) + ww.removeAllSubscribers() return } remotePathObject, err := data.Get("remotePath") if err != nil { - ww.sendEventToAllSubscribers(event) + ww.removeAllSubscribers() return } remotePath, _ := remotePathObject.String() + if remotePath == "" { + ww.removeAllSubscribers() + return + } ww.subMutex.Lock() ch, ok := ww.subscribers[remotePath] if ok { @@ -147,11 +166,14 @@ func (ww *WasmWebWorker) ListenForEvents(eventChan <-chan worker.MessageEvent) { } } -func (ww *WasmWebWorker) sendEventToAllSubscribers(event worker.MessageEvent) { +func (ww *WasmWebWorker) removeAllSubscribers() { ww.subMutex.Lock() - for _, ch := range ww.subscribers { - ch <- event + for path, ch := range ww.subscribers { + close(ch) + delete(ww.subscribers, path) + ww.numberOfSubs-- } + ww.cancelContext() ww.subMutex.Unlock() } diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index 676fd02ff..d1e6647e1 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -253,7 +253,7 @@ func SetMultiOpBatchSize(size int) { func SetWasm() { IsWasm = true - BatchSize = 1 + BatchSize = 4 extraCount = 0 } diff --git a/zboxcore/sdk/chunked_upload_process_js.go b/zboxcore/sdk/chunked_upload_process_js.go index 25e25b8cb..b6a0c8d3b 100644 --- a/zboxcore/sdk/chunked_upload_process_js.go +++ b/zboxcore/sdk/chunked_upload_process_js.go @@ -376,8 +376,12 @@ func (su *ChunkedUpload) listen(allEventChan []chan worker.MessageEvent, respCha func ProcessEventData(data safejs.Value) { fileMeta, formInfo, fileShards, thumbnailChunkData, err := parseEventData(data) + var remotePath string + if fileMeta != nil { + remotePath = fileMeta.RemotePath + } if err != nil { - selfPostMessage(false, false, err.Error(), 0, nil) + selfPostMessage(false, false, err.Error(), remotePath, 0, nil) return } wp, ok := hasherMap[fileMeta.RemotePath] @@ -398,7 +402,7 @@ func ProcessEventData(data safejs.Value) { uploadData, err := formBuilder.Build(fileMeta, wp.hasher, formInfo.ConnectionID, formInfo.ChunkSize, formInfo.ChunkStartIndex, formInfo.ChunkEndIndex, formInfo.IsFinal, formInfo.EncryptedKey, formInfo.EncryptedKeyPoint, fileShards, thumbnailChunkData, formInfo.ShardSize) if err != nil { - selfPostMessage(false, false, err.Error(), formInfo.ChunkEndIndex, nil) + selfPostMessage(false, false, err.Error(), remotePath, formInfo.ChunkEndIndex, nil) return } if formInfo.OnlyHash { @@ -408,9 +412,9 @@ func ProcessEventData(data safejs.Value) { ValidationRoot: uploadData.formData.ValidationRoot, ThumbnailContentHash: uploadData.formData.ThumbnailContentHash, } - selfPostMessage(true, true, "", formInfo.ChunkEndIndex, finalResult) + selfPostMessage(true, true, "", remotePath, formInfo.ChunkEndIndex, finalResult) } else { - selfPostMessage(true, false, "", formInfo.ChunkEndIndex, nil) + selfPostMessage(true, false, "", remotePath, formInfo.ChunkEndIndex, nil) } return } @@ -418,17 +422,17 @@ func ProcessEventData(data safejs.Value) { if !formInfo.IsFinal { wp.wg.Add(1) } - go func(blobberData blobberData, wg *sync.WaitGroup) { + go func(blobberData blobberData, remotePath string, wg *sync.WaitGroup) { if formInfo.IsFinal && len(blobberData.dataBuffers) > 1 { err = sendUploadRequest(blobberData.dataBuffers[:len(blobberData.dataBuffers)-1], blobberData.contentSlice[:len(blobberData.contentSlice)-1], blobberURL, formInfo.AllocationID, formInfo.AllocationTx, formInfo.HttpMethod) if err != nil { - selfPostMessage(false, true, err.Error(), formInfo.ChunkEndIndex, nil) + selfPostMessage(false, true, err.Error(), remotePath, formInfo.ChunkEndIndex, nil) return } wg.Wait() err = sendUploadRequest(blobberData.dataBuffers[len(blobberData.dataBuffers)-1:], blobberData.contentSlice[len(blobberData.contentSlice)-1:], blobberURL, formInfo.AllocationID, formInfo.AllocationTx, formInfo.HttpMethod) if err != nil { - selfPostMessage(false, true, err.Error(), formInfo.ChunkEndIndex, nil) + selfPostMessage(false, true, err.Error(), remotePath, formInfo.ChunkEndIndex, nil) return } } else { @@ -439,7 +443,7 @@ func ProcessEventData(data safejs.Value) { } err = sendUploadRequest(blobberData.dataBuffers, blobberData.contentSlice, blobberURL, formInfo.AllocationID, formInfo.AllocationTx, formInfo.HttpMethod) if err != nil { - selfPostMessage(false, formInfo.IsFinal, err.Error(), formInfo.ChunkEndIndex, nil) + selfPostMessage(false, formInfo.IsFinal, err.Error(), remotePath, formInfo.ChunkEndIndex, nil) return } } @@ -449,11 +453,11 @@ func ProcessEventData(data safejs.Value) { ValidationRoot: blobberData.formData.ValidationRoot, ThumbnailContentHash: blobberData.formData.ThumbnailContentHash, } - selfPostMessage(true, true, "", formInfo.ChunkEndIndex, finalResult) + selfPostMessage(true, true, "", remotePath, formInfo.ChunkEndIndex, finalResult) } else { - selfPostMessage(true, false, "", formInfo.ChunkEndIndex, nil) + selfPostMessage(true, false, "", remotePath, formInfo.ChunkEndIndex, nil) } - }(uploadData, wp.wg) + }(uploadData, remotePath, wp.wg) } @@ -461,12 +465,13 @@ func InitHasherMap() { hasherMap = make(map[string]workerProcess) } -func selfPostMessage(success, isFinal bool, errMsg string, chunkEndIndex int, finalResult *FinalWorkerResult) { +func selfPostMessage(success, isFinal bool, errMsg, remotePath string, chunkEndIndex int, finalResult *FinalWorkerResult) { obj := js.Global().Get("Object").New() obj.Set("success", success) obj.Set("error", errMsg) obj.Set("isFinal", isFinal) obj.Set("chunkEndIndex", chunkEndIndex) + obj.Set("remotePath", remotePath) if finalResult != nil { finalResultJSON, err := json.Marshal(finalResult) if err != nil { @@ -621,27 +626,28 @@ func sendUploadRequest(dataBuffers []*bytes.Buffer, contentSlice []string, blobb func (su *ChunkedUpload) startProcessor() { su.listenChan = make(chan struct{}, su.uploadWorkers) su.processMap = make(map[int]int) - respChan := make(chan error, 1) su.uploadWG.Add(1) - allEventChan := make([]chan worker.MessageEvent, len(su.blobbers)) - var pos uint64 - for i := su.uploadMask; !i.Equals64(0); i = i.And(zboxutil.NewUint128(1).Lsh(pos).Not()) { - pos = uint64(i.TrailingZeros()) - blobber := su.blobbers[pos] - webWorker := jsbridge.GetWorker(blobber.blobber.ID) - if webWorker != nil { - eventChan := make(chan worker.MessageEvent, su.uploadWorkers) - err := webWorker.SubscribeToEvents(su.fileMeta.Path, eventChan) - if err != nil { - logger.Logger.Error("error subscribing to events: ", err) - su.ctxCncl(thrown.New("upload_failed", "Upload failed. Error subscribing to events")) - return - } - allEventChan[pos] = eventChan - } - } go func() { + respChan := make(chan error, 1) + allEventChan := make([]chan worker.MessageEvent, len(su.blobbers)) + var pos uint64 + for i := su.uploadMask; !i.Equals64(0); i = i.And(zboxutil.NewUint128(1).Lsh(pos).Not()) { + pos = uint64(i.TrailingZeros()) + blobber := su.blobbers[pos] + webWorker := jsbridge.GetWorker(blobber.blobber.ID) + if webWorker != nil { + eventChan := make(chan worker.MessageEvent, su.uploadWorkers) + err := webWorker.SubscribeToEvents(su.fileMeta.RemotePath, eventChan) + if err != nil { + logger.Logger.Error("error subscribing to events: ", err) + su.ctxCncl(thrown.New("upload_failed", "Upload failed. Error subscribing to events")) + return + } + defer webWorker.UnsubscribeToEvents(su.fileMeta.RemotePath) + allEventChan[pos] = eventChan + } + } defer su.uploadWG.Done() for { go su.listen(allEventChan, respChan)