From 78dbc3d16049b9db160384ea6b8c80a5c1cd05a2 Mon Sep 17 00:00:00 2001 From: Alexis Hernandez Date: Sun, 15 Apr 2018 13:05:05 -0500 Subject: [PATCH] server: Update BlockEventsProcessor to return detailed results --- .../processors/BlockEventsProcessor.scala | 51 ++++++++++++++----- .../xsn/explorer/tasks/SQSSeederTask.scala | 20 +++++++- 2 files changed, 56 insertions(+), 15 deletions(-) diff --git a/server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala b/server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala index ba7fa7e..ffed775 100644 --- a/server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala +++ b/server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala @@ -26,6 +26,8 @@ class BlockEventsProcessor @Inject() ( databaseSeeder: DatabaseFutureSeeder, blockDataHandler: BlockFutureDataHandler) { + import BlockEventsProcessor._ + private val logger = LoggerFactory.getLogger(this.getClass) /** @@ -44,12 +46,12 @@ class BlockEventsProcessor @Inject() ( * * @param blockhash the new latest block */ - def newLatestBlock(blockhash: Blockhash): FutureApplicationResult[Block] = { + def newLatestBlock(blockhash: Blockhash): FutureApplicationResult[Result] = { val result = for { block <- xsnService.getBlock(blockhash).toFutureOr transactions <- block.transactions.map(getTransaction).toFutureOr r <- newLatestBlock(block, transactions).toFutureOr - } yield block + } yield r result.toFuture } @@ -74,8 +76,8 @@ class BlockEventsProcessor @Inject() ( result.toFuture } - private def newLatestBlock(newBlock: Block, newTransactions: List[Transaction]): FutureApplicationResult[Unit] = { - def onRechain(orphanBlock: Block): FutureApplicationResult[Unit] = { + private def newLatestBlock(newBlock: Block, newTransactions: List[Transaction]): FutureApplicationResult[Result] = { + def onRechain(orphanBlock: Block): FutureApplicationResult[Result] = { val result = for { orphanTransactions <- orphanBlock.transactions.map(getTransaction).toFutureOr @@ -85,36 +87,47 @@ class BlockEventsProcessor @Inject() ( newBlock = newBlock, newTransactions = newTransactions) _ <- databaseSeeder.replaceLatestBlock(command).toFutureOr - } yield () + } yield RechainDone(orphanBlock, newBlock) result.toFuture } - def onFirstBlock: FutureApplicationResult[Unit] = { + def onFirstBlock: FutureApplicationResult[Result] = { logger.info(s"first block = ${newBlock.hash.string}") val command = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions) databaseSeeder.firstBlock(command) + .toFutureOr + .map(_ => FirstBlockCreated(newBlock)) + .toFuture } - def onNewBlock(latestBlock: Block): FutureApplicationResult[Unit] = { + def onNewBlock(latestBlock: Block): FutureApplicationResult[Result] = { logger.info(s"existing latest block = ${latestBlock.hash.string} -> new latest block = ${newBlock.hash.string}") val command = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions) - databaseSeeder.newLatestBlock(command) + databaseSeeder + .newLatestBlock(command) + .toFutureOr + .map(_ => NewBlockAppended(newBlock)) + .toFuture } - def onMissingBlock(): FutureApplicationResult[Unit] = { + def onMissingBlock(): FutureApplicationResult[Result] = { blockDataHandler .getBy(newBlock.hash) .flatMap { case Good(_) => logger.info(s"The block ${newBlock.hash.string} is not missing but duplicated, ignoring") - Future.successful { Good(()) } + Future.successful { Good(ExistingBlockIgnored(newBlock)) } case Bad(One(BlockNotFoundError)) => val command = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions) - databaseSeeder.insertPendingBlock(command) + databaseSeeder + .insertPendingBlock(command) + .toFutureOr + .map(_ => MissingBlockProcessed(newBlock)) + .toFuture case Bad(errors) => Future.successful(Bad(errors)) @@ -131,12 +144,12 @@ class BlockEventsProcessor @Inject() ( } .toFutureOr - _ <- latestBlockMaybe + r <- latestBlockMaybe .map { latestBlock => if (newBlock.hash == latestBlock.hash) { // duplicated msg logger.info(s"ignoring duplicated latest block = ${newBlock.hash.string}") - Future.successful(Good(())) + Future.successful(Good(ExistingBlockIgnored(newBlock))) } else if (newBlock.previousBlockhash.contains(latestBlock.hash)) { // latest block -> new block onNewBlock(latestBlock) @@ -151,8 +164,18 @@ class BlockEventsProcessor @Inject() ( } .getOrElse(onFirstBlock) .toFutureOr - } yield () + } yield r result.toFuture } } + +object BlockEventsProcessor { + + sealed trait Result + case class FirstBlockCreated(block: Block) extends Result + case class MissingBlockProcessed(block: Block) extends Result + case class ExistingBlockIgnored(block: Block) extends Result + case class NewBlockAppended(block: Block) extends Result + case class RechainDone(orphanBlock: Block, newBlock: Block) extends Result +} \ No newline at end of file diff --git a/server/app/com/xsn/explorer/tasks/SQSSeederTask.scala b/server/app/com/xsn/explorer/tasks/SQSSeederTask.scala index 7109511..40c0621 100644 --- a/server/app/com/xsn/explorer/tasks/SQSSeederTask.scala +++ b/server/app/com/xsn/explorer/tasks/SQSSeederTask.scala @@ -66,7 +66,11 @@ class SQSSeederTask @Inject() ( case Good(_) => logger.info(s"Block processed successfully = ${blockhash.string}") sqs.deleteMessageAsync(config.queueUrl, message.getReceiptHandle) - blockSynchronizerTask.sync() + } + + result.foreach { + case Good(eventResult) => onBlockResult(eventResult) + case _ => () } } @@ -80,4 +84,18 @@ class SQSSeederTask @Inject() ( } .foreach(onBlockhash) } + + private def onBlockResult(eventResult: BlockEventsProcessor.Result) = eventResult match { + case BlockEventsProcessor.FirstBlockCreated(block) => + blockSynchronizerTask.sync() + + case BlockEventsProcessor.NewBlockAppended(block) => + blockSynchronizerTask.sync() + + case BlockEventsProcessor.RechainDone(orphanBlock, newBlock) => + blockSynchronizerTask.sync() + + case BlockEventsProcessor.MissingBlockProcessed(block) => () + case BlockEventsProcessor.ExistingBlockIgnored(block) => () + } }