PgSubscriberImpl.fetch() not implemented #1403

Open
andreas-eberle opened this issue Feb 1, 2024 · 3 comments

Comments

@andreas-eberle
Contributor

Questions

The function PgSubscriberImpl.fetch() is not implemented and throws an UnsupportedOperationException. This is a problem when using the Mutiny wrapper for the vertx-pg-client, e.g. in Quarkus.

When you use the PgSubscriber mechanism in a Quarkus project with the Mutiny framework, the following code runs into the UnsupportedOperationException.

import javax.enterprise.context.ApplicationScoped
import javax.enterprise.event.Observes
import io.quarkus.runtime.ShutdownEvent
import io.quarkus.runtime.StartupEvent
import io.vertx.mutiny.core.Vertx
import io.vertx.mutiny.pgclient.pubsub.PgSubscriber
import io.vertx.pgclient.PgConnectOptions
import java.time.Duration

@ApplicationScoped
class PgChannelObserver {

    fun onStart(@Observes ev: StartupEvent?, vertx: Vertx) {

        val subscriber = PgSubscriber.subscriber(vertx, connectOptions())
        subscriber.connect().await().atMost(Duration.ofSeconds(60))
        
        // pg_notify is triggered manually from a PostgreSQL session for this channel
        val channel = subscriber.channel("testchannel")
        
        channel.toMulti().subscribe().with { n -> print(n) }
    }

    private fun connectOptions() = PgConnectOptions()
            .setPort(5432)
            .setHost("localhost")
            .setDatabase("rainbow_database")
            .setUser("unicorn_user")
            .setPassword("magical_password")
}

When I don't use Mutiny and instead use:

channel.handler { notif -> print("Notification received: $notif") }

everything works as expected.

I was wondering why the implementation of fetch is left out, and whether this is a Mutiny issue (to be addressed in SmallRye) or an incomplete implementation of the PgSubscriber interface.

@vietj
Member

vietj commented Feb 1, 2024

It's not implemented in Vert.x:

    public ReadStream<String> fetch(long amount) {
      throw new UnsupportedOperationException();
    }

@vietj vietj added this to the 4.5.3 milestone Feb 1, 2024
@vietj
Member

vietj commented Feb 1, 2024

I think it could be implemented easily, given that this class (purposely) does not buffer events.
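
For illustration, here is a minimal sketch of how fetch could work on a channel that never buffers. It is not the Vert.x code: DemandGatedChannel and onNotification are hypothetical names, the class does not implement ReadStream, and thread safety is left out. It assumes that a notification arriving while there is no demand is simply dropped, and that Long.MAX_VALUE demand means "flowing".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a notification channel; NOT the Vert.x class.
final class DemandGatedChannel {
    private Consumer<String> handler;
    // Long.MAX_VALUE means "flowing": every notification is delivered.
    private long demand = Long.MAX_VALUE;

    DemandGatedChannel handler(Consumer<String> h) {
        this.handler = h;
        return this;
    }

    // Pausing sets the demand to zero; nothing is queued while paused.
    DemandGatedChannel pause() {
        demand = 0L;
        return this;
    }

    DemandGatedChannel resume() {
        demand = Long.MAX_VALUE;
        return this;
    }

    // Adds to the outstanding demand, saturating at Long.MAX_VALUE.
    DemandGatedChannel fetch(long amount) {
        if (amount < 0L) {
            throw new IllegalArgumentException("amount must be >= 0");
        }
        demand += amount;
        if (demand < 0L) {
            demand = Long.MAX_VALUE; // overflow: switch to flowing mode
        }
        return this;
    }

    // Assumed entry point, called when the database delivers a notification.
    // Returns true if the payload reached the handler, false if it was dropped.
    boolean onNotification(String payload) {
        if (handler == null || demand == 0L) {
            return false; // no buffering: the event is lost
        }
        if (demand != Long.MAX_VALUE) {
            demand--;
        }
        handler.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        DemandGatedChannel ch = new DemandGatedChannel().handler(seen::add);
        ch.pause().fetch(2);
        ch.onNotification("a");
        ch.onNotification("b");
        ch.onNotification("c"); // dropped, demand is exhausted
        System.out.println(seen); // prints [a, b]
    }
}
```

With this shape, a subscriber that requests n items receives at most n notifications and misses anything that arrives before it requests more, which is the trade-off of not buffering.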

@vietj
Member

vietj commented Feb 2, 2024

Are you interested in contributing this, @andreas-eberle?

@vietj vietj modified the milestones: 4.5.3, 4.5.4 Feb 6, 2024
@vietj vietj modified the milestones: 4.5.4, 4.5.5 Feb 22, 2024
@vietj vietj modified the milestones: 4.5.5, 4.5.6 Mar 14, 2024
@vietj vietj modified the milestones: 4.5.6, 4.5.7, 4.5.8 Mar 21, 2024
@vietj vietj modified the milestones: 4.5.8, 4.5.9 May 24, 2024
2 participants