I am using the following libraries:
grpc-java : v1.30.2
netty : v4.1.65.Final
JDK : 11
I have a Spring Boot application that acts as a gRPC client. It is connected to more than 100 servers in non-secure (plaintext) mode. Each connection carries a bidirectional stream on which the server sends data at regular intervals, so I create one ManagedChannel per server and process the data received on it.
I create each ManagedChannel as follows, passing a custom executor to the channel builder:
@Autowired
@Qualifier("CustomExecutor")
private ExecutorService customExecutor;
final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(inetAddr, port).executor(customExecutor);
ManagedChannel managedChannel = channelBuilder.usePlaintext().build();
The custom executor is defined as follows:
@Bean(name = "CustomExecutor")
public ExecutorService customExecutor() {
    final int maxPoolSize = Runtime.getRuntime().availableProcessors() * 2;
    return new CustomThreadPoolExecutor(0, maxPoolSize, 60, new ThreadFactoryBuilder().setNameFormat("Custom-%d").build());
}

private class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    public CustomThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize,
                                    final long keepAliveTimeInSeconds,
                                    final ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTimeInSeconds, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), threadFactory, new CustomDiscardPolicy());
    }
}

private class CustomDiscardPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // log rejection message and further processing.
    }
}
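To make the behaviour of this executor concrete: a pool built on a SynchronousQueue has no task queue, so once all maxPoolSize threads are busy, every further task goes straight to the rejection handler. The sketch below is a minimal, JDK-only illustration of that behaviour. The class name RejectionDemo and the method countRejections are hypothetical, and the tasks are deliberately blocked so the outcome does not depend on timing. It is not the gRPC workload itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the application above.
class RejectionDemo {

    // Submits `tasks` blocking tasks to a pool shaped like the custom executor
    // (core size 0, SynchronousQueue, custom rejection handler) and returns
    // how many of them ended up in the rejection handler.
    static int countRejections(int maxPoolSize, int tasks) {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, maxPoolSize, 60, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                // Stand-in for CustomDiscardPolicy: only counts the rejection.
                (r, executor) -> rejected.incrementAndGet());
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return rejected.get();
        } finally {
            release.countDown(); // let the blocked workers finish
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        // 2 worker threads, 5 tasks: the first 2 start, the other 3 are rejected.
        System.out.println("rejected = " + countRejections(2, 5));
    }
}
```

With more than 100 channels sharing availableProcessors() * 2 threads, rejections of this kind seem plausible under load. Whether a rejected gRPC task is related to the error below is something I have not confirmed.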
After the application has been running for a couple of hours, I start getting the following error in my ResponseObserver:
io.grpc.StatusRuntimeException: UNKNOWN
at io.grpc.Status.asRuntimeException(Status.java:533)
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:460)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426)
at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:632)
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:607)
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:202)
at io.netty.buffer.PoolArena.tcacheAllocateSmall(PoolArena.java:172)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:134)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:126)
at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:395)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
at io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53)
at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
at io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:780)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 common frames omitted
Since this is not a heap-space OOM, no heap dump is generated.
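Without a heap dump, one way to watch direct buffer usage over time is the JDK's BufferPoolMXBean. The stack trace above goes through ByteBuffer.allocateDirect and Bits.reserveMemory, so these allocations should be counted in the JDK's "direct" buffer pool. Below is a minimal sketch under that assumption. The class name DirectMemoryProbe and the method directMemoryUsed are hypothetical, and in the real application the value would be logged periodically rather than printed once.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical helper, not part of the application above.
class DirectMemoryProbe {

    // Returns the bytes currently used by the JDK "direct" buffer pool,
    // or -1 if no pool with that name is exposed by this JVM.
    static long directMemoryUsed() {
        List<BufferPoolMXBean> pools =
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long before = directMemoryUsed();
        // Allocate 1 MiB off-heap so the counter visibly changes.
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024 * 1024);
        long after = directMemoryUsed();
        System.out.println("direct memory before: " + before + " bytes");
        System.out.println("direct memory after:  " + after + " bytes");
        // Reference the buffer so it stays reachable until this point.
        System.out.println("buffer capacity:      " + buffer.capacity() + " bytes");
    }
}
```

If this value climbs steadily until it reaches the limit set by -XX:MaxDirectMemorySize, that would suggest buffers are being retained rather than a single oversized allocation.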
I found https://github.com/grpc/grpc-java/issues/6910, which is marked as CLOSED.
Is this a known issue, or am I missing something here?