diff --git a/server/app/com/xsn/explorer/data/DatabaseSeeder.scala b/server/app/com/xsn/explorer/data/DatabaseSeeder.scala index df0f39e..8d15406 100644 --- a/server/app/com/xsn/explorer/data/DatabaseSeeder.scala +++ b/server/app/com/xsn/explorer/data/DatabaseSeeder.scala @@ -11,27 +11,15 @@ trait DatabaseSeeder[F[_]] { import DatabaseSeeder._ /** - * There are no blocks, we are adding the first one which could possibly - * not be the genesis block but the first one on our database. + * The database has some blocks, we are adding a new a block. */ - def firstBlock(command: CreateBlockCommand): F[Unit] - - /** - * The database has some blocks, we are appending a block to our latest block. - */ - def newLatestBlock(command: CreateBlockCommand): F[Unit] + def newBlock(command: CreateBlockCommand): F[Unit] /** * The database has some blocks but there is a rechain happening, we need to * replace our current latest block with the new latest block. */ def replaceBlock(command: ReplaceBlockCommand): F[Unit] - - /** - * The database has some blocks but the chain is not complete, we are inserting - * a previous block that's missing in our chain. - */ - def insertPendingBlock(command: CreateBlockCommand): F[Unit] } object DatabaseSeeder { diff --git a/server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala b/server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala index 67774e5..91e1902 100644 --- a/server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala +++ b/server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala @@ -24,32 +24,24 @@ class DatabasePostgresSeeder @Inject() ( private val logger = LoggerFactory.getLogger(this.getClass) - override def firstBlock(command: CreateBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn => - val result = upsertBlockCascade(command) - - result - .map(_ => Good(())) - .getOrElse(throw new RuntimeException("Unable to add the first block")) - } - - - override def newLatestBlock(command: CreateBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn => - val result = for { - // link previous block - previousBlockhash <- command.block.previousBlockhash - _ <- blockPostgresDAO + override def newBlock(command: CreateBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn => + // link previous block (if possible) + command.block.previousBlockhash.foreach { previousBlockhash => + blockPostgresDAO .setNextBlockhash(previousBlockhash, command.block.hash) .orElse { logger.warn(s"Failed to link previous block = ${previousBlockhash.string} to ${command.block.hash.string} because it wasn't found") None } + } + val result = for { _ <- upsertBlockCascade(command) } yield () result .map(Good(_)) - .getOrElse(throw new RuntimeException("Unable to add the new latest block")) + .getOrElse(throw new RuntimeException("Unable to add the new block")) } override def replaceBlock(command: ReplaceBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn => @@ -65,16 +57,6 @@ class DatabasePostgresSeeder @Inject() ( .getOrElse(throw new RuntimeException("Unable to replace latest block")) } - override def insertPendingBlock(command: CreateBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn => - val result = for { - _ <- upsertBlockCascade(command) - } yield () - - result - .map(Good(_)) - .getOrElse(throw new RuntimeException("Unable to an old block")) - } - private def upsertBlockCascade(command: CreateBlockCommand)(implicit conn: Connection): Option[Unit] = { for { // block diff --git a/server/app/com/xsn/explorer/data/async/DatabaseFutureSeeder.scala b/server/app/com/xsn/explorer/data/async/DatabaseFutureSeeder.scala index a303620..737b467 100644 --- a/server/app/com/xsn/explorer/data/async/DatabaseFutureSeeder.scala +++ b/server/app/com/xsn/explorer/data/async/DatabaseFutureSeeder.scala @@ -13,19 +13,11 @@ class DatabaseFutureSeeder @Inject() ( implicit ec: DatabaseExecutionContext) extends DatabaseSeeder[FutureApplicationResult] { - override def firstBlock(command: DatabaseSeeder.CreateBlockCommand): FutureApplicationResult[Unit] = Future { - blockingSeeder.firstBlock(command) - } - - override def newLatestBlock(command: DatabaseSeeder.CreateBlockCommand): FutureApplicationResult[Unit] = Future { - blockingSeeder.newLatestBlock(command) + override def newBlock(command: DatabaseSeeder.CreateBlockCommand): FutureApplicationResult[Unit] = Future { + blockingSeeder.newBlock(command) } override def replaceBlock(command: DatabaseSeeder.ReplaceBlockCommand): FutureApplicationResult[Unit] = Future { blockingSeeder.replaceBlock(command) } - - override def insertPendingBlock(command: DatabaseSeeder.CreateBlockCommand): FutureApplicationResult[Unit] = Future { - blockingSeeder.insertPendingBlock(command) - } } diff --git a/server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala b/server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala index 5c49640..40ebea6 100644 --- a/server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala +++ b/server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala @@ -30,26 +30,34 @@ class BlockEventsProcessor @Inject() ( private val logger = LoggerFactory.getLogger(this.getClass) /** - * There is a new latest block in the blockchain, we need to sync our database. + * There is a possible new block in the blockchain, we need to sync our database. * - * The following scenarios are handled for the new latest block: + * The following scenarios are handled for the new block: * - * 1. It is new on our database, we just append it (possibly first block). - * - current blocks = A -> B, new latest block = C, new blocks = A -> B -> C - * - current blocks = empty, new latest block = A, new blocks = A + * 0. The block is already present, we ignore it. + * + * x. The block can not be fully retrieved from the rpc server, we ignore it. + * + * x. The first block in the database (not exactly the first block in the chain), we just add it. + * - Example: current blocks = empty, new block = A, new blocks = A + * + * x. A new block on our database having a link to our latest block in the database: + * - The new block is added. + * - The next block from our latest block is set to the next block. + * - Example: current blocks = A -> B, new block = C, new blocks = A -> B -> C * * 2. It is the previous block from our latest block (rechain). * - current blocks = A -> B -> C, new latest block = B, new blocks = A -> B * * 3. None of previous cases, it is a block that might be missing in our chain. * - * @param blockhash the new latest block + * @param blockhash the blockhash to process */ - def newLatestBlock(blockhash: Blockhash): FutureApplicationResult[Result] = { + def processBlock(blockhash: Blockhash): FutureApplicationResult[Result] = { val result = for { block <- xsnService.getBlock(blockhash).toFutureOr transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr - r <- newLatestBlock(block, transactions).toFutureOr + r <- processBlock(block, transactions).toFutureOr } yield r result.toFuture.map { @@ -60,7 +68,7 @@ class BlockEventsProcessor @Inject() ( } } - private def newLatestBlock(newBlock: Block, newTransactions: List[Transaction]): FutureApplicationResult[Result] = { + private def processBlock(newBlock: Block, newTransactions: List[Transaction]): FutureApplicationResult[Result] = { def onRechain(orphanBlock: Block): FutureApplicationResult[Result] = { val command = DatabaseSeeder.ReplaceBlockCommand( orphanBlock = orphanBlock, @@ -78,7 +86,7 @@ class BlockEventsProcessor @Inject() ( logger.info(s"first block = ${newBlock.hash.string}") val command = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions) - databaseSeeder.firstBlock(command) + databaseSeeder.newBlock(command) .toFutureOr .map(_ => FirstBlockCreated(newBlock)) .toFuture @@ -89,7 +97,7 @@ class BlockEventsProcessor @Inject() ( val command = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions) databaseSeeder - .newLatestBlock(command) + .newBlock(command) .toFutureOr .map(_ => NewBlockAppended(newBlock)) .toFuture @@ -121,7 +129,7 @@ class BlockEventsProcessor @Inject() ( case Bad(One(BlockNotFoundError)) => val createCommand = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions) databaseSeeder - .insertPendingBlock(createCommand) + .newBlock(createCommand) .flatMap { case Good(_) => Future.successful { Good(MissingBlockProcessed(newBlock)) } case Bad(One(PostgresForeignKeyViolationError("height", _))) => onRepeatedBlockHeight() diff --git a/server/app/com/xsn/explorer/tasks/BackwardsSynchronizerTask.scala b/server/app/com/xsn/explorer/tasks/BackwardsSynchronizerTask.scala index 871b7a9..e194f31 100644 --- a/server/app/com/xsn/explorer/tasks/BackwardsSynchronizerTask.scala +++ b/server/app/com/xsn/explorer/tasks/BackwardsSynchronizerTask.scala @@ -117,7 +117,7 @@ class BackwardsSynchronizerTask @Inject() ( transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr command = DatabaseSeeder.CreateBlockCommand(block, transactions) - _ <- databaseSeeder.insertPendingBlock(command).toFutureOr + _ <- databaseSeeder.newBlock(command).toFutureOr _ = logger.debug(s"Block ${block.height.int} saved") _ <- block diff --git a/server/app/com/xsn/explorer/tasks/FirstBlockSynchronizerTask.scala b/server/app/com/xsn/explorer/tasks/FirstBlockSynchronizerTask.scala index d933b01..232b4b4 100644 --- a/server/app/com/xsn/explorer/tasks/FirstBlockSynchronizerTask.scala +++ b/server/app/com/xsn/explorer/tasks/FirstBlockSynchronizerTask.scala @@ -103,7 +103,7 @@ class FirstBlockSynchronizerTask @Inject() ( transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr command = DatabaseSeeder.CreateBlockCommand(block, transactions) - _ <- databaseSeeder.insertPendingBlock(command).toFutureOr + _ <- databaseSeeder.newBlock(command).toFutureOr _ = logger.debug(s"Block ${block.height.int} saved") _ <- block diff --git a/server/app/com/xsn/explorer/tasks/SQSSeederTask.scala b/server/app/com/xsn/explorer/tasks/SQSSeederTask.scala index 01004f1..bd20a08 100644 --- a/server/app/com/xsn/explorer/tasks/SQSSeederTask.scala +++ b/server/app/com/xsn/explorer/tasks/SQSSeederTask.scala @@ -58,7 +58,7 @@ class SQSSeederTask @Inject() ( private def handleMessage(message: Message): Unit = { def onBlockhash(blockhash: Blockhash) = { - val result = blockEventsProcessor.newLatestBlock(blockhash) + val result = blockEventsProcessor.processBlock(blockhash) result.recover { case NonFatal(ex) => diff --git a/server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala b/server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala index d1c77dd..dea77ed 100644 --- a/server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala +++ b/server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala @@ -45,7 +45,7 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures // see https://github.com/X9Developers/XSN/issues/32 val block0 = BlockLoader.get("00000c822abdbb23e28f79a49d29b41429737c6c7e15df40d1b1f1b35907ae34") - whenReady(processor.newLatestBlock(block0.hash)) { result => + whenReady(processor.processBlock(block0.hash)) { result => result mustEqual Good(MissingBlockIgnored) } } @@ -53,7 +53,7 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures "process first block" in { val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7") - whenReady(processor.newLatestBlock(block1.hash)) { result => + whenReady(processor.processBlock(block1.hash)) { result => result.isGood mustEqual true } } @@ -65,7 +65,7 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures List(block1, block2).map(dataHandler.insert).foreach(_.isGood mustEqual true) - whenReady(processor.newLatestBlock(block3.hash)) { result => + whenReady(processor.processBlock(block3.hash)) { result => result mustEqual Good(NewBlockAppended(block3)) val blocks = List(block1, block2, block3) verifyBlockchain(blocks) @@ -79,7 +79,7 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures List(block1, block2, block3).map(dataHandler.insert).foreach(_.isGood mustEqual true) - whenReady(processor.newLatestBlock(block2.hash)) { + whenReady(processor.processBlock(block2.hash)) { case Good(RechainDone(orphanBlock, newBlock)) => orphanBlock.hash mustEqual block3.hash newBlock.hash mustEqual block2.hash @@ -98,7 +98,7 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures List(block2, block3).map(dataHandler.insert).foreach(_.isGood mustEqual true) - whenReady(processor.newLatestBlock(block1.hash)) { result => + whenReady(processor.processBlock(block1.hash)) { result => result.isGood mustEqual true val blocks = List(block1, block2, block3) verifyBlockchain(blocks) @@ -112,7 +112,7 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures List(block1, block2, block3).map(dataHandler.insert).foreach(_.isGood mustEqual true) - whenReady(processor.newLatestBlock(block1.hash)) { result => + whenReady(processor.processBlock(block1.hash)) { result => result.isGood mustEqual true val blocks = List(block1, block2, block3) verifyBlockchain(blocks) @@ -127,7 +127,7 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures "process a block without spent index on transactions" in { val block = BlockLoader.get("000001ff95f22b8d82db14a5c5e9f725e8239e548be43c668766e7ddaee81924") - whenReady(processor.newLatestBlock(block.hash)) { result => + whenReady(processor.processBlock(block.hash)) { result => result.isGood mustEqual true val balanceDataHandler = new BalancePostgresDataHandler(database, new BalancePostgresDAO(new FieldOrderingSQLInterpreter)) @@ -149,10 +149,10 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures List(block1, block2) .map(_.hash) - .map(processor.newLatestBlock) + .map(processor.processBlock) .foreach { whenReady(_) { _.isGood mustEqual true } } - whenReady(processor.newLatestBlock(block1.hash)) { result => + whenReady(processor.processBlock(block1.hash)) { result => result.isGood mustEqual true val blocks = List(block1) verifyBlockchain(blocks) @@ -199,14 +199,14 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures List(block1, block2) .map(_.hash) - .map(processor.newLatestBlock) + .map(processor.processBlock) .foreach { whenReady(_) { _.isGood mustEqual true } } /** * When processing the latest block, a rechain event has occurred in the rpc server which leads to * a case that we can't retrieve the block information, the block should be ignored. */ - whenReady(processor.newLatestBlock(block3.hash)) { result => + whenReady(processor.processBlock(block3.hash)) { result => result mustEqual Good(MissingBlockIgnored) val blocks = List(block1, block2) verifyBlockchain(blocks) @@ -238,14 +238,14 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures List(block1, block2) .map(_.hash) - .map(processor.newLatestBlock) + .map(processor.processBlock) .foreach { whenReady(_) { _.isGood mustEqual true } } /** * When processing the latest block, a rechain event has occurred in the rpc server which leads to * a case that we can't retrieve a specific transaction, the block should be ignored. */ - whenReady(processor.newLatestBlock(block3.hash)) { result => + whenReady(processor.processBlock(block3.hash)) { result => result mustEqual Good(MissingBlockIgnored) val blocks = List(block1, block2) verifyBlockchain(blocks) @@ -278,10 +278,10 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures List(block1, block2) .map(_.hash) - .map(processor.newLatestBlock) + .map(processor.processBlock) .foreach { whenReady(_) { _.isGood mustEqual true } } - whenReady(processor.newLatestBlock(block3.hash)) { result => + whenReady(processor.processBlock(block3.hash)) { result => result mustEqual Good(ReplacedByBlockHeight) val blocks = List(block1, block3) verifyBlockchain(blocks)