Skip to content

Commit

Permalink
Merge pull request #577 from liquidata-inc/andy/stream_edits
Browse files Browse the repository at this point in the history
streaming map edits
  • Loading branch information
andy-wm-arthur authored Apr 14, 2020
2 parents 75fb59d + 30c315d commit f5974db
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 13 deletions.
4 changes: 2 additions & 2 deletions bats/compatibility/corona-virus-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ function export_tables() {
places
do
dolt table export "$table" "$table$1.csv"
dolt sql -r csv -q "select * from $table" > "$table$1.sql.csv"
dolt sql -r csv -q "select * from $table" | sed 's/<NULL>//g' > "$table$1.sql.csv"
done
}

Expand Down Expand Up @@ -99,7 +99,7 @@ local_bin="`pwd`"/"$bin"
PATH="$local_bin":"$PATH" dolt clone Liquidata/corona-virus
pushd "corona-virus"
PATH="$local_bin":"$PATH" export_tables "-pre"
dolt migrate
time dolt migrate
export_tables "-post"
diff_tables
echo "success!"
25 changes: 17 additions & 8 deletions go/libraries/doltcore/rebase/rebase_tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/liquidata-inc/dolt/go/libraries/doltcore/schema"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/schema/encoding"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/table/typed"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/table/typed/noms"
"github.com/liquidata-inc/dolt/go/libraries/utils/set"
ndiff "github.com/liquidata-inc/dolt/go/store/diff"
"github.com/liquidata-inc/dolt/go/store/hash"
Expand Down Expand Up @@ -404,7 +405,7 @@ func replayCommitWithNewTag(ctx context.Context, root, parentRoot, rebasedParent
return nil, err
}

rebasedRows, err := replayRowDiffs(ctx, rebasedSch, rows, parentRows, rebasedParentRows, tableMapping)
rebasedRows, err := replayRowDiffs(ctx, rebasedParentRoot.VRW(), rebasedSch, rows, parentRows, rebasedParentRows, tableMapping)

if err != nil {
return nil, err
Expand Down Expand Up @@ -442,7 +443,7 @@ func replayCommitWithNewTag(ctx context.Context, root, parentRoot, rebasedParent
return newRoot, nil
}

func replayRowDiffs(ctx context.Context, rSch schema.Schema, rows, parentRows, rebasedParentRows types.Map, tagMapping map[uint64]uint64) (types.Map, error) {
func replayRowDiffs(ctx context.Context, vrw types.ValueReadWriter, rSch schema.Schema, rows, parentRows, rebasedParentRows types.Map, tagMapping map[uint64]uint64) (types.Map, error) {

unmappedTags := set.NewUint64Set(rSch.GetAllCols().Tags)
tm := make(map[uint64]uint64)
Expand All @@ -454,8 +455,7 @@ func replayRowDiffs(ctx context.Context, rSch schema.Schema, rows, parentRows, r
tm[t] = t
}

// we will apply modified differences to the rebasedParent
rebasedRowEditor := rebasedParentRows.Edit()
nmu := noms.NewNomsMapUpdater(ctx, vrw, rebasedParentRows, rSch, func(stats types.AppliedEditStats) {})

ad := diff.NewAsyncDiffer(diffBufSize)
// get all differences (including merges) between original commit and its parent
Expand Down Expand Up @@ -486,16 +486,25 @@ func replayRowDiffs(ctx context.Context, rSch schema.Schema, rows, parentRows, r

switch d.ChangeType {
case types.DiffChangeAdded:
rebasedRowEditor.Set(key, newVal)
err = nmu.WriteEdit(ctx, key, newVal)
case types.DiffChangeRemoved:
rebasedRowEditor.Remove(key)
err = nmu.WriteEdit(ctx, key, nil)
case types.DiffChangeModified:
rebasedRowEditor.Set(key, newVal)
err = nmu.WriteEdit(ctx, key, newVal)
}

if err != nil {
return types.EmptyMap, err
}
}
}

return rebasedRowEditor.Map(ctx)
err := nmu.Close(ctx)
if err != nil {
return types.EmptyMap, err
}

return *nmu.GetMap(), nil
}

func dropValsForDeletedColumns(ctx context.Context, nbf *types.NomsBinFormat, rows types.Map, sch, parentSch schema.Schema) (types.Map, error) {
Expand Down
8 changes: 5 additions & 3 deletions go/libraries/doltcore/table/typed/noms/noms_map_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ func (nmu *NomsMapUpdater) GetSchema() schema.Schema {

// WriteRow will write a row to a table
func (nmu *NomsMapUpdater) WriteRow(ctx context.Context, r row.Row) error {
return nmu.WriteEdit(ctx, r.NomsMapKey(nmu.sch), r.NomsMapValue(nmu.sch))
}

// WriteEdit will write an edit to a table's edit accumulator
func (nmu *NomsMapUpdater) WriteEdit(ctx context.Context, pk types.LesserValuable, fieldVals types.Valuable) error {
if nmu.acc == nil {
return errors.New("Attempting to write after closing.")
}
Expand All @@ -105,9 +110,6 @@ func (nmu *NomsMapUpdater) WriteRow(ctx context.Context, r row.Row) error {
}

err := func() error {
pk := r.NomsMapKey(nmu.sch)
fieldVals := r.NomsMapValue(nmu.sch)

nmu.acc.AddEdit(pk, fieldVals)
nmu.count++

Expand Down

0 comments on commit f5974db

Please sign in to comment.