From e93ec20cdb2189221528802113b8716f1c8832ac Mon Sep 17 00:00:00 2001 From: Alexis Hernandez Date: Wed, 11 Apr 2018 22:03:39 -0500 Subject: [PATCH] server: Update the SQSSeederTask to call the BlockSynchronizerTask --- .../data/anorm/DatabasePostgresSeeder.scala | 2 +- .../xsn/explorer/modules/SeederModule.scala | 7 ++- .../processors/BlockEventsProcessor.scala | 7 ++- .../tasks/BlockSynchronizerTask.scala | 48 +++++++++++-------- 4 files changed, 37 insertions(+), 27 deletions(-) diff --git a/server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala b/server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala index c4f4bff..c2e5b54 100644 --- a/server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala +++ b/server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala @@ -70,7 +70,7 @@ class DatabasePostgresSeeder @Inject() ( result .map(Good(_)) - .getOrElse(throw new RuntimeException("Unable to add the new latest block")) + .getOrElse(throw new RuntimeException("Unable to an old block")) } private def upsertBlockCascade(command: CreateBlockCommand)(implicit conn: Connection): Option[Unit] = { diff --git a/server/app/com/xsn/explorer/modules/SeederModule.scala b/server/app/com/xsn/explorer/modules/SeederModule.scala index 9422bca..211e711 100644 --- a/server/app/com/xsn/explorer/modules/SeederModule.scala +++ b/server/app/com/xsn/explorer/modules/SeederModule.scala @@ -11,6 +11,7 @@ import com.amazonaws.services.sqs.model.Message import com.xsn.explorer.config.SeederConfig import com.xsn.explorer.models.Blockhash import com.xsn.explorer.processors.BlockEventsProcessor +import com.xsn.explorer.tasks.BlockSynchronizerTask import org.scalactic.{Bad, Good} import org.slf4j.LoggerFactory import play.api.inject.{SimpleModule, _} @@ -24,7 +25,8 @@ class SeederModule extends SimpleModule(bind[SQSSeederTask].toSelf.eagerly()) @Singleton class SQSSeederTask @Inject() ( config: SeederConfig, - blockEventsProcessor: BlockEventsProcessor)( + blockEventsProcessor: BlockEventsProcessor, + blockSynchronizerTask: BlockSynchronizerTask)( implicit sqs: AmazonSQSAsync, materializer: Materializer) { @@ -68,6 +70,7 @@ class SQSSeederTask @Inject() ( case Good(_) => logger.info(s"Block processed successfully = ${blockhash.string}") sqs.deleteMessageAsync(config.queueUrl, message.getReceiptHandle) + blockSynchronizerTask.sync() } } @@ -81,4 +84,4 @@ class SQSSeederTask @Inject() ( } .foreach(onBlockhash) } -} \ No newline at end of file +} diff --git a/server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala b/server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala index e1e512b..36506ca 100644 --- a/server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala +++ b/server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala @@ -6,9 +6,8 @@ import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps import com.alexitc.playsonify.core.{ApplicationResult, FutureApplicationResult} import com.xsn.explorer.data.BlockBlockingDataHandler import com.xsn.explorer.data.anorm.DatabasePostgresSeeder -import com.xsn.explorer.models.Blockhash +import com.xsn.explorer.models.{Blockhash, Transaction} import com.xsn.explorer.models.rpc.Block -import com.xsn.explorer.models.Transaction import com.xsn.explorer.services.XSNService import com.xsn.explorer.util.Extensions.FutureApplicationResultExt import org.scalactic.Good @@ -41,13 +40,13 @@ class BlockEventsProcessor @Inject() ( * * @param blockhash the new latest block */ - def newLatestBlock(blockhash: Blockhash): FutureApplicationResult[Unit] = { + def newLatestBlock(blockhash: Blockhash): FutureApplicationResult[Block] = { val result = for { block <- xsnService.getBlock(blockhash).toFutureOr rpcTransactions <- block.transactions.map(xsnService.getTransaction).toFutureOr transactions = rpcTransactions.map(Transaction.fromRPC) r <- newLatestBlock(block, transactions).toFutureOr - } yield r + } yield block result.toFuture } diff --git a/server/app/com/xsn/explorer/tasks/BlockSynchronizerTask.scala b/server/app/com/xsn/explorer/tasks/BlockSynchronizerTask.scala index adca5c5..26ef3ec 100644 --- a/server/app/com/xsn/explorer/tasks/BlockSynchronizerTask.scala +++ b/server/app/com/xsn/explorer/tasks/BlockSynchronizerTask.scala @@ -57,7 +57,7 @@ class BlockSynchronizerTask @Inject() ( } private def tryToSync() = { - val result = for { + val futureOr = for { firstBlock <- blockDataHandler.getFirstBlock().toFutureOr previous <- Or.from(firstBlock.previousBlockhash, One(BlockNotFoundError)).toFutureOr _ <- { @@ -67,16 +67,38 @@ class BlockSynchronizerTask @Inject() ( } else { // sync logger.info(s"Sync required until block ${firstBlock.height.int}") - doSync(previous) + val r = doSync(previous) + + r.foreach { + case Good(_) => + logger.info("Sync completed") + case _ => () + } + + r } }.toFutureOr } yield () - result.toFuture + val result = futureOr.toFuture + + result.foreach { + case Bad(errors) => + logger.error(s"Failed to sync blocks, errors = $errors") + + case _ => () + } + + result.recover { + case NonFatal(ex) => + logger.error(s"Failed to sync blocks", ex) + } + + result } private def doSync(blockhash: Blockhash): FutureApplicationResult[Unit] = { - val futureOr = for { + val result = for { block <- xsnService.getBlock(blockhash).toFutureOr rpcTransactions <- block.transactions.map(xsnService.getTransaction).toFutureOr transactions = rpcTransactions.map(Transaction.fromRPC) @@ -87,7 +109,7 @@ class BlockSynchronizerTask @Inject() ( _ <- block .previousBlockhash - .filter(_ => block.height.int >= FirstBlockHeight.int) + .filter(_ => block.height.int > FirstBlockHeight.int) .map { previous => doSync(previous) } @@ -98,20 +120,6 @@ class BlockSynchronizerTask @Inject() ( .toFutureOr } yield () - val result = futureOr.toFuture - - result.foreach { - case Bad(errors) => - logger.error(s"Failed to sync block = ${blockhash.string}, errors = $errors") - - case _ => () - } - - result.recover { - case NonFatal(ex) => - logger.error(s"Failed to sync block = ${blockhash.string}", ex) - } - - result + result.toFuture } }