Browse Source
The task runs frequently polling the xsn service for its latest block, then, it calls the LedgerSynchronizerService to ensure that block gets into the ledger database.prometheus-integration
3 changed files with 83 additions and 1 deletions
@ -0,0 +1,25 @@ |
|||
package com.xsn.explorer.config |
|||
|
|||
import javax.inject.Inject |
|||
|
|||
import play.api.Configuration |
|||
|
|||
import scala.concurrent.duration.{DurationLong, FiniteDuration} |
|||
|
|||
trait LedgerSynchronizerConfig { |
|||
|
|||
def enabled: Boolean |
|||
|
|||
def initialDelay: FiniteDuration |
|||
|
|||
def interval: FiniteDuration |
|||
} |
|||
|
|||
class LedgerSynchronizerPlayConfig @Inject() (config: Configuration) extends LedgerSynchronizerConfig { |
|||
|
|||
override lazy val enabled: Boolean = config.getOptional[Boolean]("synchronizer.enabled").getOrElse(false) |
|||
|
|||
override lazy val initialDelay: FiniteDuration = config.getOptional[FiniteDuration]("synchronizer.initialDelay").getOrElse(15.seconds) |
|||
|
|||
override lazy val interval: FiniteDuration = config.getOptional[FiniteDuration]("synchronizer.interval").getOrElse(60.seconds) |
|||
} |
@ -1,12 +1,13 @@ |
|||
package com.xsn.explorer.modules |
|||
|
|||
import com.google.inject.AbstractModule |
|||
import com.xsn.explorer.config.{PlayRPCConfig, PlaySeederConfig, RPCConfig, SeederConfig} |
|||
import com.xsn.explorer.config._ |
|||
|
|||
class ConfigModule extends AbstractModule { |
|||
|
|||
override def configure(): Unit = { |
|||
bind(classOf[RPCConfig]).to(classOf[PlayRPCConfig]) |
|||
bind(classOf[SeederConfig]).to(classOf[PlaySeederConfig]) |
|||
bind(classOf[LedgerSynchronizerConfig]).to(classOf[LedgerSynchronizerPlayConfig]) |
|||
} |
|||
} |
|||
|
@ -0,0 +1,56 @@ |
|||
package com.xsn.explorer.tasks |
|||
|
|||
import javax.inject.Inject |
|||
|
|||
import akka.actor.ActorSystem |
|||
import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps |
|||
import com.xsn.explorer.config.LedgerSynchronizerConfig |
|||
import com.xsn.explorer.services.{LedgerSynchronizerService, XSNService} |
|||
import org.scalactic.Bad |
|||
import org.slf4j.LoggerFactory |
|||
|
|||
import scala.concurrent.ExecutionContext |
|||
import scala.util.control.NonFatal |
|||
|
|||
class PollerSynchronizerTask @Inject() ( |
|||
config: LedgerSynchronizerConfig, |
|||
actorSystem: ActorSystem, |
|||
xsnService: XSNService, |
|||
ledgerSynchronizerService: LedgerSynchronizerService)( |
|||
implicit ec: ExecutionContext) { |
|||
|
|||
private val logger = LoggerFactory.getLogger(this.getClass) |
|||
|
|||
if (config.enabled) { |
|||
start() |
|||
} else { |
|||
logger.info("The polled synchronizer is not enabled") |
|||
} |
|||
|
|||
def start() = { |
|||
logger.info("Starting the poller synchronizer task") |
|||
actorSystem.scheduler.scheduleOnce(config.initialDelay) { |
|||
run() |
|||
} |
|||
} |
|||
|
|||
private def run(): Unit = { |
|||
val result = for { |
|||
block <- xsnService.getLatestBlock().toFutureOr |
|||
_ <- ledgerSynchronizerService.synchronize(block.hash).toFutureOr |
|||
} yield () |
|||
|
|||
result |
|||
.toFuture |
|||
.map { |
|||
case Bad(errors) => logger.error(s"Failed to sync latest block, errors = $errors") |
|||
case _ => () |
|||
} |
|||
.recover { |
|||
case NonFatal(ex) => logger.error("Failed to sync latest block", ex) |
|||
} |
|||
.foreach { _ => |
|||
actorSystem.scheduler.scheduleOnce(config.interval) { run() } |
|||
} |
|||
} |
|||
} |
Loading…
Reference in new issue