Browse Source
This is a replacement for the SQSSeederModule, instead of requiring AWS SQS to sync the database, we poll the latest block from the rpc server every minute. As reuse most of the logic from the SQS seeding approach, this could be a good replacement to remove the AWS dependency, at the moment, the idea is to use this new module locally.prometheus-integration
Alexis Hernandez
7 years ago
2 changed files with 67 additions and 0 deletions
@ -0,0 +1,6 @@ |
|||||
|
package com.xsn.explorer.modules |
||||
|
|
||||
|
import com.xsn.explorer.tasks.PollingSeederTask |
||||
|
import play.api.inject.{SimpleModule, bind} |
||||
|
|
||||
|
class PollingSeederModule extends SimpleModule(bind[PollingSeederTask].toSelf.eagerly()) |
@ -0,0 +1,61 @@ |
|||||
|
package com.xsn.explorer.tasks |
||||
|
|
||||
|
import javax.inject.Inject |
||||
|
|
||||
|
import akka.actor.ActorSystem |
||||
|
import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps |
||||
|
import com.xsn.explorer.processors.BlockEventsProcessor |
||||
|
import com.xsn.explorer.services.XSNService |
||||
|
import org.scalactic.Bad |
||||
|
import org.slf4j.LoggerFactory |
||||
|
|
||||
|
import scala.concurrent.ExecutionContext |
||||
|
import scala.concurrent.duration.DurationInt |
||||
|
import scala.util.control.NonFatal |
||||
|
|
||||
|
class PollingSeederTask @Inject() ( |
||||
|
actorSystem: ActorSystem, |
||||
|
xsnService: XSNService, |
||||
|
blockEventsProcessor: BlockEventsProcessor, |
||||
|
backwardsSynchronizerTask: BackwardsSynchronizerTask)( |
||||
|
implicit ec: ExecutionContext) { |
||||
|
|
||||
|
private val logger = LoggerFactory.getLogger(this.getClass) |
||||
|
|
||||
|
start() |
||||
|
|
||||
|
def start() = { |
||||
|
logger.info("Starting polling seeder task") |
||||
|
actorSystem.scheduler.schedule(initialDelay = 15.seconds, interval = 1.minute)(run) |
||||
|
} |
||||
|
|
||||
|
private def run(): Unit = { |
||||
|
val result = for { |
||||
|
block <- xsnService.getLatestBlock().toFutureOr |
||||
|
result <- blockEventsProcessor.processBlock(block.hash).toFutureOr |
||||
|
} yield onBlockResult(result) |
||||
|
|
||||
|
val _ = result |
||||
|
.toFuture |
||||
|
.map { |
||||
|
case Bad(errors) => logger.error(s"Failed to sync latest block, errors = $errors") |
||||
|
case _ => () |
||||
|
} |
||||
|
.recover { |
||||
|
case NonFatal(ex) => logger.error("Failed to sync latest block", ex) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private def onBlockResult(eventResult: BlockEventsProcessor.Result) = eventResult match { |
||||
|
case BlockEventsProcessor.FirstBlockCreated(block) => |
||||
|
backwardsSynchronizerTask.sync(block) |
||||
|
|
||||
|
case BlockEventsProcessor.RechainDone(_, newBlock) => |
||||
|
backwardsSynchronizerTask.sync(newBlock) |
||||
|
|
||||
|
case BlockEventsProcessor.MissingBlockProcessed(block) => |
||||
|
backwardsSynchronizerTask.sync(block) |
||||
|
|
||||
|
case _ => () |
||||
|
} |
||||
|
} |
Loading…
Reference in new issue