在SQS队列中使用许多消费者

我知道可以使用多个线程来使用SQS队列。 我想保证每个消息将被消耗一次。 我知道可以改变消息的可见性超时,例如等于我的处理时间。 如果我的进程花费比可见性超时更多的时间(例如,连接速度较慢),则其他线程可能会使用相同的消息。

什么是最好的方法来保证消息将被处理一次?

什么是最好的方法来保证消息将被处理一次?

你要求保证你不会得到一个 。 您可以将消息被处理的概率降低到一个很小的数量 ,但是您不能得到保证

我会解释为什么,以及减less重复的策略。

重复来自哪里

  1. 当您在SQS中input消息时,SQS可能会不止一次地收到该消息
    • 例如:在发送消息时发生轻微的networking呃逆,导致一个短暂的错误,自动重试 – 从消息发送者的angular度来看,它一次失败,并成功发送一次,但SQS接收到这两个消息。
  2. SQS可以在内部生成重复项
    • 模拟到第一个例子 – 有很多计算机处理封面下的消息,SQS需要确保没有任何东西丢失 – 消息存储在多个服务器上,这可能会导致重复。

在大多数情况下,通过利用SQS消息可见性超时 ,来自这些源的重复机会已经非常小 – 比如小的百分之几。

如果处理重复的确不是那么糟糕 ( 努力使你的消息消费幂等 !),我认为这足够好 – 减less重复的机会进一步是复杂的,并可能昂贵…


你的应用程序可以做些什么来进一步减less重复?

好的,在这里,我们在高级别下拉兔子洞,您将要为您的消息分配唯一的id,并在开始处理之前检查正在进行或已完成的id的primefacescaching:

  1. 确保您的邮件具有插入时提供的唯一标识符
    • 没有这个,你将无法分辨重复。
  2. 在消息的“行尾”处理重复。
    • 如果您的信息接收器需要将信息发送出去进行进一步处理,那么它可能是另一个重复来源(出于与上述类似的原因)
  3. 您需要在某处以primefaces方式存储和检查这些唯一的ID(并在一段时间后刷新)。 有两个重要的状态:“进行中”和“完成”
    • InProgress条目应该有一个超时时间,因为在处理失败的情况下需要恢复的速度有多快。
    • 根据您希望重复数据删除窗口的时间,完成的条目应该有一个超时
    • 最简单的可能是一个Guavacaching ,但只适用于单个处理应用程序。 如果您有很多消息或分布式消耗,请考虑用于此作业的数据库(使用后台进程扫描过期条目)
  4. 在处理消息之前,尝试将messageId存储在“InProgress”中。 如果它已经在那里,停下来 – 你只是处理了一个副本。
  5. 检查消息是否“已完成”(如果有,则停止)
  6. 您的线程现在拥有该消息的独占locking – 处理您的消息
  7. 将messageId标记为“已完成” – 只要此messageId留在此处,您将不会处理该messageId的任何重复项
    • 你可能无法承受无限的存储空间。
  8. 从“InProgress”中删除messageId(或者让它从这里过期)

一些笔记

  • 请记住,没有所有这些重复的机会已经很低。 根据消息的重复数据消除时间和金钱的价值,您可以随意跳过或修改任何步骤
    • 例如,你可以省略“InProgress”,但是这样做的结果是两个线程同时处理重复消息的机会很小(第二个线程在第一个线程已经完成之前就开始了)
  • 您的重复数据删除窗口只要您将messageIds保留在“已完成”即可。 由于您可能无法承受无限的存储空间,因此您至less可以将您的SQS消息可见性超时设置为2x; 在那之后减less重复的可能性(在已经非常低的机会之上,但仍然不能保证)。
  • 即便如此, 仍然存在重复的机会 – 所有的预防措施和SQS消息可见性超时有助于将这个机会减less到非常小,但机会仍然存在:
    • 在处理完消息之后,你的应用程序可能会崩溃/挂起/执行一个非常长的GC,但是在messageId是“Completed”之前(也许你正在使用一个数据库来存储这个存储,并且连接到它)
    • 在这种情况下,“处理”最终会过期,另一个线程可能会处理此消息(在SQS可见性超时也到期之后,或者因为SQS中有重复)。

将消息或对消息的引用存储在数据库中,并在消息标识上具有唯一的约束。 如果表中存在标识,那么您已经收到了该标识,并且由于唯一的限制,数据库将不允许再次插入标识。

您可以对消息和批处理使用setVisibilityTimeout(),以便延长可见性时间,直到线程处理完消息。

这可以通过使用scheduledExecutorService来完成,并在初始可见性时间的一半之后安排可运行事件。 下面的代码片断创build并执行VisibilityTimeExtender,每隔半个可见时间就会有一半的可见性时间。 (时间应该保证消息被处理,扩展到visibilityTime / 2)

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); ScheduledFuture<?> futureEvent = scheduler.scheduleAtFixedRate(new VisibilityTimeExtender(..), visibilityTime/2, visibilityTime/2, TimeUnit.SECONDS); 

VisibilityTimeExtender必须实现Runnable,并且是您更新新的可见性时间的地方。

当线程完成处理消息时,可以从队列中删除它,并调用futureEvent.cancel(true)来停止计划的事件。

当您使用API​​等读取消息时,AWS SQS API不会自动“消耗”消息。 开发者需要打电话来自己删除邮件。

作为“死信队列设置”的一部分,SQS确实有一个function调用“重新驱动策略”。 您只需将读取请求设置为1.如果消耗过程崩溃,则后续读取同一消息时会将消息放入死信队列中。

SQS队列可见性超时可以设置为12小时。 除非你有特殊的需要,否则你需要实现过程来将消息处理程序存储在数据库中,以便进行检查。