Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RabbitMq client still a Promise on App start up #322

Open
paulvitic opened this issue Apr 22, 2021 · 6 comments
Open

RabbitMq client still a Promise on App start up #322

paulvitic opened this issue Apr 22, 2021 · 6 comments
Assignees
Labels
bug Something isn't working needs reproducible example Needs reproducible repository scope: messaging Relates to @marblejs/messaging package

Comments

@paulvitic
Copy link

paulvitic commented Apr 22, 2021

Describe the bug
Following the Marble docs for creating an amqp publisher I receive the following error on app start-up
(node:90349) UnhandledPromiseRejectionWarning: TypeError: rabbitMqClient.send is not a function
because the resolved rabbitMQ client is still a Promise even though it was eagerly bounded to the context

To Reproduce
Amqp publisher

export const AmqpClientToken = createContextToken<MessagingClient>('MessagingClient');
export const amqpClient = messagingClient({
    transport: Transport.AMQP,
    options: {
        host: 'amqp://localhost:5672',
        queue: 'hello_queue'
    },
});

Http effect

export const getRoot$ = r.pipe(
    r.matchPath('/'),
    r.matchType('GET'),
    r.useEffect((req$, ctx) => {
        const rabbitMqClient = useContext(AmqpClientToken)(ctx.ask);

        return req$.pipe(
            mergeMapTo(rabbitMqClient.send({ type: 'HELLO', payload: 'John' })),
            mapTo({ status: HttpStatus.ACCEPTED }),
        );
    }),
);

App

const httpServerListener = httpListener({
    middlewares: [
        logger$({ silent: isTestEnv() }),
        bodyParser$(),
    ],
    effects: [
        getRoot$
    ],
});

export const server = createServer({
    port: getPortEnv(),
    listener: httpServerListener,
    dependencies: [
        bindEagerlyTo(AmqpClientToken)(amqpClient),
    ],
});

export const main: IO.IO<void> = async () =>
    await (await server)();

Expected behavior
App starts up.

Desktop (please complete the following information):

  • OS: Ubuntu
  • Package + Version: Marble ^3.4.9
  • Node version: 14.15.4

Additional context

@JozefFlakus
Copy link
Member

JozefFlakus commented Apr 27, 2021

@paulvitic thanks for reporting an issue.

  1. Is rabbit server established?
  2. Do you have or can create a repository with reproducible problem?

@JozefFlakus JozefFlakus self-assigned this Apr 27, 2021
@JozefFlakus JozefFlakus added bug Something isn't working scope: messaging Relates to @marblejs/messaging package labels Apr 27, 2021
@paulvitic
Copy link
Author

Hello @JozefFlakus thnaks for the response.
1- Yes rabbit server is established.
2- I am afraid I did not commit my code to a public repository yet.

I am using the amqp client dependency inside an event store implementation that conforms to:

export interface EventStore {
    append(event: DomainEvent): TaskEither<Error, void>;
    stream(id: string): Observable<DomainEvent>;
}

DomainEvent is just an extension of Marbles's Event struct

Here is how I am dealing with it now

export const EventStoreFake: Reader<Context, EventStore> = createReader(ctx => {
    const rabbitMqClient = useContext(AmqpClientToken)(ctx);
    const store: Record<string, DomainEvent[]> = {};
    const log = (event: DomainEvent) => (streamId: string) => pipe(
        store,
        lookup(streamId),
        fold(() => { store[streamId] = [event] }, events => { events.push(event) })
    )
    return {
        append(event: DomainEvent): TaskEither<Error, void> {
            let streamId = pipe(
                fromNullable(event.payload),
                mapOption(payload => payload.streamId)
            )
            return pipe(
                fromOption(() => toError('stream id not found'))(streamId),
                chainFirst(_streamId => tryCatch(async () => (await rabbitMqClient).emit(event), toError)),  👈🏼 here I await the client
                map(log(event))
            )
        },
        stream(id: string): Observable<DomainEvent> {
            return pipe(
                store,
                lookup(id),
                fold(() => from([]), events => from(events))
            )
        }
    }
});

@JozefFlakus
Copy link
Member

Are you able to create a minimal, reproducible repository that shows the problem?
It is really hard to tell what is wrong based on the snipped that you posted here - it looks correct.

Are you sure that the queue options are correct? Eg. lets say that hello_queue already exists.

@paulvitic
Copy link
Author

I will try, by the way, to be clear, the above implementation, when I await the rabbitMqClient works.

@JozefFlakus JozefFlakus added the needs reproducible example Needs reproducible repository label May 4, 2021
@JozefFlakus
Copy link
Member

@paulvitic any updates in this topic?

@robert-gruner
Copy link

Unfortunately this issue seems stale. Anyways @JozefFlakus we tried to get Marble running with an RabbitMq and failed as well. We even checked out marble source code, ran "yarn" and "yarn build". "yarn test:integration" does show that there might be a real problem with messaging since the tests are red. The script downloaded the latest "rabbitmq:management". Maybe you can have a look?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working needs reproducible example Needs reproducible repository scope: messaging Relates to @marblejs/messaging package
Projects
None yet
Development

No branches or pull requests

3 participants