弹性search中的EsRejectedExecutionException用于并行search
我在我的应用程序中使用单个传输客户端实例查询多个并行请求的elasticsearch。
我得到了以下的并行执行exception。 如何解决这个问题。
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 1000) on org.elasticsearch.search.action.SearchServiceTransportAction$23@5f804c60 at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:62) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372) at org.elasticsearch.search.action.SearchServiceTransportAction.execute(SearchServiceTransportAction.java:509) at org.elasticsearch.search.action.SearchServiceTransportAction.sendExecuteScan(SearchServiceTransportAction.java:441) at org.elasticsearch.action.search.type.TransportSearchScanAction$AsyncAction.sendExecuteFirstPhase(TransportSearchScanAction.java:68) at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.performFirstPhase(TransportSearchTypeAction.java:171) at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.start(TransportSearchTypeAction.java:153) at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:52) at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:42) at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:63) at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:107) at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:43) at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:63) at org.elasticsearch.action.search.TransportSearchAction$TransportHandler.messageReceived(TransportSearchAction.java:124) at org.elasticsearch.action.search.TransportSearchAction$TransportHandler.messageReceived(TransportSearchAction.java:113) at org.elasticsearch.transport.netty.MessageChannelHandler.handleRequest(MessageChannelHandler.java:212) at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:109) at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.elasticsearch.common.netty.OpenChannelsHandler.handleUpstream(OpenChannelsHandler.java:74) at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:268) at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at org.elasticsearch.common.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) at org.elasticsearch.common.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
Elasticsearch有一个线程池和一个队列用于每个节点的search。 一个线程池将有N个工作人员准备处理请求。 当一个请求到来,如果一个工人是免费的,这是由工人处理。 现在,默认情况下,工作人员的数量等于该CPU上的内核数量。 当工作人员已满并且有更多的search请求时,请求将进入队列。 队列的大小也是有限的。 如果默认大小是100,并且如果发生比这更多的并行请求,那么这些请求将被拒绝,正如您在错误日志中所看到的那样。
解决scheme:
-
直接的解决scheme是增加search队列的大小。 我们也可以增加线程池的大小,但是那样会严重影响个别查询的性能。 所以,增加队列可能是一个好主意。 但是请记住,这个队列是内存驻留,并且增加队列大小太多会导致Out Of Memory问题。 ( 更多信息 )
-
增加节点和副本的数量 – 记住每个节点都有自己的search线程池/队列。 此外,search可能发生在主分片或副本上。
也许这听起来很奇怪,但是你需要降低并行search次数。 有了这个例外,Elasticsearch告诉你,你正在超载它。 在Elasticsearch中设置了一些限制(在线程数级别),大多数情况下,这些限制的默认值是最好的select。 所以,如果你正在testing你的集群,看看它可以承载多less负载,这将是一个指标,已经达到了一些限制。
或者,如果您确实想要更改默认值,则可以尝试增加search的队列大小以适应并发需求,但请记住,队列大小越大,放置在群集上的压力越大,会造成不稳定。
我看到了这个相同的错误,因为我并行地向ES发送了很多索引请求。 由于我正在编写数据迁移,因此很容易将它们串联起来,并解决了这个问题。