Skip to content

Commit

Permalink
Merge pull request #1537 from 0chain/fix/download-client
Browse files Browse the repository at this point in the history
Use client in place of hostclient for redirection
  • Loading branch information
dabasov authored Jun 18, 2024
2 parents 19ba55d + a7df55b commit 72bc427
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 44 deletions.
3 changes: 0 additions & 3 deletions zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,6 @@ func (a *Allocation) InitAllocation() {
a.mutex = &sync.Mutex{}
a.commitMutex = &sync.Mutex{}
a.fullconsensus, a.consensusThreshold = a.getConsensuses()
for _, blobber := range a.Blobbers {
zboxutil.SetHostClient(blobber.ID, blobber.Baseurl)
}
a.readFree = true
if a.ReadPriceRange.Max > 0 {
for _, blobberDetail := range a.BlobberDetails {
Expand Down
12 changes: 6 additions & 6 deletions zboxcore/sdk/blockdownloadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ func InitBlockDownloader(blobbers []*blockchain.StorageNode, workerCount int) {
for _, blobber := range blobbers {
if _, ok := downloadBlockChan[blobber.ID]; !ok {
downloadBlockChan[blobber.ID] = make(chan *BlockDownloadRequest, workerCount)
go startBlockDownloadWorker(downloadBlockChan[blobber.ID], workerCount, blobber.ID, blobber.Baseurl)
go startBlockDownloadWorker(downloadBlockChan[blobber.ID], workerCount)
}
}
}

func startBlockDownloadWorker(blobberChan chan *BlockDownloadRequest, workers int, id, baseURL string) {
func startBlockDownloadWorker(blobberChan chan *BlockDownloadRequest, workers int) {
sem := semaphore.NewWeighted(int64(workers))
hostClient := zboxutil.GetHostClient(id, baseURL)
fastClient := zboxutil.GetFastHTTPClient()
for {
blockDownloadReq, open := <-blobberChan
if !open {
Expand All @@ -97,7 +97,7 @@ func startBlockDownloadWorker(blobberChan chan *BlockDownloadRequest, workers in
continue
}
go func() {
blockDownloadReq.downloadBlobberBlock(hostClient)
blockDownloadReq.downloadBlobberBlock(fastClient)
sem.Release(1)
}()
}
Expand All @@ -116,7 +116,7 @@ func splitData(buf []byte, lim int) [][]byte {
return chunks
}

func (req *BlockDownloadRequest) downloadBlobberBlock(hostClient *fasthttp.HostClient) {
func (req *BlockDownloadRequest) downloadBlobberBlock(fastClient *fasthttp.Client) {
if req.numBlocks <= 0 {
req.result <- &downloadBlock{Success: false, idx: req.blobberIdx, err: errors.New("invalid_request", "Invalid number of blocks for download")}
return
Expand Down Expand Up @@ -157,7 +157,7 @@ func (req *BlockDownloadRequest) downloadBlobberBlock(hostClient *fasthttp.HostC

err = func() error {
now := time.Now()
statuscode, respBuf, err := hostClient.GetWithRequest(httpreq, req.respBuf)
statuscode, respBuf, err := fastClient.GetWithRequest(httpreq, req.respBuf)
fasthttp.ReleaseRequest(httpreq)
timeTaken := time.Since(now).Milliseconds()
if err != nil {
Expand Down
40 changes: 5 additions & 35 deletions zboxcore/zboxutil/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ type FastClient interface {

var (
Client HttpClient
HostClientMap = make(map[string]*fasthttp.HostClient)
FastHttpClient FastClient
hostLock sync.RWMutex
log logger.Logger
Expand Down Expand Up @@ -132,41 +131,12 @@ func (pfe *proxyFromEnv) isLoopback(host string) (ok bool) {
return net.ParseIP(host).IsLoopback()
}

func SetHostClient(id, baseURL string) {
hostLock.Lock()
defer hostLock.Unlock()
if _, ok := HostClientMap[id]; !ok {
u, _ := url.Parse(baseURL)
host := fasthttp.AddMissingPort(u.Host, true)
HostClientMap[id] = &fasthttp.HostClient{
NoDefaultUserAgentHeader: true,
Addr: host,
MaxConns: 1024,
MaxIdleConnDuration: 45 * time.Second,
DisableHeaderNamesNormalizing: true,
DisablePathNormalizing: true,
Dial: (&fasthttp.TCPDialer{
Concurrency: 4096,
DNSCacheDuration: time.Hour,
}).Dial,
IsTLS: u.Scheme == "https",
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
}
func GetFastHTTPClient() *fasthttp.Client {
fc, ok := FastHttpClient.(*fasthttp.Client)
if ok {
return fc
}
}

func GetHostClient(id, baseURL string) *fasthttp.HostClient {
hostLock.RLock()
hc := HostClientMap[id]
if hc == nil {
hostLock.RUnlock()
SetHostClient(id, baseURL)
hostLock.RLock()
hc = HostClientMap[id]
}
hostLock.RUnlock()
return hc
return nil
}

func (pfe *proxyFromEnv) Proxy(req *http.Request) (proxy *url.URL, err error) {
Expand Down

0 comments on commit 72bc427

Please sign in to comment.