From fcb034b1cfe2c1e22a71e989f6f257d45cc4fa30 Mon Sep 17 00:00:00 2001 From: iyear Date: Sat, 31 Dec 2022 15:32:41 +0800 Subject: [PATCH] fix(dcpool): add flood wait middleware --- app/dl/dl.go | 3 +- pkg/dcpool/dcpool.go | 20 +++++---- pkg/dcpool/lazypool.go | 85 --------------------------------------- pkg/dcpool/middlewares.go | 17 ++++++++ 4 files changed, 31 insertions(+), 94 deletions(-) delete mode 100644 pkg/dcpool/lazypool.go create mode 100644 pkg/dcpool/middlewares.go diff --git a/app/dl/dl.go b/app/dl/dl.go index 95894c20c7..247df30c3f 100644 --- a/app/dl/dl.go +++ b/app/dl/dl.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/fatih/color" + "github.com/gotd/contrib/middleware/floodwait" "github.com/iyear/tdl/app/internal/tgc" "github.com/iyear/tdl/pkg/consts" "github.com/iyear/tdl/pkg/dcpool" @@ -24,7 +25,7 @@ func Run(ctx context.Context, dir string, rewriteExt, skipSame bool, template st color.Green("Preparing DC pool... It may take a while. size: %d", poolSize) start := time.Now() - pool, err := dcpool.NewPool(ctx, c, poolSize) + pool, err := dcpool.NewPool(ctx, c, poolSize, floodwait.NewSimpleWaiter()) if err != nil { return err } diff --git a/pkg/dcpool/dcpool.go b/pkg/dcpool/dcpool.go index 55aaae95e2..217cd941f2 100644 --- a/pkg/dcpool/dcpool.go +++ b/pkg/dcpool/dcpool.go @@ -14,18 +14,20 @@ var dcs = []int{1, 2, 3, 4, 5} type Pool interface { Client(dc int) *tg.Client - Invoker(dc int) telegram.CloseInvoker + Invoker(dc int) tg.Invoker Default() int Close() error } type pool struct { - invokers map[int]telegram.CloseInvoker + invokers map[int]tg.Invoker + closes map[int]func() error _default int } -func NewPool(ctx context.Context, c *telegram.Client, size int64) (Pool, error) { - m := make(map[int]telegram.CloseInvoker) +func NewPool(ctx context.Context, c *telegram.Client, size int64, middlewares ...telegram.Middleware) (Pool, error) { + m := make(map[int]tg.Invoker) + closes := make(map[int]func() error) mu := &sync.Mutex{} curDC := c.Config().ThisDC @@ -50,7 +52,8 @@ func NewPool(ctx context.Context, c *telegram.Client, size int64) (Pool, error) } mu.Lock() - m[dc] = invoker + closes[dc] = invoker.Close + m[dc] = chainMiddlewares(invoker, middlewares...) mu.Unlock() return nil @@ -67,6 +70,7 @@ func NewPool(ctx context.Context, c *telegram.Client, size int64) (Pool, error) return &pool{ invokers: m, + closes: closes, _default: curDC, }, nil } @@ -75,7 +79,7 @@ func (p *pool) Client(dc int) *tg.Client { return tg.NewClient(p.Invoker(dc)) } -func (p *pool) Invoker(dc int) telegram.CloseInvoker { +func (p *pool) Invoker(dc int) tg.Invoker { i, ok := p.invokers[dc] if !ok { return p.invokers[p._default] @@ -89,8 +93,8 @@ func (p *pool) Default() int { func (p *pool) Close() error { var err error - for _, invokers := range p.invokers { - err = multierr.Append(err, invokers.Close()) + for _, c := range p.closes { + err = multierr.Append(err, c()) } return err diff --git a/pkg/dcpool/lazypool.go b/pkg/dcpool/lazypool.go deleted file mode 100644 index 815e756c43..0000000000 --- a/pkg/dcpool/lazypool.go +++ /dev/null @@ -1,85 +0,0 @@ -package dcpool - -import ( - "context" - "github.com/gotd/td/bin" - "github.com/gotd/td/telegram" - "github.com/gotd/td/tg" - "go.uber.org/multierr" - "sync" -) - -type lazyPool struct { - mu *sync.Mutex - client *telegram.Client - invokers map[int]telegram.CloseInvoker - _default int - size int64 - ctx context.Context -} - -type gotdCloseInvoker struct { - client *telegram.Client -} - -func (g *gotdCloseInvoker) Invoke(ctx context.Context, input bin.Encoder, output bin.Decoder) error { - return g.client.Invoke(ctx, input, output) -} - -func (g *gotdCloseInvoker) Close() error { - return nil -} - -func NewLazyPool(ctx context.Context, c *telegram.Client, size int64) (Pool, error) { - return &lazyPool{ - mu: &sync.Mutex{}, - client: c, - invokers: make(map[int]telegram.CloseInvoker), - _default: c.Config().ThisDC, - size: size, - ctx: ctx, - }, nil -} - -func (p *lazyPool) Client(dc int) *tg.Client { - return tg.NewClient(p.Invoker(dc)) -} - -func (p *lazyPool) Invoker(dc int) telegram.CloseInvoker { - i, ok := p.invokers[dc] - if !ok { - var ( - invoker telegram.CloseInvoker - err error - ) - if dc == p._default { - invoker, err = p.client.Pool(p.size) - } else { - invoker, err = p.client.DC(p.ctx, dc, p.size) - } - - if err != nil { - return &gotdCloseInvoker{client: p.client} - } - - p.mu.Lock() - p.invokers[dc] = invoker - p.mu.Unlock() - return invoker - } - - return i -} - -func (p *lazyPool) Default() int { - return p._default -} - -func (p *lazyPool) Close() error { - var err error - for _, invokers := range p.invokers { - err = multierr.Append(err, invokers.Close()) - } - - return err -} diff --git a/pkg/dcpool/middlewares.go b/pkg/dcpool/middlewares.go new file mode 100644 index 0000000000..c6dbd2449e --- /dev/null +++ b/pkg/dcpool/middlewares.go @@ -0,0 +1,17 @@ +package dcpool + +import ( + "github.com/gotd/td/telegram" + "github.com/gotd/td/tg" +) + +func chainMiddlewares(invoker tg.Invoker, chain ...telegram.Middleware) tg.Invoker { + if len(chain) == 0 { + return invoker + } + for i := len(chain) - 1; i >= 0; i-- { + invoker = chain[i].Handle(invoker) + } + + return invoker +}