Skip to content

Commit

Permalink
Merge pull request #1951 from authzed/enriched-postgres-revisions
Browse files Browse the repository at this point in the history
enriches postgres revisions with txID and timestamp
  • Loading branch information
josephschorr authored Jun 25, 2024
2 parents 63980e1 + c76668b commit 37bf916
Show file tree
Hide file tree
Showing 11 changed files with 203 additions and 63 deletions.
2 changes: 1 addition & 1 deletion internal/datastore/postgres/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (pgd *pgDatastore) TxIDBefore(ctx context.Context, before time.Time) (datas
return datastore.NoRevision, err
}

return postgresRevision{snapshot}, nil
return postgresRevision{snapshot: snapshot, optionalTxID: value}, nil
}

func (pgd *pgDatastore) DeleteBeforeTx(ctx context.Context, txID datastore.Revision) (common.DeletionCounts, error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (pgd *pgDatastore) ReadWriteTx(
log.Debug().Uint8("retries", i).Msg("transaction succeeded after retry")
}

return postgresRevision{newSnapshot.markComplete(newXID.Uint64)}, nil
return postgresRevision{snapshot: newSnapshot.markComplete(newXID.Uint64), optionalTxID: newXID}, nil
}

if !config.DisableRetries {
Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/postgres/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (r *pgReader) lookupCounters(ctx context.Context, optionalName string) ([]d

revision := datastore.NoRevision
if snapshot != nil {
revision = postgresRevision{*snapshot}
revision = postgresRevision{snapshot: *snapshot}
}

counters = append(counters, datastore.RelationshipCounter{
Expand Down Expand Up @@ -303,7 +303,7 @@ func loadAllNamespaces(

// revisionForVersion synthesizes a snapshot where the specified version is always visible.
func revisionForVersion(version xid8) postgresRevision {
return postgresRevision{pgSnapshot{
return postgresRevision{snapshot: pgSnapshot{
xmin: version.Uint64 + 1,
xmax: version.Uint64 + 1,
}}
Expand Down
40 changes: 32 additions & 8 deletions internal/datastore/postgres/revisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (pgd *pgDatastore) optimizedRevisionFunc(ctx context.Context) (datastore.Re

snapshot = snapshot.markComplete(revision.Uint64)

return postgresRevision{snapshot}, validForNanos, nil
return postgresRevision{snapshot: snapshot, optionalTxID: revision}, validForNanos, nil
}

func (pgd *pgDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
Expand All @@ -98,7 +98,7 @@ func (pgd *pgDatastore) HeadRevision(ctx context.Context) (datastore.Revision, e
return datastore.NoRevision, fmt.Errorf(errRevision, err)
}

return postgresRevision{snapshot}, nil
return postgresRevision{snapshot: snapshot}, nil
}

func (pgd *pgDatastore) CheckRevision(ctx context.Context, revisionRaw datastore.Revision) error {
Expand All @@ -114,7 +114,7 @@ func (pgd *pgDatastore) CheckRevision(ctx context.Context, revisionRaw datastore
return fmt.Errorf(errCheckRevision, err)
}

if revisionRaw.GreaterThan(postgresRevision{currentSnapshot}) {
if revisionRaw.GreaterThan(postgresRevision{snapshot: currentSnapshot}) {
return datastore.NewInvalidRevisionErr(revision, datastore.CouldNotDetermineRevision)
}
if minSnapshot.markComplete(minXid.Uint64).GreaterThan(revision.snapshot) {
Expand Down Expand Up @@ -165,11 +165,13 @@ func parseRevisionProto(revisionStr string) (datastore.Revision, error) {
}

return postgresRevision{
pgSnapshot{
snapshot: pgSnapshot{
xmin: decoded.Xmin,
xmax: uint64(xminInt + decoded.RelativeXmax),
xipList: xips,
},
optionalTxID: xid8{Uint64: decoded.OptionalTxid, Valid: decoded.OptionalTxid != 0},
optionalTimestamp: decoded.OptionalTimestamp,
}, nil
}

Expand Down Expand Up @@ -227,7 +229,7 @@ func parseRevisionDecimal(revisionStr string) (datastore.Revision, error) {
}
}

return postgresRevision{pgSnapshot{
return postgresRevision{snapshot: pgSnapshot{
xmin: xmin,
xmax: xmax,
xipList: xipList,
Expand All @@ -246,7 +248,9 @@ func createNewTransaction(ctx context.Context, tx pgx.Tx) (newXID xid8, newSnaps
}

type postgresRevision struct {
snapshot pgSnapshot
snapshot pgSnapshot
optionalTxID xid8
optionalTimestamp uint64
}

func (pr postgresRevision) Equal(rhsRaw datastore.Revision) bool {
Expand Down Expand Up @@ -284,6 +288,26 @@ func (pr postgresRevision) mustMarshalBinary() []byte {
return serialized
}

// OptionalTransactionID returns the transaction ID at which this revision happened. This value is optionally
// loaded from the database and may not be present.
func (pr postgresRevision) OptionalTransactionID() (xid8, bool) {
if !pr.optionalTxID.Valid {
return xid8{}, false
}

return pr.optionalTxID, true
}

// OptionalTimestamp returns a unix epoch timestamp representing the time at which the transaction committed as
// defined by the Postgres primary. This is not guaranteed to be monotonically increasing.
func (pr postgresRevision) OptionalTimestamp() (uint64, bool) {
if pr.optionalTimestamp == 0 {
return 0, false
}

return pr.optionalTimestamp, true
}

// MarshalBinary creates a version of the snapshot that uses relative encoding
// for xmax and xip list values to save bytes when encoded as varint protos.
// For example, snapshot 1001:1004:1001,1003 becomes 1000:3:0,2.
Expand All @@ -305,6 +329,6 @@ func (pr postgresRevision) MarshalBinary() ([]byte, error) {

var _ datastore.Revision = postgresRevision{}

func revisionKeyFunc(rev revisionWithXid) uint64 {
return rev.tx.Uint64
func revisionKeyFunc(rev postgresRevision) uint64 {
return rev.optionalTxID.Uint64
}
31 changes: 26 additions & 5 deletions internal/datastore/postgres/revisions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -32,8 +33,8 @@ func TestRevisionOrdering(t *testing.T) {
t.Run(fmt.Sprintf("%s:%s", tc.lhsSnapshot, tc.rhsSnapshot), func(t *testing.T) {
require := require.New(t)

lhs := postgresRevision{tc.lhsSnapshot}
rhs := postgresRevision{tc.rhsSnapshot}
lhs := postgresRevision{snapshot: tc.lhsSnapshot}
rhs := postgresRevision{snapshot: tc.rhsSnapshot}

require.Equal(tc.relationship == equal, lhs.Equal(rhs))
require.Equal(tc.relationship == equal, rhs.Equal(lhs))
Expand Down Expand Up @@ -72,7 +73,7 @@ func TestRevisionSerDe(t *testing.T) {
t.Run(tc.snapshot.String(), func(t *testing.T) {
require := require.New(t)

rev := postgresRevision{tc.snapshot}
rev := postgresRevision{snapshot: tc.snapshot}
serialized := rev.String()
require.Equal(tc.expectedStr, serialized)

Expand All @@ -83,6 +84,26 @@ func TestRevisionSerDe(t *testing.T) {
}
}

func TestTxIDTimestampAvailable(t *testing.T) {
testTimestamp := uint64(time.Now().Unix())
snapshot := snap(0, 5, 1)
pgr := postgresRevision{snapshot: snapshot, optionalTxID: newXid8(1), optionalTimestamp: testTimestamp}
receivedTimestamp, ok := pgr.OptionalTimestamp()
require.True(t, ok)
require.Equal(t, receivedTimestamp, testTimestamp)
txid, ok := pgr.OptionalTransactionID()
require.True(t, ok)
require.Equal(t, newXid8(1), txid)

anotherRev := postgresRevision{snapshot: snapshot}
_, ok = anotherRev.OptionalTimestamp()
require.False(t, ok)
_, ok = anotherRev.OptionalTransactionID()
require.False(t, ok)

pgr.Equal(anotherRev)
}

func TestRevisionParseOldDecimalFormat(t *testing.T) {
testCases := []struct {
snapshot pgSnapshot
Expand Down Expand Up @@ -114,7 +135,7 @@ func TestRevisionParseOldDecimalFormat(t *testing.T) {
require.Error(err)
} else {
require.NoError(err)
require.Equal(postgresRevision{tc.snapshot}, parsed)
require.Equal(postgresRevision{snapshot: tc.snapshot}, parsed)
}
})
}
Expand Down Expand Up @@ -156,7 +177,7 @@ func TestCombinedRevisionParsing(t *testing.T) {
require.Error(err)
} else {
require.NoError(err)
require.Equal(postgresRevision{tc.snapshot}, parsed)
require.Equal(postgresRevision{snapshot: tc.snapshot}, parsed)
}
})
}
Expand Down
53 changes: 25 additions & 28 deletions internal/datastore/postgres/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,15 @@ const (
minimumWatchSleep = 100 * time.Millisecond
)

type revisionWithXid struct {
postgresRevision
tx xid8
}

var (
// This query must cast an xid8 to xid, which is a safe operation as long as the
// xid8 is one of the last ~2 billion transaction IDs generated. We should be garbage
// collecting these transactions long before we get to that point.
newRevisionsQuery = fmt.Sprintf(`
SELECT %[1]s, %[2]s FROM %[3]s
SELECT %[1]s, %[2]s, %[3]s FROM %[4]s
WHERE %[1]s >= pg_snapshot_xmax($1) OR (
%[1]s >= pg_snapshot_xmin($1) AND NOT pg_visible_in_snapshot(%[1]s, $1)
) ORDER BY pg_xact_commit_timestamp(%[1]s::xid), %[1]s;`, colXID, colSnapshot, tableTransaction)
) ORDER BY pg_xact_commit_timestamp(%[1]s::xid), %[1]s;`, colXID, colSnapshot, colTimestamp, tableTransaction)

queryChangedTuples = psql.Select(
colNamespace,
Expand Down Expand Up @@ -155,9 +150,9 @@ func (pgd *pgDatastore) Watch(
// the *last* transaction to start, as it should encompass all completed transactions
// except those running concurrently, which is handled by calling markComplete on the other
// transactions.
currentTxn = newTxns[len(newTxns)-1].postgresRevision
currentTxn = newTxns[len(newTxns)-1]
for _, newTx := range newTxns {
currentTxn = postgresRevision{currentTxn.snapshot.markComplete(newTx.tx.Uint64)}
currentTxn = postgresRevision{snapshot: currentTxn.snapshot.markComplete(newTx.optionalTxID.Uint64)}
}

// If checkpoints were requested, output a checkpoint. While the Postgres datastore does not
Expand Down Expand Up @@ -188,8 +183,8 @@ func (pgd *pgDatastore) Watch(
return updates, errs
}

func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRevision) ([]revisionWithXid, error) {
var ids []revisionWithXid
func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRevision) ([]postgresRevision, error) {
var ids []postgresRevision
if err := pgx.BeginTxFunc(ctx, pgd.readPool, pgx.TxOptions{IsoLevel: pgx.RepeatableRead}, func(tx pgx.Tx) error {
rows, err := tx.Query(ctx, newRevisionsQuery, afterTX.snapshot)
if err != nil {
Expand All @@ -200,13 +195,15 @@ func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRev
for rows.Next() {
var nextXID xid8
var nextSnapshot pgSnapshot
if err := rows.Scan(&nextXID, &nextSnapshot); err != nil {
var timestamp time.Time
if err := rows.Scan(&nextXID, &nextSnapshot, &timestamp); err != nil {
return fmt.Errorf("unable to decode new revision: %w", err)
}

ids = append(ids, revisionWithXid{
postgresRevision{nextSnapshot.markComplete(nextXID.Uint64)},
nextXID,
ids = append(ids, postgresRevision{
snapshot: nextSnapshot.markComplete(nextXID.Uint64),
optionalTxID: nextXID,
optionalTimestamp: uint64(timestamp.Unix()),
})
}
if rows.Err() != nil {
Expand All @@ -220,21 +217,21 @@ func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRev
return ids, nil
}

func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []revisionWithXid, options datastore.WatchOptions) ([]datastore.RevisionChanges, error) {
xmin := revisions[0].tx.Uint64
xmax := revisions[0].tx.Uint64
func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []postgresRevision, options datastore.WatchOptions) ([]datastore.RevisionChanges, error) {
xmin := revisions[0].optionalTxID.Uint64
xmax := revisions[0].optionalTxID.Uint64
filter := make(map[uint64]int, len(revisions))
txidToRevision := make(map[uint64]revisionWithXid, len(revisions))
txidToRevision := make(map[uint64]postgresRevision, len(revisions))

for i, rev := range revisions {
if rev.tx.Uint64 < xmin {
xmin = rev.tx.Uint64
if rev.optionalTxID.Uint64 < xmin {
xmin = rev.optionalTxID.Uint64
}
if rev.tx.Uint64 > xmax {
xmax = rev.tx.Uint64
if rev.optionalTxID.Uint64 > xmax {
xmax = rev.optionalTxID.Uint64
}
filter[rev.tx.Uint64] = i
txidToRevision[rev.tx.Uint64] = rev
filter[rev.optionalTxID.Uint64] = i
txidToRevision[rev.optionalTxID.Uint64] = rev
}

tracked := common.NewChanges(revisionKeyFunc, options.Content)
Expand Down Expand Up @@ -270,7 +267,7 @@ func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []revisionWit
return reconciledChanges, nil
}

func (pgd *pgDatastore) loadRelationshipChanges(ctx context.Context, xmin uint64, xmax uint64, txidToRevision map[uint64]revisionWithXid, filter map[uint64]int, tracked *common.Changes[revisionWithXid, uint64]) error {
func (pgd *pgDatastore) loadRelationshipChanges(ctx context.Context, xmin uint64, xmax uint64, txidToRevision map[uint64]postgresRevision, filter map[uint64]int, tracked *common.Changes[postgresRevision, uint64]) error {
sql, args, err := queryChangedTuples.Where(sq.Or{
sq.And{
sq.LtOrEq{colCreatedXid: xmax},
Expand Down Expand Up @@ -344,7 +341,7 @@ func (pgd *pgDatastore) loadRelationshipChanges(ctx context.Context, xmin uint64
return nil
}

func (pgd *pgDatastore) loadNamespaceChanges(ctx context.Context, xmin uint64, xmax uint64, txidToRevision map[uint64]revisionWithXid, filter map[uint64]int, tracked *common.Changes[revisionWithXid, uint64]) error {
func (pgd *pgDatastore) loadNamespaceChanges(ctx context.Context, xmin uint64, xmax uint64, txidToRevision map[uint64]postgresRevision, filter map[uint64]int, tracked *common.Changes[postgresRevision, uint64]) error {
sql, args, err := queryChangedNamespaces.Where(sq.Or{
sq.And{
sq.LtOrEq{colCreatedXid: xmax},
Expand Down Expand Up @@ -395,7 +392,7 @@ func (pgd *pgDatastore) loadNamespaceChanges(ctx context.Context, xmin uint64, x
return nil
}

func (pgd *pgDatastore) loadCaveatChanges(ctx context.Context, min uint64, max uint64, txidToRevision map[uint64]revisionWithXid, filter map[uint64]int, tracked *common.Changes[revisionWithXid, uint64]) error {
func (pgd *pgDatastore) loadCaveatChanges(ctx context.Context, min uint64, max uint64, txidToRevision map[uint64]postgresRevision, filter map[uint64]int, tracked *common.Changes[postgresRevision, uint64]) error {
sql, args, err := queryChangedCaveats.Where(sq.Or{
sq.And{
sq.LtOrEq{colCreatedXid: max},
Expand Down
4 changes: 3 additions & 1 deletion pkg/datastore/test/counters.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ func UpdateRelationshipCounterTest(t *testing.T, tester DatastoreTester) {
require.Len(t, filters, 1)
require.Equal(t, "somedocfilter", filters[0].Name)
require.Equal(t, 1234, filters[0].Count)
require.Equal(t, updatedRev, filters[0].ComputedAtRevision)
// we don't use require.Equal, as the internal representation may differ via the optional fields
// the supported way to compare revisions is via their comparison API methods
require.True(t, updatedRev.Equal(filters[0].ComputedAtRevision))

// Register a new filter.
newFilterRev, err := ds.ReadWriteTx(context.Background(), func(ctx context.Context, tx datastore.ReadWriteTransaction) error {
Expand Down
Loading

0 comments on commit 37bf916

Please sign in to comment.