I have a Vert.x server that sends information to all connected streaming clients whenever an event occurs. With a console client everything works as expected. But when I use the Vert.x WebClient, the client connects but never receives anything.
The web client options:
val webClientOpt = WebClientOptions()
  .setKeepAlive(true)
  .setUserAgent("Client/2.0")
  .setFollowRedirects(true)
  .setShared(true)
The client call:
client
  .get(port, host, UriTemplate.of(s"${path}event-bus/"))
  .putHeader("content-type", "application/json")
  .bearerTokenAuthentication(UserBuffer.loggedInUser().jwtToken())
  .as(BodyCodec.pipe(writeBuffer))
  .send()
Now the writeBuffer and the reader:
val writeBuffer = ReactiveWriteStream.writeStream[Buffer](vertx)
val readStream = ReactiveReadStream.readStream[Buffer]()
And the code that consumes the data:
readStream.handler(j => {
  println("CONSUMING!!!!")
  println(j.toString("UTF-8"))
})
writeBuffer.subscribe(readStream)
I know I have to use BodyCodec.pipe, but I think that is where my problem lies: I suspect I am not using it correctly.
It was not clear to me how to use the WriteStream correctly. Now I use
new WriteStream[Buffer]() {
  // Called for every chunk of the response body as it arrives
  override def write(buffer: Buffer): io.vertx.core.Future[Void] = {
    println(buffer.toString())
    Future.successful(null).asVertx
  }
  // Called once when the response body is complete
  override def end(): io.vertx.core.Future[Void] = {
    Future.successful(null).asVertx
  }
  // Nothing is buffered here, so the back-pressure hooks are no-ops
  override def setWriteQueueMaxSize(maxSize: Int): WriteStream[Buffer] = this
  override def writeQueueFull(): Boolean = false
  override def drainHandler(handler: Handler[Void]): WriteStream[Buffer] = this
  override def exceptionHandler(handler: Handler[Throwable]): WriteStream[Buffer] = this
}
That is what I was looking for.
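For anyone who wants to reuse this, the anonymous WriteStream can be pulled out into a small class that forwards each chunk to a callback. This is only a sketch: CallbackSink and onChunk are names I made up, not part of Vert.x, and it assumes the Vert.x 5 WriteStream interface, where write and end return a Future and there are no handler-based overloads to implement. It uses Future.succeededFuture from Vert.x core instead of converting a Scala Future.

```scala
import io.vertx.core.{Future, Handler}
import io.vertx.core.buffer.Buffer
import io.vertx.core.streams.WriteStream
import scala.util.control.NonFatal

// Hypothetical helper (not part of Vert.x): hands every received chunk to `onChunk`.
// Assumes the Vert.x 5 WriteStream interface (write/end return a Future).
final class CallbackSink(onChunk: Buffer => Unit) extends WriteStream[Buffer] {

  // Called for every chunk of the response body; a throwing callback fails the write.
  override def write(data: Buffer): Future[Void] =
    try {
      onChunk(data)
      Future.succeededFuture[Void]()
    } catch {
      case NonFatal(e) => Future.failedFuture[Void](e)
    }

  // Called once when the response body is complete.
  override def end(): Future[Void] = Future.succeededFuture[Void]()

  // Nothing is buffered, so the queue is never full and these hooks do nothing.
  override def setWriteQueueMaxSize(maxSize: Int): WriteStream[Buffer] = this
  override def writeQueueFull(): Boolean = false
  override def drainHandler(handler: Handler[Void]): WriteStream[Buffer] = this
  override def exceptionHandler(handler: Handler[Throwable]): WriteStream[Buffer] = this
}

// Usage, with `client`, `port`, `host` and `path` as in the question:
// client
//   .get(port, host, s"${path}event-bus/")
//   .as(BodyCodec.pipe(new CallbackSink(b => println(b.toString("UTF-8")))))
//   .send()
```

Note that because writeQueueFull always returns false, the client is never asked to pause, so a slow callback gets no back-pressure relief. That is fine for printing events, but probably not for heavy processing.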