From b4120e284b992d8bef79e6873b70f0606138647d Mon Sep 17 00:00:00 2001 From: Alexis Hernandez Date: Sun, 29 Apr 2018 20:16:33 -0500 Subject: [PATCH] server: Update the BlockEventsProcessor flow When a block is processed, in case it can't be fully retrieved from the rpc server, the block is ignored. When a block is processed, if there is an existing block having the same height, it will be replaced by the new one. This is a part for fixing the bug #6 --- .../xsn/explorer/data/DatabaseSeeder.scala | 2 +- .../data/anorm/DatabasePostgresSeeder.scala | 2 +- .../data/async/DatabaseFutureSeeder.scala | 4 +- .../processors/BlockEventsProcessor.scala | 39 ++++-- .../data/BlockPostgresDataHandlerSpec.scala | 21 ++- .../processors/BlockEventsProcessorSpec.scala | 128 +++++++++++++++++- 6 files changed, 180 insertions(+), 16 deletions(-) diff --git a/server/app/com/xsn/explorer/data/DatabaseSeeder.scala b/server/app/com/xsn/explorer/data/DatabaseSeeder.scala index 26415b2..df0f39e 100644 --- a/server/app/com/xsn/explorer/data/DatabaseSeeder.scala +++ b/server/app/com/xsn/explorer/data/DatabaseSeeder.scala @@ -25,7 +25,7 @@ trait DatabaseSeeder[F[_]] { * The database has some blocks but there is a rechain happening, we need to * replace our current latest block with the new latest block. */ - def replaceLatestBlock(command: ReplaceBlockCommand): F[Unit] + def replaceBlock(command: ReplaceBlockCommand): F[Unit] /** * The database has some blocks but the chain is not complete, we are inserting diff --git a/server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala b/server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala index ec521c9..328d196 100644 --- a/server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala +++ b/server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala @@ -52,7 +52,7 @@ class DatabasePostgresSeeder @Inject() ( .getOrElse(throw new RuntimeException("Unable to add the new latest block")) } - override def replaceLatestBlock(command: ReplaceBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn => + override def replaceBlock(command: ReplaceBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn => val createCommand = CreateBlockCommand(command.newBlock, command.newTransactions) val result = for { diff --git a/server/app/com/xsn/explorer/data/async/DatabaseFutureSeeder.scala b/server/app/com/xsn/explorer/data/async/DatabaseFutureSeeder.scala index fd80e13..a303620 100644 --- a/server/app/com/xsn/explorer/data/async/DatabaseFutureSeeder.scala +++ b/server/app/com/xsn/explorer/data/async/DatabaseFutureSeeder.scala @@ -21,8 +21,8 @@ class DatabaseFutureSeeder @Inject() ( blockingSeeder.newLatestBlock(command) } - override def replaceLatestBlock(command: DatabaseSeeder.ReplaceBlockCommand): FutureApplicationResult[Unit] = Future { - blockingSeeder.replaceLatestBlock(command) + override def replaceBlock(command: DatabaseSeeder.ReplaceBlockCommand): FutureApplicationResult[Unit] = Future { + blockingSeeder.replaceBlock(command) } override def insertPendingBlock(command: DatabaseSeeder.CreateBlockCommand): FutureApplicationResult[Unit] = Future { diff --git a/server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala b/server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala index bd6c436..07dec7d 100644 --- a/server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala +++ b/server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala @@ -6,7 +6,7 @@ import com.alexitc.playsonify.core.FutureApplicationResult import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureListOps, FutureOps} import com.xsn.explorer.data.DatabaseSeeder import com.xsn.explorer.data.async.{BlockFutureDataHandler, DatabaseFutureSeeder} -import com.xsn.explorer.errors.BlockNotFoundError +import com.xsn.explorer.errors._ import com.xsn.explorer.models.rpc.Block import com.xsn.explorer.models.{Blockhash, Transaction} import com.xsn.explorer.services.{TransactionService, XSNService} @@ -52,7 +52,12 @@ class BlockEventsProcessor @Inject() ( r <- newLatestBlock(block, transactions).toFutureOr } yield r - result.toFuture + result.toFuture.map { + case Good(r) => Good(r) + case Bad(One(BlockNotFoundError)) => Good(MissingBlockIgnored) + case Bad(One(TransactionNotFoundError)) => Good(MissingBlockIgnored) + case Bad(errors) => Bad(errors) + } } private def newLatestBlock(newBlock: Block, newTransactions: List[Transaction]): FutureApplicationResult[Result] = { @@ -63,7 +68,7 @@ class BlockEventsProcessor @Inject() ( newTransactions = newTransactions) val result = for { - _ <- databaseSeeder.replaceLatestBlock(command).toFutureOr + _ <- databaseSeeder.replaceBlock(command).toFutureOr } yield RechainDone(orphanBlock, newBlock) result.toFuture @@ -90,6 +95,21 @@ class BlockEventsProcessor @Inject() ( .toFuture } + def onRepeatedBlockHeight(): FutureApplicationResult[Result] = { + val result = for { + orphanBlock <- blockDataHandler.getBy(newBlock.height).toFutureOr + + replaceCommand = DatabaseSeeder.ReplaceBlockCommand( + orphanBlock = orphanBlock, + newBlock = newBlock, + newTransactions = newTransactions) + + _ <- databaseSeeder.replaceBlock(replaceCommand).toFutureOr + } yield ReplacedByBlockHeight + + result.toFuture + } + def onMissingBlock(): FutureApplicationResult[Result] = { blockDataHandler .getBy(newBlock.hash) @@ -99,12 +119,13 @@ class BlockEventsProcessor @Inject() ( Future.successful { Good(ExistingBlockIgnored(newBlock)) } case Bad(One(BlockNotFoundError)) => - val command = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions) + val createCommand = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions) databaseSeeder - .insertPendingBlock(command) - .toFutureOr - .map(_ => MissingBlockProcessed(newBlock)) - .toFuture + .insertPendingBlock(createCommand) + .flatMap { + case Good(_) => Future.successful { Good(MissingBlockProcessed(newBlock)) } + case Bad(One(PostgresForeignKeyViolationError("height", _))) => onRepeatedBlockHeight() + } case Bad(errors) => Future.successful(Bad(errors)) @@ -155,4 +176,6 @@ object BlockEventsProcessor { case class ExistingBlockIgnored(block: Block) extends Result case class NewBlockAppended(block: Block) extends Result case class RechainDone(orphanBlock: Block, newBlock: Block) extends Result + case object MissingBlockIgnored extends Result + case object ReplacedByBlockHeight extends Result } \ No newline at end of file diff --git a/server/test/com/xsn/explorer/data/BlockPostgresDataHandlerSpec.scala b/server/test/com/xsn/explorer/data/BlockPostgresDataHandlerSpec.scala index 344aa26..ab59252 100644 --- a/server/test/com/xsn/explorer/data/BlockPostgresDataHandlerSpec.scala +++ b/server/test/com/xsn/explorer/data/BlockPostgresDataHandlerSpec.scala @@ -49,7 +49,7 @@ class BlockPostgresDataHandlerSpec extends PostgresDataHandlerSpec with BeforeAn } } - "getBy" should { + "getBy blockhash" should { "return a block" in { val block = BlockLoader.get("1ca318b7a26ed67ca7c8c9b5069d653ba224bf86989125d1dfbb0973b7d6a5e0") @@ -68,6 +68,25 @@ class BlockPostgresDataHandlerSpec extends PostgresDataHandlerSpec with BeforeAn } } + "getBy height" should { + "return a block" in { + val block = BlockLoader.get("1ca318b7a26ed67ca7c8c9b5069d653ba224bf86989125d1dfbb0973b7d6a5e0") + + dataHandler.insert(block) + + val result = dataHandler.getBy(block.height) + result.isGood mustEqual true + matches(block, result.get) + } + + "fail on block not found" in { + val block = BlockLoader.get("1ca318b7a26ed67ca7c8c9b5069d653ba224bf86989125d1dfbb0973b7d6a5e0") + + val result = dataHandler.getBy(block.height) + result mustEqual Bad(BlockNotFoundError).accumulating + } + } + "delete" should { "delete a block" in { val block = BlockLoader.get("1ca318b7a26ed67ca7c8c9b5069d653ba224bf86989125d1dfbb0973b7d6a5e0") diff --git a/server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala b/server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala index c4aa375..42673e8 100644 --- a/server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala +++ b/server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala @@ -1,20 +1,25 @@ package com.xsn.explorer.processors +import com.alexitc.playsonify.core.FutureApplicationResult import com.alexitc.playsonify.models._ import com.xsn.explorer.data.anorm.dao.{BalancePostgresDAO, BlockPostgresDAO, StatisticsPostgresDAO, TransactionPostgresDAO} import com.xsn.explorer.data.anorm.interpreters.FieldOrderingSQLInterpreter import com.xsn.explorer.data.anorm.{BalancePostgresDataHandler, BlockPostgresDataHandler, DatabasePostgresSeeder, StatisticsPostgresDataHandler} import com.xsn.explorer.data.async.{BlockFutureDataHandler, DatabaseFutureSeeder} import com.xsn.explorer.data.common.PostgresDataHandlerSpec +import com.xsn.explorer.errors.{BlockNotFoundError, TransactionNotFoundError} import com.xsn.explorer.helpers.{BlockLoader, Executors, FileBasedXSNService} import com.xsn.explorer.models.fields.BalanceField -import com.xsn.explorer.models.rpc.Block -import com.xsn.explorer.processors.BlockEventsProcessor.{NewBlockAppended, RechainDone} +import com.xsn.explorer.models.rpc.{Block, Transaction} +import com.xsn.explorer.models.{Blockhash, TransactionId} +import com.xsn.explorer.processors.BlockEventsProcessor.{MissingBlockIgnored, NewBlockAppended, RechainDone, ReplacedByBlockHeight} import com.xsn.explorer.services.TransactionService -import org.scalactic.Good +import org.scalactic.{Bad, Good} import org.scalatest.BeforeAndAfter import org.scalatest.concurrent.ScalaFutures +import scala.concurrent.Future + class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures with BeforeAndAfter { lazy val dataHandler = new BlockPostgresDataHandler(database, new BlockPostgresDAO) @@ -167,6 +172,123 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures balance.spent mustEqual BigDecimal(0) } } + + "ignore orphan block on rare rechain events when the rpc server doesn't have the block anymore" in { + // see https://github.com/X9Developers/block-explorer/issues/6 + val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7") + val block2 = BlockLoader.get("000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8") + val block3 = BlockLoader.get("00000766115b26ecbc09cd3a3db6870fdaf2f049d65a910eb2f2b48b566ca7bd") + + val xsnService = new FileBasedXSNService { + override def getBlock(blockhash: Blockhash): FutureApplicationResult[Block] = { + if (blockhash == block3.hash) { + Future.successful(Bad(BlockNotFoundError).accumulating) + } else { + super.getBlock(blockhash) + } + } + } + + val processor = new BlockEventsProcessor( + xsnService, + new TransactionService(xsnService)(Executors.globalEC), + new DatabaseFutureSeeder(dataSeeder)(Executors.databaseEC), + new BlockFutureDataHandler(dataHandler)(Executors.databaseEC)) + + + List(block1, block2) + .map(_.hash) + .map(processor.newLatestBlock) + .foreach { whenReady(_) { _.isGood mustEqual true } } + + /** + * When processing the latest block, a rechain event has occurred in the rpc server which leads to + * a case that we can't retrieve the block information, the block should be ignored. + */ + whenReady(processor.newLatestBlock(block3.hash)) { result => + result mustEqual Good(MissingBlockIgnored) + val blocks = List(block1, block2) + verifyBlockchain(blocks) + } + } + + "ignore orphan block on rare rechain events when the rpc server doesn't have the a transaction from the block anymore" in { + // see https://github.com/X9Developers/block-explorer/issues/6 + val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7") + val block2 = BlockLoader.get("000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8") + val block3 = BlockLoader.get("00000766115b26ecbc09cd3a3db6870fdaf2f049d65a910eb2f2b48b566ca7bd") + + val xsnService = new FileBasedXSNService { + override def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction] = { + if (txid == block3.transactions.last) { + Future.successful(Bad(TransactionNotFoundError).accumulating) + } else { + super.getTransaction(txid) + } + } + } + + val processor = new BlockEventsProcessor( + xsnService, + new TransactionService(xsnService)(Executors.globalEC), + new DatabaseFutureSeeder(dataSeeder)(Executors.databaseEC), + new BlockFutureDataHandler(dataHandler)(Executors.databaseEC)) + + + List(block1, block2) + .map(_.hash) + .map(processor.newLatestBlock) + .foreach { whenReady(_) { _.isGood mustEqual true } } + + /** + * When processing the latest block, a rechain event has occurred in the rpc server which leads to + * a case that we can't retrieve a specific transaction, the block should be ignored. + */ + whenReady(processor.newLatestBlock(block3.hash)) { result => + result mustEqual Good(MissingBlockIgnored) + val blocks = List(block1, block2) + verifyBlockchain(blocks) + } + } + + "remove orphan block on rare rechain events when the block height already exists" in { + // see https://github.com/X9Developers/block-explorer/issues/6 + val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7") + val block2 = BlockLoader.get("000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8") + val block3 = BlockLoader.get("00000766115b26ecbc09cd3a3db6870fdaf2f049d65a910eb2f2b48b566ca7bd") + .copy(height = block2.height) + .copy(previousBlockhash = block2.previousBlockhash) + + val xsnService = new FileBasedXSNService { + override def getBlock(blockhash: Blockhash): FutureApplicationResult[Block] = { + if (blockhash == block3.hash) { + Future.successful(Good(block3)) + } else { + super.getBlock(blockhash) + } + } + } + + val processor = new BlockEventsProcessor( + xsnService, + new TransactionService(xsnService)(Executors.globalEC), + new DatabaseFutureSeeder(dataSeeder)(Executors.databaseEC), + new BlockFutureDataHandler(dataHandler)(Executors.databaseEC)) + + List(block1, block2) + .map(_.hash) + .map(processor.newLatestBlock) + .foreach { whenReady(_) { _.isGood mustEqual true } } + + whenReady(processor.newLatestBlock(block3.hash)) { result => + result mustEqual Good(ReplacedByBlockHeight) + val blocks = List(block1, block3) + verifyBlockchain(blocks) + + // ensure block2 has been removed + dataHandler.getBy(block2.hash) mustEqual Bad(BlockNotFoundError).accumulating + } + } } private def verifyBlockchain(blocks: List[Block]) = {