0

I have a vertx server which sends on events to all streamers an information. Using a console all working as wished. But if I use a vertx webclient the client is connected but did not receive anything.

the web client

val webClientOpt = WebClientOptions()
      .setKeepAlive(true)
      .setUserAgent("Client/2.0")
      .setFollowRedirects(true)
      .setShared(true)

the client call

client
      .get(port, host, UriTemplate.of(s"${path}event-bus/"))
      .putHeader("content-type", "application/json")
      .bearerTokenAuthentication(UserBuffer.loggedInUser().jwtToken())
      .as(BodyCodec.pipe(writeBuffer)).send()

now the writeBuffer and reader

val writeBuffer = ReactiveWriteStream.writeStream[Buffer](vertx)
val readStream = ReactiveReadStream.readStream[Buffer]()

Also the stuff to get the data

readStream.handler(j => {
          println("CONSUMING!!!!")
          println(j.toString("UTF-8"))
        })
        writeBuffer.subscribe(readStream)

I know I have to use the BodyCodec.pipe but I think here is my problem. I think I don't use it correctly.


It was not clearly to me how to use the WriteStream correctly. Now I use

new WriteStream[Buffer]() {
    override def write(buffer: Buffer): io.vertx.core.Future[Void] = {
      println(buffer.toString())
      Future.successful(null).asVertx
    }
    override def end(): io.vertx.core.Future[Void] = {
      Future.successful(null).asVertx
    }
    override def setWriteQueueMaxSize(maxSize: Int): WriteStream[Buffer] = this
    override def writeQueueFull(): Boolean = false
    override def drainHandler(handler: Handler[Void]): WriteStream[Buffer] = this
    override def exceptionHandler(handler: Handler[Throwable]): WriteStream[Buffer] = this
  }

That is what I was looking for.


5
  • Please clarify your specific problem or provide additional details to highlight exactly what you need. As it's currently written, it's hard to tell exactly what you're asking. Commented Sep 10 at 22:06
  • For clarity, is this a one-shot stream (HTTP chunked) or Server Side Events or Websockets? Commented Sep 11 at 5:25
  • I have not understand the WriteBuffer. But now I found a way. Commented Sep 11 at 12:28
  • @AndréSchmidt If you found a solution, can you please answer your own question to the benefit of future readers? :) Commented Sep 15 at 12:52
  • @stefanobaghino yes i found a solution. I will add them Commented Nov 12 at 20:56

1 Answer 1

0

Here is a solution:

    private lazy val templatePipe = new WriteStream[Buffer]() {
    override def write(buffer: Buffer): io.vertx.core.Future[Void] = {
      val body = readFromString[TemplateEventResponse](buffer.toString())
      getTemplate(body.uniqueName)
        .map(_ => null).asVertx
    }

    override def end(): io.vertx.core.Future[Void] = {
      Future.successful(null).asVertx
    }

    override def setWriteQueueMaxSize(maxSize: Int): WriteStream[Buffer] = this

    override def writeQueueFull(): Boolean = false

    override def drainHandler(handler: Handler[Void]): WriteStream[Buffer] = this

    override def exceptionHandler(handler: Handler[Throwable]): WriteStream[Buffer] = this
  }

Above, you see the handler and now the way to register from a client to an server.

    private def updateStream(): Unit = {
    client
      .get(port, host, UriTemplate.of(s"${path}event-register/"))
      .putHeader("content-type", "application/json")
      .bearerTokenAuthentication(UserBuffer.loggedInUser().jwtToken())
      .as(BodyCodec.pipe(templatePipe))
      .send().asScala
      .onComplete {
        case Success(data) if data.statusCode() >= 200 && data.statusCode() < 300 =>
        case Success(value) =>
        //TODO meldung machen
        case Failure(err) =>
        //TODO meldung machen
      }
  }

If you call updateStream() this client register on a server and waits. On each event the handler will start the work. For the server I use this:

    private val streamers = mutable.HashSet[HttpServerResponse]()

    def registerUpdateEvent(ctx: RoutingContext): Future[?] = {
    val response = ctx.response()
      .putHeader("Content-Type", "text/json")
      .setChunked(true)
    streamers += response
    response.endHandler(_ => {
      streamers -= response
    })
    response.exceptionHandler(_ => {
      streamers -= response
    })
    Future.successful("")
  }

    override def start(): Unit = {
    super.start()
    vertx.eventBus().consumer[SettlementEvent](classOf[SettlementEvent].toString, event => {
      persistence.getNumberByEntityId(event.body().entityId).map { nr =>
        streamers.foreach { stream =>
          stream.write(
            writeToString(SettlementEventResponse(nr,event.body.getClass.getSimpleName))
          )
        }
      }
    })
  }

The start method has to be adapt to your needings. I listen on every event and fire to all listeners the json SettlementEventResponse. I tested it and it works. Also it is very fast. I hope that helps others.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.