From 0cd0d85d27ef1c6bbad0140cdebd3a59413cdfbe Mon Sep 17 00:00:00 2001 From: iyear Date: Fri, 3 Feb 2023 14:33:24 +0800 Subject: [PATCH] feat(dl): speed up resuming download --- app/internal/dliter/iter.go | 12 +++--- app/internal/dliter/iter_test.go | 69 ++++++++++++++++++++++++++++++++ app/internal/dliter/types.go | 1 + app/internal/dliter/util.go | 9 +++++ pkg/downloader/downloader.go | 21 +++++----- 5 files changed, 95 insertions(+), 17 deletions(-) create mode 100644 app/internal/dliter/iter_test.go diff --git a/app/internal/dliter/iter.go b/app/internal/dliter/iter.go index c0f681cf69..5d6417025b 100644 --- a/app/internal/dliter/iter.go +++ b/app/internal/dliter/iter.go @@ -44,6 +44,7 @@ func New(opts *Options) (*Iter, error) { exclude: excludeMap, curi: 0, curj: -1, + preSum: preSum(dialogs), finished: make(map[int]struct{}), template: tpl, manager: manager, @@ -68,14 +69,15 @@ func (iter *Iter) Next(ctx context.Context) (*downloader.Item, error) { } iter.curj = 0 } + i, j := iter.curi, iter.curj iter.mu.Unlock() // check if finished - if _, ok := iter.finished[iter.ij2n(iter.curi, iter.curj)]; ok { + if _, ok := iter.finished[iter.ij2n(i, j)]; ok { return nil, downloader.ErrSkip } - return iter.item(ctx, iter.curi, iter.curj) + return iter.item(ctx, i, j) } func (iter *Iter) item(ctx context.Context, i, j int) (*downloader.Item, error) { @@ -159,11 +161,7 @@ func (iter *Iter) Total(_ context.Context) int { } func (iter *Iter) ij2n(i, j int) int { - n := 0 - for k := 0; k < i; k++ { - n += len(iter.dialogs[k].Messages) - } - return n + j + return iter.preSum[i] + j } func (iter *Iter) SetFinished(finished map[int]struct{}) { diff --git a/app/internal/dliter/iter_test.go b/app/internal/dliter/iter_test.go new file mode 100644 index 0000000000..51b8393a5d --- /dev/null +++ b/app/internal/dliter/iter_test.go @@ -0,0 +1,69 @@ +package dliter + +import ( + "reflect" + "testing" +) + +func TestPreSum(t *testing.T) { + tests := []struct { + dialogs []*Dialog + want []int + }{ + { + dialogs: []*Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2}}}, + want: []int{0, 3, 5}, + }, + { + dialogs: []*Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3, 4}}}, + want: []int{0, 3, 6, 10}, + }, + { + dialogs: []*Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3, 4}}, {Messages: []int{1}}}, + want: []int{0, 3, 6, 10, 11}, + }, + } + + for _, tt := range tests { + got := preSum(tt.dialogs) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("preSum() = %v, want %v", got, tt.want) + } + } +} + +func TestIter_ij2n(t *testing.T) { + tests := []struct { + dialogs []*Dialog + input []struct { + i, j int + } + want []int + }{ + { + dialogs: []*Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2}}}, + input: []struct { + i, j int + }{{0, 0}, {0, 1}, {0, 2}, {1, 0}, {1, 1}}, + want: []int{0, 1, 2, 3, 4}, + }, + { + dialogs: []*Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3, 4}}}, + input: []struct { + i, j int + }{{0, 0}, {0, 1}, {0, 2}, {1, 0}, {1, 1}, {1, 2}, {2, 0}, {2, 1}, {2, 2}, {2, 3}}, + want: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + } + + for _, tt := range tests { + iter := &Iter{preSum: preSum(tt.dialogs), dialogs: tt.dialogs} + + for i, input := range tt.input { + got := iter.ij2n(input.i, input.j) + if got != tt.want[i] { + t.Errorf("ij2n(%v, %v) = %v, want %v", input.i, input.j, got, tt.want[i]) + } + } + } +} diff --git a/app/internal/dliter/types.go b/app/internal/dliter/types.go index 1b378c7e6d..d5b0b872ce 100644 --- a/app/internal/dliter/types.go +++ b/app/internal/dliter/types.go @@ -25,6 +25,7 @@ type Iter struct { mu sync.Mutex curi int curj int + preSum []int finished map[int]struct{} template *template.Template manager *peers.Manager diff --git a/app/internal/dliter/util.go b/app/internal/dliter/util.go index bd3be5ad98..b33a2c472d 100644 --- a/app/internal/dliter/util.go +++ b/app/internal/dliter/util.go @@ -58,3 +58,12 @@ func collectDialogs(dialogs [][]*Dialog) []*Dialog { } return res } + +// preSum of dialogs +func preSum(dialogs []*Dialog) []int { + sum := make([]int, len(dialogs)+1) + for i, m := range dialogs { + sum[i+1] = sum[i] + len(m.Messages) + } + return sum +} diff --git a/pkg/downloader/downloader.go b/pkg/downloader/downloader.go index 76111be706..007fa70f50 100644 --- a/pkg/downloader/downloader.go +++ b/pkg/downloader/downloader.go @@ -70,17 +70,18 @@ func (d *Downloader) Download(ctx context.Context, limit int) error { wg.SetLimit(limit) for i := 0; i < total; i++ { - wg.Go(func() (rerr error) { - item, err := d.iter.Next(errctx) - if err != nil { - logger.From(errctx).Debug("Iter next failed", - zap.String("error", err.Error())) - // skip error means we don't need to log error - if !errors.Is(err, ErrSkip) && !errors.Is(err, context.Canceled) { - d.pw.Log(color.RedString("failed: %v", err)) - } - return nil + item, err := d.iter.Next(errctx) + if err != nil { + logger.From(errctx).Debug("Iter next failed", + zap.Int("index", i), zap.String("error", err.Error())) + // skip error means we don't need to log error + if !errors.Is(err, ErrSkip) && !errors.Is(err, context.Canceled) { + d.pw.Log(color.RedString("failed: %v", err)) } + continue + } + + wg.Go(func() error { return d.download(errctx, item) }) }