From 9dd94f299b5adac01b231c42dbfa6a442bd8b647 Mon Sep 17 00:00:00 2001 From: Alexis Hernandez Date: Sun, 13 May 2018 11:24:43 -0500 Subject: [PATCH] server: Add retries to the SQSSeederTask Don't stop consuming messages when the connection to SQS is lost, instead, retry to connect. --- .../xsn/explorer/tasks/SQSSeederTask.scala | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/server/app/com/xsn/explorer/tasks/SQSSeederTask.scala b/server/app/com/xsn/explorer/tasks/SQSSeederTask.scala index 39fbd4e..01004f1 100644 --- a/server/app/com/xsn/explorer/tasks/SQSSeederTask.scala +++ b/server/app/com/xsn/explorer/tasks/SQSSeederTask.scala @@ -40,15 +40,20 @@ class SQSSeederTask @Inject() ( def run(): Unit = { logger.info("Starting seeder") - SqsSource(config.queueUrl, settings) - .runWith(Sink.foreach(handleMessage)) - .onComplete { - case Failure(ex) => - logger.error("Failed to stream SQS messages", ex) - - case Success(_) => - logger.info("SQS stream completed") - } + def f(): Unit = { + SqsSource(config.queueUrl, settings) + .runWith(Sink.foreach(handleMessage)) + .onComplete { + case Failure(ex) => + logger.error("Failed to stream SQS messages, restarting seeder", ex) + f() + + case Success(_) => + logger.info("SQS stream completed") + } + } + + f() } private def handleMessage(message: Message): Unit = {