From d8c24f9057fa62646c16b8125c1edf400618d272 Mon Sep 17 00:00:00 2001 From: Alexis Hernandez Date: Thu, 14 Jun 2018 23:26:21 -0500 Subject: [PATCH] server: Fixes #21 Add the PollingSeederModule This is a replacement for the SQSSeederModule, instead of requiring AWS SQS to sync the database, we poll the latest block from the rpc server every minute. As reuse most of the logic from the SQS seeding approach, this could be a good replacement to remove the AWS dependency, at the moment, the idea is to use this new module locally. --- .../modules/PollingSeederModule.scala | 6 ++ .../explorer/tasks/PollingSeederTask.scala | 61 +++++++++++++++++++ 2 files changed, 67 insertions(+) create mode 100644 server/app/com/xsn/explorer/modules/PollingSeederModule.scala create mode 100644 server/app/com/xsn/explorer/tasks/PollingSeederTask.scala diff --git a/server/app/com/xsn/explorer/modules/PollingSeederModule.scala b/server/app/com/xsn/explorer/modules/PollingSeederModule.scala new file mode 100644 index 0000000..385a055 --- /dev/null +++ b/server/app/com/xsn/explorer/modules/PollingSeederModule.scala @@ -0,0 +1,6 @@ +package com.xsn.explorer.modules + +import com.xsn.explorer.tasks.PollingSeederTask +import play.api.inject.{SimpleModule, bind} + +class PollingSeederModule extends SimpleModule(bind[PollingSeederTask].toSelf.eagerly()) diff --git a/server/app/com/xsn/explorer/tasks/PollingSeederTask.scala b/server/app/com/xsn/explorer/tasks/PollingSeederTask.scala new file mode 100644 index 0000000..1606bdb --- /dev/null +++ b/server/app/com/xsn/explorer/tasks/PollingSeederTask.scala @@ -0,0 +1,61 @@ +package com.xsn.explorer.tasks + +import javax.inject.Inject + +import akka.actor.ActorSystem +import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps +import com.xsn.explorer.processors.BlockEventsProcessor +import com.xsn.explorer.services.XSNService +import org.scalactic.Bad +import org.slf4j.LoggerFactory + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.DurationInt +import scala.util.control.NonFatal + +class PollingSeederTask @Inject() ( + actorSystem: ActorSystem, + xsnService: XSNService, + blockEventsProcessor: BlockEventsProcessor, + backwardsSynchronizerTask: BackwardsSynchronizerTask)( + implicit ec: ExecutionContext) { + + private val logger = LoggerFactory.getLogger(this.getClass) + + start() + + def start() = { + logger.info("Starting polling seeder task") + actorSystem.scheduler.schedule(initialDelay = 15.seconds, interval = 1.minute)(run) + } + + private def run(): Unit = { + val result = for { + block <- xsnService.getLatestBlock().toFutureOr + result <- blockEventsProcessor.processBlock(block.hash).toFutureOr + } yield onBlockResult(result) + + val _ = result + .toFuture + .map { + case Bad(errors) => logger.error(s"Failed to sync latest block, errors = $errors") + case _ => () + } + .recover { + case NonFatal(ex) => logger.error("Failed to sync latest block", ex) + } + } + + private def onBlockResult(eventResult: BlockEventsProcessor.Result) = eventResult match { + case BlockEventsProcessor.FirstBlockCreated(block) => + backwardsSynchronizerTask.sync(block) + + case BlockEventsProcessor.RechainDone(_, newBlock) => + backwardsSynchronizerTask.sync(newBlock) + + case BlockEventsProcessor.MissingBlockProcessed(block) => + backwardsSynchronizerTask.sync(block) + + case _ => () + } +}