From a40ecccf4110e08e3798d7ee6df3acb733419005 Mon Sep 17 00:00:00 2001 From: Andy Arthur Date: Tue, 14 Apr 2020 14:13:20 -0700 Subject: [PATCH 1/3] streaming map edits --- bats/compatibility/corona-virus-test.sh | 4 +-- go/libraries/doltcore/rebase/rebase_tag.go | 25 ++++++++----- .../table/typed/noms/noms_map_updater.go | 36 +++++++++++++++++++ 3 files changed, 55 insertions(+), 10 deletions(-) diff --git a/bats/compatibility/corona-virus-test.sh b/bats/compatibility/corona-virus-test.sh index 68b7c60ec84..b42aa07d34b 100755 --- a/bats/compatibility/corona-virus-test.sh +++ b/bats/compatibility/corona-virus-test.sh @@ -60,7 +60,7 @@ function export_tables() { places do dolt table export "$table" "$table$1.csv" - dolt sql -r csv -q "select * from $table" > "$table$1.sql.csv" + dolt sql -r csv -q "select * from $table" | sed 's///g' > "$table$1.sql.csv" done } @@ -99,7 +99,7 @@ local_bin="`pwd`"/"$bin" PATH="$local_bin":"$PATH" dolt clone Liquidata/corona-virus pushd "corona-virus" PATH="$local_bin":"$PATH" export_tables "-pre" -dolt migrate +time dolt migrate export_tables "-post" diff_tables echo "success!" diff --git a/go/libraries/doltcore/rebase/rebase_tag.go b/go/libraries/doltcore/rebase/rebase_tag.go index 9fb20a28c50..371c16490a7 100644 --- a/go/libraries/doltcore/rebase/rebase_tag.go +++ b/go/libraries/doltcore/rebase/rebase_tag.go @@ -17,6 +17,7 @@ package rebase import ( "context" "fmt" + "github.com/liquidata-inc/dolt/go/libraries/doltcore/table/typed/noms" "time" "github.com/liquidata-inc/dolt/go/libraries/doltcore/diff" @@ -404,7 +405,7 @@ func replayCommitWithNewTag(ctx context.Context, root, parentRoot, rebasedParent return nil, err } - rebasedRows, err := replayRowDiffs(ctx, rebasedSch, rows, parentRows, rebasedParentRows, tableMapping) + rebasedRows, err := replayRowDiffs(ctx, rebasedParentRoot.VRW(), rebasedSch, rows, parentRows, rebasedParentRows, tableMapping) if err != nil { return nil, err @@ -442,7 +443,7 @@ func replayCommitWithNewTag(ctx context.Context, root, parentRoot, rebasedParent return newRoot, nil } -func replayRowDiffs(ctx context.Context, rSch schema.Schema, rows, parentRows, rebasedParentRows types.Map, tagMapping map[uint64]uint64) (types.Map, error) { +func replayRowDiffs(ctx context.Context, vrw types.ValueReadWriter, rSch schema.Schema, rows, parentRows, rebasedParentRows types.Map, tagMapping map[uint64]uint64) (types.Map, error) { unmappedTags := set.NewUint64Set(rSch.GetAllCols().Tags) tm := make(map[uint64]uint64) @@ -454,8 +455,7 @@ func replayRowDiffs(ctx context.Context, rSch schema.Schema, rows, parentRows, r tm[t] = t } - // we will apply modified differences to the rebasedParent - rebasedRowEditor := rebasedParentRows.Edit() + nmu := noms.NewNomsMapUpdater(ctx, vrw, rebasedParentRows, rSch, func(stats types.AppliedEditStats) {}) ad := diff.NewAsyncDiffer(diffBufSize) // get all differences (including merges) between original commit and its parent @@ -486,16 +486,25 @@ func replayRowDiffs(ctx context.Context, rSch schema.Schema, rows, parentRows, r switch d.ChangeType { case types.DiffChangeAdded: - rebasedRowEditor.Set(key, newVal) + err = nmu.WriteEdit(ctx, key, newVal) case types.DiffChangeRemoved: - rebasedRowEditor.Remove(key) + err = nmu.WriteEdit(ctx, key, nil) case types.DiffChangeModified: - rebasedRowEditor.Set(key, newVal) + err = nmu.WriteEdit(ctx, key, newVal) + } + + if err != nil { + return types.EmptyMap, err } } } - return rebasedRowEditor.Map(ctx) + err := nmu.Close(ctx) + if err != nil { + return types.EmptyMap, err + } + + return *nmu.GetMap(), nil } func dropValsForDeletedColumns(ctx context.Context, nbf *types.NomsBinFormat, rows types.Map, sch, parentSch schema.Schema) (types.Map, error) { diff --git a/go/libraries/doltcore/table/typed/noms/noms_map_updater.go b/go/libraries/doltcore/table/typed/noms/noms_map_updater.go index ab5cd9a9339..77cbc8ced69 100644 --- a/go/libraries/doltcore/table/typed/noms/noms_map_updater.go +++ b/go/libraries/doltcore/table/typed/noms/noms_map_updater.go @@ -132,6 +132,42 @@ func (nmu *NomsMapUpdater) WriteRow(ctx context.Context, r row.Row) error { return nil } + +// WriteRow will write a row to a table +func (nmu *NomsMapUpdater) WriteEdit(ctx context.Context, pk types.LesserValuable, fieldVals types.Valuable) error { + if nmu.acc == nil { + return errors.New("Attempting to write after closing.") + } + + if err := nmu.ae.Get(); err != nil { + return err + } + + err := func() error { + nmu.acc.AddEdit(pk, fieldVals) + nmu.count++ + + if nmu.count%maxEdits == 0 { + edits, err := nmu.acc.FinishedEditing() + + if err != nil { + return err + } + + nmu.mapChan <- edits + nmu.acc = types.CreateEditAccForMapEdits(nmu.vrw.Format()) + } + + return nil + }() + + if err != nil { + return err + } + + return nil +} + // Close should flush all writes, release resources being held func (nmu *NomsMapUpdater) Close(ctx context.Context) error { if nmu.result != nil { From 03c353335f7b585ad207cfbeddcbe3d8a976131e Mon Sep 17 00:00:00 2001 From: Andy Arthur Date: Tue, 14 Apr 2020 14:28:00 -0700 Subject: [PATCH 2/3] repo fmt --- go/libraries/doltcore/rebase/rebase_tag.go | 2 +- go/libraries/doltcore/table/typed/noms/noms_map_updater.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/go/libraries/doltcore/rebase/rebase_tag.go b/go/libraries/doltcore/rebase/rebase_tag.go index 371c16490a7..42091c6ab1f 100644 --- a/go/libraries/doltcore/rebase/rebase_tag.go +++ b/go/libraries/doltcore/rebase/rebase_tag.go @@ -17,7 +17,6 @@ package rebase import ( "context" "fmt" - "github.com/liquidata-inc/dolt/go/libraries/doltcore/table/typed/noms" "time" "github.com/liquidata-inc/dolt/go/libraries/doltcore/diff" @@ -28,6 +27,7 @@ import ( "github.com/liquidata-inc/dolt/go/libraries/doltcore/schema" "github.com/liquidata-inc/dolt/go/libraries/doltcore/schema/encoding" "github.com/liquidata-inc/dolt/go/libraries/doltcore/table/typed" + "github.com/liquidata-inc/dolt/go/libraries/doltcore/table/typed/noms" "github.com/liquidata-inc/dolt/go/libraries/utils/set" ndiff "github.com/liquidata-inc/dolt/go/store/diff" "github.com/liquidata-inc/dolt/go/store/hash" diff --git a/go/libraries/doltcore/table/typed/noms/noms_map_updater.go b/go/libraries/doltcore/table/typed/noms/noms_map_updater.go index 77cbc8ced69..d0b0e031798 100644 --- a/go/libraries/doltcore/table/typed/noms/noms_map_updater.go +++ b/go/libraries/doltcore/table/typed/noms/noms_map_updater.go @@ -132,7 +132,6 @@ func (nmu *NomsMapUpdater) WriteRow(ctx context.Context, r row.Row) error { return nil } - // WriteRow will write a row to a table func (nmu *NomsMapUpdater) WriteEdit(ctx context.Context, pk types.LesserValuable, fieldVals types.Valuable) error { if nmu.acc == nil { From 30c315d6a058998e2a909a7e708633080f861a09 Mon Sep 17 00:00:00 2001 From: Andy Arthur Date: Tue, 14 Apr 2020 15:24:44 -0700 Subject: [PATCH 3/3] code review feedback --- .../table/typed/noms/noms_map_updater.go | 37 +------------------ 1 file changed, 2 insertions(+), 35 deletions(-) diff --git a/go/libraries/doltcore/table/typed/noms/noms_map_updater.go b/go/libraries/doltcore/table/typed/noms/noms_map_updater.go index d0b0e031798..678ca0ca286 100644 --- a/go/libraries/doltcore/table/typed/noms/noms_map_updater.go +++ b/go/libraries/doltcore/table/typed/noms/noms_map_updater.go @@ -96,43 +96,10 @@ func (nmu *NomsMapUpdater) GetSchema() schema.Schema { // WriteRow will write a row to a table func (nmu *NomsMapUpdater) WriteRow(ctx context.Context, r row.Row) error { - if nmu.acc == nil { - return errors.New("Attempting to write after closing.") - } - - if err := nmu.ae.Get(); err != nil { - return err - } - - err := func() error { - pk := r.NomsMapKey(nmu.sch) - fieldVals := r.NomsMapValue(nmu.sch) - - nmu.acc.AddEdit(pk, fieldVals) - nmu.count++ - - if nmu.count%maxEdits == 0 { - edits, err := nmu.acc.FinishedEditing() - - if err != nil { - return err - } - - nmu.mapChan <- edits - nmu.acc = types.CreateEditAccForMapEdits(nmu.vrw.Format()) - } - - return nil - }() - - if err != nil { - return err - } - - return nil + return nmu.WriteEdit(ctx, r.NomsMapKey(nmu.sch), r.NomsMapValue(nmu.sch)) } -// WriteRow will write a row to a table +// WriteEdit will write an edit to a table's edit accumulator func (nmu *NomsMapUpdater) WriteEdit(ctx context.Context, pk types.LesserValuable, fieldVals types.Valuable) error { if nmu.acc == nil { return errors.New("Attempting to write after closing.")