diff --git a/server/app/com/xsn/explorer/modules/SeederModule.scala b/server/app/com/xsn/explorer/modules/SeederModule.scala new file mode 100644 index 0000000..9422bca --- /dev/null +++ b/server/app/com/xsn/explorer/modules/SeederModule.scala @@ -0,0 +1,84 @@ +package com.xsn.explorer.modules + +import javax.inject.{Inject, Singleton} + +import akka.stream.Materializer +import akka.stream.alpakka.sqs.SqsSourceSettings +import akka.stream.alpakka.sqs.scaladsl.SqsSource +import akka.stream.scaladsl.Sink +import com.amazonaws.services.sqs.AmazonSQSAsync +import com.amazonaws.services.sqs.model.Message +import com.xsn.explorer.config.SeederConfig +import com.xsn.explorer.models.Blockhash +import com.xsn.explorer.processors.BlockEventsProcessor +import org.scalactic.{Bad, Good} +import org.slf4j.LoggerFactory +import play.api.inject.{SimpleModule, _} + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.util.control.NonFatal +import scala.util.{Failure, Success} + +class SeederModule extends SimpleModule(bind[SQSSeederTask].toSelf.eagerly()) + +@Singleton +class SQSSeederTask @Inject() ( + config: SeederConfig, + blockEventsProcessor: BlockEventsProcessor)( + implicit sqs: AmazonSQSAsync, + materializer: Materializer) { + + private val logger = LoggerFactory.getLogger(this.getClass) + + private val settings = SqsSourceSettings.Defaults.copy(maxBatchSize = 1, maxBufferSize = 1) + + if (config.isEnabled) { + run() + } else { + logger.info("Seeder is disabled") + } + + def run(): Unit = { + logger.info("Starting seeder") + + SqsSource(config.queueUrl, settings) + .runWith(Sink.foreach(handleMessage)) + .onComplete { + case Failure(ex) => + logger.error("Failed to stream SQS messages", ex) + + case Success(_) => + logger.info("SQS stream completed") + } + } + + private def handleMessage(message: Message): Unit = { + def onBlockhash(blockhash: Blockhash) = { + val result = blockEventsProcessor.newLatestBlock(blockhash) + + result.recover { + case NonFatal(ex) => + logger.error(s"Failed to process the latest block = ${blockhash.string}", ex) + } + + result.foreach { + case Bad(errors) => + logger.error(s"Failed to process the latest block = ${blockhash.string}, errors = $errors") + + case Good(_) => + logger.info(s"Block processed successfully = ${blockhash.string}") + sqs.deleteMessageAsync(config.queueUrl, message.getReceiptHandle) + } + } + + val body = message.getBody + Blockhash + .from(body) + .orElse { + logger.warn(s"Ignoring invalid message: $body") + sqs.deleteMessageAsync(config.queueUrl, message.getReceiptHandle) + None + } + .foreach(onBlockhash) + } +} \ No newline at end of file diff --git a/server/conf/application.conf b/server/conf/application.conf index e9e487d..63a8264 100644 --- a/server/conf/application.conf +++ b/server/conf/application.conf @@ -43,6 +43,7 @@ play.modules.enabled += "com.xsn.explorer.modules.AmazonSQSAsyncModule" play.modules.enabled += "com.xsn.explorer.modules.DataHandlerModule" play.modules.enabled += "com.xsn.explorer.modules.ConfigModule" play.modules.enabled += "com.xsn.explorer.modules.ExecutorsModule" +play.modules.enabled += "com.xsn.explorer.modules.SeederModule" play.modules.enabled += "com.xsn.explorer.modules.XSNServiceModule"