Skip to content

Commit

Permalink
Merge pull request #1546 from 0chain/fix/wasm-ready
Browse files Browse the repository at this point in the history
Wait for start event from web workers
  • Loading branch information
dabasov authored Jul 1, 2024
2 parents b55cbcc + fb75949 commit 63badd2
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 8 deletions.
13 changes: 11 additions & 2 deletions wasmsdk/blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/0chain/gosdk/core/common"
"github.com/0chain/gosdk/core/pathutil"
"github.com/0chain/gosdk/core/sys"
"github.com/hack-pad/safejs"

"github.com/0chain/gosdk/core/transaction"
"github.com/0chain/gosdk/wasmsdk/jsbridge"
Expand Down Expand Up @@ -629,6 +630,12 @@ func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) {
result.Success = false
return result, errors.New("Error fetching the allocation")
}
err = addWebWorkers(allocationObj)
if err != nil {
result.Error = err.Error()
result.Success = false
return result, err
}

operationRequests := make([]sdk.OperationRequest, n)
for idx, option := range options {
Expand Down Expand Up @@ -1019,10 +1026,10 @@ func terminateWorkersWithAllocation(alloc *sdk.Allocation) {
}
}

func createWorkers(allocationID string) bool {
func createWorkers(allocationID string) error {
alloc, err := getAllocation(allocationID)
if err != nil {
return false
return err
}
return addWebWorkers(alloc)
}
Expand All @@ -1035,6 +1042,8 @@ func startListener() error {
if err != nil {
return err
}
safeVal, _ := safejs.ValueOf("startListener")
selfWorker.PostMessage(safeVal, nil) //nolint:errcheck

listener, err := selfWorker.Listen(ctx)
if err != nil {
Expand Down
47 changes: 42 additions & 5 deletions wasmsdk/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package main

import (
"context"
"errors"
"time"

"github.com/0chain/gosdk/wasmsdk/jsbridge"
Expand Down Expand Up @@ -66,17 +68,52 @@ func reloadAllocation(allocationID string) (*sdk.Allocation, error) {
return it.Allocation, nil
}

func addWebWorkers(alloc *sdk.Allocation) (isCreated bool) {
func addWebWorkers(alloc *sdk.Allocation) (err error) {
c := client.GetClient()
if c == nil || len(c.Keys) == 0 {
return
}

ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
respChan := make(chan error, len(alloc.Blobbers))
respRequired := 0
for _, blober := range alloc.Blobbers {
_, workerCreated, _ := jsbridge.NewWasmWebWorker(blober.ID, blober.Baseurl, c.ClientID, c.Keys[0].PublicKey, c.Keys[0].PrivateKey, c.Mnemonic) //nolint:errcheck
weborker, workerCreated, _ := jsbridge.NewWasmWebWorker(blober.ID, blober.Baseurl, c.ClientID, c.Keys[0].PublicKey, c.Keys[0].PrivateKey, c.Mnemonic) //nolint:errcheck
if workerCreated {
isCreated = true
respRequired++
go func() {
eventChan, err := weborker.Listen(ctx)
if err != nil {
respChan <- err
return
}
_, ok := <-eventChan
if !ok {
respChan <- errors.New("worker chan closed")
return
}
respChan <- nil
}()
}
}
if respRequired == 0 {
return
}
for {
select {
case <-ctx.Done():
PrintError(ctx.Err())
return ctx.Err()
case err = <-respChan:
if err != nil {
PrintError(err)
return
}
respRequired--
if respRequired == 0 {
close(respChan)
return
}
}
}
return
}
4 changes: 3 additions & 1 deletion wasmsdk/jsbridge/webworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"sync"

"github.com/0chain/gosdk/core/version"
"github.com/google/uuid"
"github.com/hack-pad/go-webworkers/worker"
"github.com/hack-pad/safejs"
Expand Down Expand Up @@ -60,7 +61,7 @@ func NewWasmWebWorker(blobberID, blobberURL, clientID, publicKey, privateKey, mn
w := &WasmWebWorker{
Name: blobberURL,
Env: []string{"BLOBBER_URL=" + blobberURL, "CLIENT_ID=" + clientID, "PRIVATE_KEY=" + privateKey, "MODE=worker", "PUBLIC_KEY=" + publicKey, "MNEMONIC=" + mnemonic},
Path: "zcn.wasm",
Path: "zcn.wasm?v=" + version.VERSIONSTR,
subscribers: make(map[string]chan worker.MessageEvent),
}

Expand Down Expand Up @@ -108,6 +109,7 @@ func (ww *WasmWebWorker) SubscribeToEvents(remotePath string, ch chan worker.Mes
ww.cancelContext = cancel
eventChan, err := ww.Listen(ctx)
if err != nil {
ww.subMutex.Unlock()
return err
}
go ww.ListenForEvents(eventChan)
Expand Down

0 comments on commit 63badd2

Please sign in to comment.