Add support for a stream projection API similar to EventStoreDB #129

Open
albe opened this issue Aug 19, 2020 · 5 comments
Labels
enhancement P: EventStore Affects the EventStore layer

Comments

@albe
Owner

albe commented Aug 19, 2020

EventStoreDB provides a projection API that can project one or multiple streams into new streams by emitting new events, see https://eventstore.com/docs/projections/user-defined-projections/index.html#functions

The API could also provide an emit function that effectively just commits to a different, given stream (this needs to be checked to avoid circular projections).

Right now, something like this can be achieved with a stream definition like:

eventstore.createEventStream('my-stream-projection', e => {
   if (e ...) {
      eventstore.commit('my-projected-stream', [new MyProjectedEvent(...)]);
   }
});

The drawback is that an empty stream index my-stream-projection will be created.

Ideally, the API would not depend on the global eventstore variable. This could be solved by passing an emit function as the second argument:

eventstore.createEventStream('my-stream-projection', (e, emit) => {
   if (e ...) {
      emit('my-projected-stream', [new MyProjectedEvent(...)]);
   }
});
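A minimal sketch of the emit-as-argument idea, including the circular-projection check mentioned above. It assumes a hypothetical in-memory InMemoryStore (not the real eventstore API) that runs every stream definition on each committed event and remembers which streams led to a commit, so emitting back into a stream already on that path throws. No index is stored for the definition name itself.

```javascript
// Hypothetical in-memory store, NOT the node-eventstore API: stream
// definitions receive an emit(streamName, events) callback instead of
// reaching for a global eventstore.
class InMemoryStore {
  constructor() {
    this.streams = new Map();   // streamName -> array of events
    this.projections = [];      // { name, handler }; name is only a label, no index is built
  }

  createEventStream(name, handler) {
    this.projections.push({ name, handler });
  }

  commit(streamName, events, origin = []) {
    // origin lists the streams whose commits led to this one; writing into
    // a stream that is already on that path would be a projection cycle.
    if (origin.includes(streamName)) {
      throw new Error(`Circular projection into "${streamName}"`);
    }
    if (!this.streams.has(streamName)) this.streams.set(streamName, []);
    const stream = this.streams.get(streamName);
    const path = [...origin, streamName];
    for (const event of events) {
      stream.push(event);
      for (const { handler } of this.projections) {
        handler(event, (target, emitted) => this.commit(target, emitted, path));
      }
    }
  }
}

// Usage mirroring the snippet above (event shapes are made up).
const store = new InMemoryStore();
store.createEventStream('my-stream-projection', (e, emit) => {
  if (e.type === 'OrderPlaced') {
    emit('my-projected-stream', [{ type: 'OrderCounted', orderId: e.orderId }]);
  }
});
store.commit('orders', [{ type: 'OrderPlaced', orderId: 1 }]);
```

Tracking the whole origin path rather than just the source stream also catches indirect cycles, e.g. stream A projecting into B which projects back into A.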

Even better would be if the intermediary stream index weren't necessary at all:

eventstore.createEventStream('my-projected-stream', (e, emit) => {
   if (e ...) {
      emit([new MyProjectedEvent(...)]);
   }
});

That would mean that the projection is written into the stream index of the write stream it builds. The issue here is that the projection invocation happens in the storage layer, but the emit needs to happen at the EventStore layer.

@albe albe added enhancement P: EventStore Affects the EventStore layer labels Aug 19, 2020
@albe
Owner Author

albe commented Aug 19, 2020

Main use case: rewrite a stream into a "compacted" version, so that the (high-frequency) source stream(s) can eventually be deleted or archived, or kept internal while only the redacted stream is published.

Current issue with this createEventStream approach: it provides no guarantee that every source event will be processed if the process fails before the emit. A stream index that does not index any events does not store the last processed global sequence number, so it would need to reprocess all events on every startup.
One workaround would be to return true for every emit, so that every projected event is indexed and hence the last projected event number can be retrieved. This only works with the intermediary stream index though, because otherwise the resulting stream would also contain the source events. More importantly, this would not be guaranteed to be consistent, because the process could still fail between the emit and the return.
Another solution would be to include the commitId of the source event being projected in the emitted event, then query that on startup and reprocess all following events. This would still be costly for sparse projections, since every source event after the last one that led to an emit has to be reprocessed.
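A rough sketch of that last solution, assuming an in-memory global log (a plain array) and a projected stream whose entries carry the global position of their source event in metadata (a stand-in for the commitId; sourcePosition and runProjection are hypothetical names). The function returns how many source events it had to (re)process, which makes the cost for sparse projections visible.

```javascript
// Hypothetical sketch: each projected entry records the global position of
// the source event that produced it, so a restarted projection can resume
// right after the last source event that led to an emit.
function runProjection(log, projected, project) {
  const last = projected[projected.length - 1];
  const start = last ? last.metadata.sourcePosition + 1 : 0;
  let processed = 0;
  for (let position = start; position < log.length; position++) {
    processed++;
    for (const payload of project(log[position])) {
      projected.push({ payload, metadata: { sourcePosition: position } });
    }
  }
  return processed; // number of source events (re)processed in this run
}

// Sparse projection: only 'Large' events produce an emit (made-up types).
const log = [
  { type: 'Small' }, { type: 'Large' }, { type: 'Small' },
  { type: 'Small' }, { type: 'Small' }, { type: 'Small' },
];
const projected = [];
const project = (event) => (event.type === 'Large' ? [{ type: 'LargeSeen' }] : []);

const firstRun = runProjection(log, projected, project);     // processes all 6
const afterRestart = runProjection(log, projected, project); // processes 4 again
```

Even with no new source events, the restart walks through the four trailing events again, because nothing recorded that they had already been seen.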

@albe
Owner Author

albe commented Dec 22, 2020

The EventStoreDB API contains a state for projections; this is useful for logic that depends on previously seen events. Also, persisting that state allows the projection to resume. Hence, a more fitting equivalent would be:

const myProjection = eventstore.getConsumer('_all' /* or whatever more concrete stream you want to project from */, 'my-projection-id');
myProjection.on('data', event => {
    if (myProjection.state... && event...) {
        try {
             eventstorage.commit('my-projection-stream', [new MyProjectedEvent(...)], myProjection.position);
             myProjection.setState({...myProjection.state, ...});
        } catch (e) {
             // Handle duplicate processing
             if (e instanceof OptimisticConcurrencyError) return true;
             throw e;
        }
    }
});

The eventstorage.commit call plus the exception handling could be moved into the emit() function mentioned above, which could be passed in as an argument (and would reuse the consumer name as the default output stream). The latter would mean that the signature of the 'data' event on consumers would need to change, since it only allows a single argument.
Hence it would become myProjection.on('data', ({ event, emit }) => {...}), or a separate new event/method would need to be introduced. Also, the Consumer would need to depend on a WritableStorage, i.e. a "Projection" is a "writable Consumer".
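A sketch of such a "writable Consumer" whose 'data' listeners receive ({ event, emit }). Everything here is hypothetical: an in-memory Map stands in for the storage, OptimisticConcurrencyError is a local class, and emit commits with expectedVersion set to the consumer position, as in the snippet above. That means it only works when every source event produces exactly one projected event.

```javascript
const { EventEmitter } = require('events');

// Local stand-in for the store's optimistic concurrency error (hypothetical).
class OptimisticConcurrencyError extends Error {}

// Hypothetical writable consumer: the consumer name doubles as the default
// output stream, and emit() swallows duplicate commits after a restart.
class WritableConsumer extends EventEmitter {
  constructor(name, streams) {
    super();
    this.name = name;
    this.streams = streams; // Map: streamName -> array of events
    this.position = 0;      // number of source events handled so far
  }

  commit(streamName, events, expectedVersion) {
    if (!this.streams.has(streamName)) this.streams.set(streamName, []);
    const stream = this.streams.get(streamName);
    if (stream.length !== expectedVersion) {
      throw new OptimisticConcurrencyError(`expected ${expectedVersion}, got ${stream.length}`);
    }
    stream.push(...events);
  }

  handle(event) {
    const emit = (events, streamName = this.name) => {
      try {
        this.commit(streamName, events, this.position);
      } catch (e) {
        // Already projected before a restart: ignore the duplicate.
        if (e instanceof OptimisticConcurrencyError) return;
        throw e;
      }
    };
    this.emit('data', { event, emit }); // EventEmitter#emit, not the projection emit
    this.position++;
  }
}

// Usage: project every source event into exactly one doubled event.
const streams = new Map();
const attach = (consumer) =>
  consumer.on('data', ({ event, emit }) => emit([{ doubled: event.value * 2 }]));

const first = new WritableConsumer('doubled', streams);
attach(first);
first.handle({ value: 1 });
first.handle({ value: 2 });

// Simulated restart: a fresh consumer replays from position 0.
const restarted = new WritableConsumer('doubled', streams);
attach(restarted);
[{ value: 1 }, { value: 2 }, { value: 3 }].forEach((e) => restarted.handle(e));
```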

@albe
Owner Author

albe commented Jan 30, 2021

Note: Setting expectedVersion to myProjection.position as above only works if every source event leads to exactly one projected event! That is not necessarily the case.
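A small worked example of why this breaks, assuming a made-up projection that emits zero, one or two events per source event (types A, B, C). The projected stream version drifts away from the source position, so a commit with expectedVersion = position would be rejected even though nothing was processed twice.

```javascript
// Hypothetical illustration: number of projected events per source event.
const sourceEvents = [{ type: 'A' }, { type: 'B' }, { type: 'C' }];
const emitsFor = { A: 0, B: 1, C: 2 };

let streamVersion = 0;   // number of events in the projected stream
const mismatches = [];   // commits that expectedVersion = position would reject

sourceEvents.forEach((event, position) => {
  const count = emitsFor[event.type];
  if (count > 0 && streamVersion !== position) {
    mismatches.push({ position, streamVersion });
  }
  streamVersion += count;
});
```

Already the first emitting event (B at position 1) hits a projected stream at version 0, and the gap only grows from there.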

The API for a projection would need to be thought through. The projection method should not be an instance method, as that would mean a specific projection needs to inherit from Projection, unlike a Consumer. Using a new stream event (e.g. on('event', ...)) would mean there are two different handlers for each consumed object, which could be confusing.

const myProjection = eventstore.getProjection('_all', 'my-projection-stream');
myProjection.on('event', (event, emit) => {
    emit(new SomeProjectedEvent(myProjection.state, event));
    myProjection.setState(state => ({ ...state, someData: state.someData + event.someProp }));
});

A failing setState should ideally prevent the emit from happening. That is hard to achieve, because a commit cannot be canceled. This means that on the next execution the projection will reattempt projecting the event and hence emit the projected event again, which will fail with an optimistic concurrency error that is caught and ignored. So far so good, but until the setState eventually succeeds, the projected stream is inconsistent with the (persisted) projection state.
Also, reversing the order of emit and setState would mean that the projected stream could miss an event, unless a failing emit cancels the persistence of the state.
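A toy simulation of both orderings, assuming a crash between the two steps at a chosen source position, followed by one restart from the persisted position. Duplicate emits are filtered by the source position recorded on each projected event, standing in for the caught optimistic concurrency error (simulate and the entry shape are hypothetical).

```javascript
// Hypothetical toy simulation: run over `eventCount` source events, crash
// between the two steps at `crashAt`, then restart once from the persisted
// position. Returns the source positions present in the projected stream.
function simulate(order, eventCount, crashAt) {
  const persisted = { position: 0 }; // durable projection state
  const stream = [];                 // durable projected stream

  // Re-emitting an already projected source event is ignored, standing in
  // for the caught optimistic concurrency error.
  const emit = (p) => {
    if (!stream.some((e) => e.source === p)) stream.push({ source: p });
  };
  const setState = (p) => {
    persisted.position = p + 1;
  };
  const steps = order === 'emit-first' ? [emit, setState] : [setState, emit];

  for (let run = 0; run < 2; run++) {            // first run + one restart
    for (let p = persisted.position; p < eventCount; p++) {
      steps[0](p);
      if (run === 0 && p === crashAt) break;     // crash between the two steps
      steps[1](p);
    }
  }
  return stream.map((e) => e.source);
}

const emitFirst = simulate('emit-first', 3, 1);   // [0, 1, 2]
const stateFirst = simulate('state-first', 3, 1); // [0, 2]: event 1 is lost
```

Emit-first only leaves a window where the stream is ahead of the persisted state; setState-first silently drops the event that was being projected when the crash happened.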

@albe
Copy link
Owner Author

albe commented Feb 9, 2021

On top of this, a linkTo() function argument would be easy to add, which would allow creating streams that index arbitrary events of other streams. With this, it would be possible to create e.g. an "event-type" or "correlation-id" stream like this:

const byEventType = eventstore.getProjection('_all', 'by-event-type');
byEventType.on('event', (event, emit, linkTo) => {
    linkTo('type-' + event.type, event);
});
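A minimal in-memory sketch of what linkTo() could do, assuming a global log array and link indexes that only store global positions of existing events instead of copies. Unlike the snippet above, this hypothetical linkTo() takes the event's position rather than the event object, since a link needs some pointer into the log.

```javascript
// Hypothetical in-memory sketch: a link index stores global positions,
// so linked events are never duplicated.
const log = [];            // global event log ('_all')
const links = new Map();   // streamName -> array of global positions

function linkTo(streamName, position) {
  if (!links.has(streamName)) links.set(streamName, []);
  links.get(streamName).push(position);
}

function readLinked(streamName) {
  return (links.get(streamName) || []).map((position) => log[position]);
}

// Projection from '_all' into one stream per event type.
function byEventType(event, position) {
  linkTo('type-' + event.type, position);
}

for (const event of [{ type: 'Created' }, { type: 'Renamed' }, { type: 'Created' }]) {
  log.push(event);
  byEventType(event, log.length - 1);
}
```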

However, the same could be achieved with another concept: specify streams through a function that returns a stream name given the event, e.g.:

eventstore.createDynamicStream((event) => 'type-' + event.type);

See #140
The given function is executed for every committed event; the resulting stream is then checked for existence or created, and the event is added to it. The benefits are less processing, a more straightforward API and no persistent state (secondary indexes still fail to "keep up" with the storage if the process crashes in between, so maybe the consumer/projection checkpoints are useful).
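A sketch of the dynamic stream variant under the same kind of in-memory assumptions (DynamicIndexStore is a hypothetical name): each resolver function runs synchronously inside commit, and the returned stream index is created on first use. A resolver may return a falsy value to skip an event. There is no projection state or checkpoint to persist.

```javascript
// Hypothetical sketch of dynamic streams resolved at commit time.
class DynamicIndexStore {
  constructor() {
    this.log = [];             // global event log
    this.indexes = new Map();  // streamName -> array of global positions
    this.resolvers = [];       // (event) => streamName or falsy
  }

  createDynamicStream(resolve) {
    this.resolvers.push(resolve);
  }

  commit(events) {
    for (const event of events) {
      const position = this.log.push(event) - 1;
      for (const resolve of this.resolvers) {
        const name = resolve(event);
        if (!name) continue;                                     // not indexed by this resolver
        if (!this.indexes.has(name)) this.indexes.set(name, []); // create on first use
        this.indexes.get(name).push(position);
      }
    }
  }
}

// Usage: one index per event type plus one per correlation id.
const store = new DynamicIndexStore();
store.createDynamicStream((event) => 'type-' + event.type);
store.createDynamicStream((event) => event.correlationId && 'correlation-' + event.correlationId);
store.commit([
  { type: 'Created', correlationId: 'c1' },
  { type: 'Renamed' },
  { type: 'Created', correlationId: 'c1' },
]);
```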

@albe
Copy link
Owner Author

albe commented Feb 11, 2021

Regarding naming: at a higher level, a projection that emits events is called a ProcessManager. Also, a generic "projection" is currently called a "Consumer", so calling an emitting Consumer a "Projection" is probably misleading. Maybe this functionality can be added to consumers?

Regarding the issue of setState()/emit() order and correct handling, this could be solved by clearly separating the two handlers: the 'data' event for updating state, as for normal consumers, and an additional event type (change?) that only allows emitting based on the current state (effectively a state transition handler).

const myProjection = eventstore.getProjection('_all', 'my-projection-stream');
myProjection.on('data', (event) => {
    myProjection.setState(state => ({ ...state, someData: state.someData + event.someProp }));
});
myProjection.on('change', (emit, linkTo /* optional */) => {
    emit(new SomeProjectedEvent(myProjection.state));
});

The 'change' handler could also do other work, but it must not call setState(), which could be enforced.
The issue with this is that only a failing emit could lead to a rollback of the state, since the try/catch can only be wrapped around the emit inside the handler, not around the asynchronous 'change' listener as a whole. An alternative would be a synchronous call to a method/function rather than going through the event emitter.
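A sketch of that synchronous alternative, with a hypothetical Projection class taking plain functions instead of 'data'/'change' listeners: reduce computes the next state, onChange may only collect emits for that state, and the new state is adopted only after the commit of those emits succeeded. This models the rollback in memory only; persisting the state is not covered.

```javascript
// Hypothetical sketch: a failing commit leaves the previous state in place.
class Projection {
  constructor({ initialState, reduce, onChange, commit }) {
    this.state = initialState;
    this.reduce = reduce;     // (state, event) => newState, should be pure
    this.onChange = onChange; // (newState, emit) => void, cannot set state
    this.commit = commit;     // (events) => void, may throw
  }

  handle(event) {
    const next = this.reduce(this.state, event);
    const pending = [];
    this.onChange(next, (e) => pending.push(e));
    if (pending.length > 0) this.commit(pending); // throws before state is adopted
    this.state = next;
  }
}

// Usage: emit once a running total reaches a (made-up) threshold.
const committed = [];
let failCommits = false;
const threshold = new Projection({
  initialState: { total: 0 },
  reduce: (state, event) => ({ ...state, total: state.total + event.amount }),
  onChange: (state, emit) => {
    if (state.total >= 10) emit({ type: 'ThresholdReached', total: state.total });
  },
  commit: (events) => {
    if (failCommits) throw new Error('commit failed');
    committed.push(...events);
  },
});
```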

Another approach for providing fully consistent emit behaviour would be to store both the projection state and its position in the projection's own emit stream.

emit(event) {
    eventstore.commit(projectionStream, event, { state: this.state, position: this.position });
}

restoreState(initialState, startFrom) {
    const lastEvent = eventstore.getEventStream(projectionStream, -1, -1).next();
    this.position = lastEvent !== false ? lastEvent.metadata.position : startFrom;
    this.state = lastEvent !== false ? lastEvent.metadata.state : initialState;
}

This is effectively an "event-sourced projection manager (with snapshots)". Alternatively, the state is not stored in the stream; on restore, it is rebuilt by running through all events up to the last position again, but with "emit()"/'change' events disabled.
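An in-memory sketch of the first variant, assuming the projected stream is a plain array of { payload, metadata } entries and that reduce/project are deterministic (SnapshottingProjection is a hypothetical name). All emits for one source event are appended in a single push, standing in for one atomic commit. The stored position is the next source position to process, whereas the snippet above stores this.position; which convention to use is a detail.

```javascript
// Hypothetical sketch: state and position travel with each emitted event,
// so emitting and checkpointing happen in the same append.
class SnapshottingProjection {
  constructor(stream, reduce, project) {
    this.stream = stream;   // projected stream: array of { payload, metadata }
    this.reduce = reduce;   // (state, event) => newState
    this.project = project; // (state, event) => array of payloads to emit
  }

  // Must be called before run(); mirrors restoreState() above.
  restoreState(initialState, startFrom) {
    const lastEvent = this.stream[this.stream.length - 1];
    this.position = lastEvent ? lastEvent.metadata.position : startFrom;
    this.state = lastEvent ? lastEvent.metadata.state : initialState;
  }

  run(log) {
    for (; this.position < log.length; this.position++) {
      const event = log[this.position];
      this.state = this.reduce(this.state, event);
      const payloads = this.project(this.state, event);
      if (payloads.length > 0) {
        // Snapshot of the state plus the next source position to process.
        const metadata = { state: this.state, position: this.position + 1 };
        this.stream.push(...payloads.map((payload) => ({ payload, metadata })));
      }
    }
  }
}
```

On restore, source events after the last emit are replayed from the snapshot state. Since they produced no emits the first time, replaying them with deterministic functions does not duplicate anything.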

Projects
None yet
Development

No branches or pull requests

1 participant