|
@ -40,17 +40,22 @@ class SQSSeederTask @Inject() ( |
|
|
def run(): Unit = { |
|
|
def run(): Unit = { |
|
|
logger.info("Starting seeder") |
|
|
logger.info("Starting seeder") |
|
|
|
|
|
|
|
|
|
|
|
def f(): Unit = { |
|
|
SqsSource(config.queueUrl, settings) |
|
|
SqsSource(config.queueUrl, settings) |
|
|
.runWith(Sink.foreach(handleMessage)) |
|
|
.runWith(Sink.foreach(handleMessage)) |
|
|
.onComplete { |
|
|
.onComplete { |
|
|
case Failure(ex) => |
|
|
case Failure(ex) => |
|
|
logger.error("Failed to stream SQS messages", ex) |
|
|
logger.error("Failed to stream SQS messages, restarting seeder", ex) |
|
|
|
|
|
f() |
|
|
|
|
|
|
|
|
case Success(_) => |
|
|
case Success(_) => |
|
|
logger.info("SQS stream completed") |
|
|
logger.info("SQS stream completed") |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
f() |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
private def handleMessage(message: Message): Unit = { |
|
|
private def handleMessage(message: Message): Unit = { |
|
|
def onBlockhash(blockhash: Blockhash) = { |
|
|
def onBlockhash(blockhash: Blockhash) = { |
|
|
val result = blockEventsProcessor.newLatestBlock(blockhash) |
|
|
val result = blockEventsProcessor.newLatestBlock(blockhash) |
|
|