From c921d73415c42b45c9b8cc004f09e02cf7b90092 Mon Sep 17 00:00:00 2001 From: Alexis Hernandez Date: Fri, 22 Feb 2019 18:48:10 -0700 Subject: [PATCH] server: Update classes related to the ledger sync process to use the Block.HasTransactions model --- .../xsn/explorer/data/LedgerDataHandler.scala | 4 +- .../anorm/LedgerPostgresDataHandler.scala | 15 +++---- .../data/async/LedgerFutureDataHandler.scala | 6 +-- .../services/LedgerSynchronizerService.scala | 43 +++++++++---------- .../data/LedgerPostgresDataHandlerSpec.scala | 14 +++--- .../TransactionPostgresDataHandlerSpec.scala | 4 +- 6 files changed, 41 insertions(+), 45 deletions(-) diff --git a/server/app/com/xsn/explorer/data/LedgerDataHandler.scala b/server/app/com/xsn/explorer/data/LedgerDataHandler.scala index c7ef253..91ce18e 100644 --- a/server/app/com/xsn/explorer/data/LedgerDataHandler.scala +++ b/server/app/com/xsn/explorer/data/LedgerDataHandler.scala @@ -1,7 +1,7 @@ package com.xsn.explorer.data import com.alexitc.playsonify.core.ApplicationResult -import com.xsn.explorer.models.persisted.{Block, Transaction} +import com.xsn.explorer.models.persisted.Block import scala.language.higherKinds @@ -16,7 +16,7 @@ trait LedgerDataHandler[F[_]] { * - The ledger is empty and the block is the genesis one. * - The ledger has some blocks and the block goes just after the latest one. */ - def push(block: Block, transactions: List[Transaction.HasIO]): F[Unit] + def push(block: Block.HasTransactions): F[Unit] /** * Remove the latest block from the ledger, it will succeed only if the ledger is not empty. diff --git a/server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala b/server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala index adf624b..0586f7b 100644 --- a/server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala +++ b/server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala @@ -31,12 +31,11 @@ class LedgerPostgresDataHandler @Inject() ( * to have a next block, we remove the link because that block is not stored yet. */ override def push( - block: Block, - transactions: List[Transaction.HasIO]): ApplicationResult[Unit] = { + block: Block.HasTransactions): ApplicationResult[Unit] = { val result = withTransaction { implicit conn => val result = for { - _ <- upsertBlockCascade(block.copy(nextBlockhash = None), transactions) + _ <- upsertBlockCascade(block.asTip) } yield () result @@ -67,17 +66,17 @@ class LedgerPostgresDataHandler @Inject() ( .getOrElse(throw new RuntimeException("Unable to pop block")) } - private def upsertBlockCascade(block: Block, transactions: List[Transaction.HasIO])(implicit conn: Connection): Option[Unit] = { + private def upsertBlockCascade(block: Block.HasTransactions)(implicit conn: Connection): Option[Unit] = { val result = for { // block - _ <- deleteBlockCascade(block).orElse(Some(())) - _ <- blockPostgresDAO.insert(block) + _ <- deleteBlockCascade(block.block).orElse(Some(())) + _ <- blockPostgresDAO.insert(block.block) // batch insert - _ <- transactionPostgresDAO.insert(transactions) + _ <- transactionPostgresDAO.insert(block.transactions) // balances - balanceList = balances(transactions) + balanceList = balances(block.transactions) _ <- insertBalanceBatch(balanceList).toList.everything // compute aggregated amount diff --git a/server/app/com/xsn/explorer/data/async/LedgerFutureDataHandler.scala b/server/app/com/xsn/explorer/data/async/LedgerFutureDataHandler.scala index b371b20..98f7d84 100644 --- a/server/app/com/xsn/explorer/data/async/LedgerFutureDataHandler.scala +++ b/server/app/com/xsn/explorer/data/async/LedgerFutureDataHandler.scala @@ -3,7 +3,7 @@ package com.xsn.explorer.data.async import com.alexitc.playsonify.core.FutureApplicationResult import com.xsn.explorer.data.{LedgerBlockingDataHandler, LedgerDataHandler} import com.xsn.explorer.executors.DatabaseExecutionContext -import com.xsn.explorer.models.persisted.{Block, Transaction} +import com.xsn.explorer.models.persisted.Block import javax.inject.Inject import scala.concurrent.Future @@ -13,8 +13,8 @@ class LedgerFutureDataHandler @Inject() ( implicit ec: DatabaseExecutionContext) extends LedgerDataHandler[FutureApplicationResult] { - override def push(block: Block, transactions: List[Transaction.HasIO]): FutureApplicationResult[Unit] = Future { - blockingDataHandler.push(block, transactions) + override def push(block: Block.HasTransactions): FutureApplicationResult[Unit] = Future { + blockingDataHandler.push(block) } override def pop(): FutureApplicationResult[Block] = Future { diff --git a/server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala b/server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala index 6a463aa..86091ae 100644 --- a/server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala +++ b/server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala @@ -4,7 +4,7 @@ import com.alexitc.playsonify.core.FutureApplicationResult import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureOps, OptionOps} import com.xsn.explorer.data.async.{BlockFutureDataHandler, LedgerFutureDataHandler} import com.xsn.explorer.errors.BlockNotFoundError -import com.xsn.explorer.models.persisted.{Block, Transaction} +import com.xsn.explorer.models.persisted.Block import com.xsn.explorer.models.transformers._ import com.xsn.explorer.models.values.{Blockhash, Height} import com.xsn.explorer.util.Extensions.FutureOrExt @@ -34,15 +34,14 @@ class LedgerSynchronizerService @Inject() ( */ def synchronize(blockhash: Blockhash): FutureApplicationResult[Unit] = { val result = for { - x <- getRPCBlock(blockhash).toFutureOr - (block, transactions) = x - _ <- synchronize(block, transactions).toFutureOr + block <- getRPCBlock(blockhash).toFutureOr + _ <- synchronize(block).toFutureOr } yield () result.toFuture } - private def synchronize(block: Block, transactions: List[Transaction.HasIO]): FutureApplicationResult[Unit] = { + private def synchronize(block: Block.HasTransactions): FutureApplicationResult[Unit] = { logger.info(s"Synchronize block ${block.height}, hash = ${block.hash}") val result = for { @@ -53,8 +52,8 @@ class LedgerSynchronizerService @Inject() ( .recoverFrom(BlockNotFoundError)(None) _ <- latestBlockMaybe - .map { latestBlock => onLatestBlock(latestBlock, block, transactions) } - .getOrElse { onEmptyLedger(block, transactions) } + .map { latestBlock => onLatestBlock(latestBlock, block) } + .getOrElse { onEmptyLedger(block) } .toFutureOr } yield () @@ -66,15 +65,15 @@ class LedgerSynchronizerService @Inject() ( * 1.1. the given block is the genensis block, it is added. * 1.2. the given block is not the genesis block, sync everything until the given block. */ - private def onEmptyLedger(block: Block, transactions: List[Transaction.HasIO]): FutureApplicationResult[Unit] = { + private def onEmptyLedger(block: Block.HasTransactions): FutureApplicationResult[Unit] = { if (block.height.int == 0) { logger.info(s"Synchronize genesis block on empty ledger, hash = ${block.hash}") - ledgerDataHandler.push(block, transactions) + ledgerDataHandler.push(block) } else { logger.info(s"Synchronize block ${block.height} on empty ledger, hash = ${block.hash}") val result = for { _ <- sync(0 until block.height.int).toFutureOr - _ <- synchronize(block, transactions).toFutureOr + _ <- synchronize(block).toFutureOr } yield () result.toFuture @@ -89,20 +88,19 @@ class LedgerSynchronizerService @Inject() ( * 2.4. if H <= N, if the hash already exists, it is ignored. * 2.5. if H <= N, if the hash doesn't exists, remove blocks from N to H (included), then, add the new H. */ - private def onLatestBlock(ledgerBlock: Block, newBlock: Block, newTransactions: List[Transaction.HasIO]): FutureApplicationResult[Unit] = { + private def onLatestBlock(ledgerBlock: Block, newBlock: Block.HasTransactions): FutureApplicationResult[Unit] = { if (ledgerBlock.height.int + 1 == newBlock.height.int && newBlock.previousBlockhash.contains(ledgerBlock.hash)) { logger.info(s"Appending block ${newBlock.height}, hash = ${newBlock.hash}") - ledgerDataHandler.push(newBlock, newTransactions) + ledgerDataHandler.push(newBlock) } else if (ledgerBlock.height.int + 1 == newBlock.height.int) { logger.info(s"Reorganization to push block ${newBlock.height}, hash = ${newBlock.hash}") val result = for { blockhash <- newBlock.previousBlockhash.toFutureOr(BlockNotFoundError) - x <- getRPCBlock(blockhash).toFutureOr - (previousBlock, previousTransactions) = x - _ <- synchronize(previousBlock, previousTransactions).toFutureOr - _ <- synchronize(newBlock, newTransactions).toFutureOr + previousBlock <- getRPCBlock(blockhash).toFutureOr + _ <- synchronize(previousBlock).toFutureOr + _ <- synchronize(newBlock).toFutureOr } yield () result.toFuture @@ -110,7 +108,7 @@ class LedgerSynchronizerService @Inject() ( logger.info(s"Filling holes to push block ${newBlock.height}, hash = ${newBlock.hash}") val result = for { _ <- sync(ledgerBlock.height.int + 1 until newBlock.height.int).toFutureOr - _ <- synchronize(newBlock, newTransactions).toFutureOr + _ <- synchronize(newBlock).toFutureOr } yield () result.toFuture @@ -128,7 +126,7 @@ class LedgerSynchronizerService @Inject() ( .getOrElse { val x = for { _ <- trimTo(newBlock.height).toFutureOr - _ <- synchronize(newBlock, newTransactions).toFutureOr + _ <- synchronize(newBlock).toFutureOr } yield () x.toFuture } @@ -150,21 +148,20 @@ class LedgerSynchronizerService @Inject() ( val result = for { _ <- previous.toFutureOr blockhash <- xsnService.getBlockhash(Height(height)).toFutureOr - x <- getRPCBlock(blockhash).toFutureOr - (block, transactions) = x - _ <- synchronize(block, transactions).toFutureOr + block <- getRPCBlock(blockhash).toFutureOr + _ <- synchronize(block).toFutureOr } yield () result.toFuture } } - private def getRPCBlock(blockhash: Blockhash): FutureApplicationResult[(Block, List[Transaction.HasIO])] = { + private def getRPCBlock(blockhash: Blockhash): FutureApplicationResult[Block.HasTransactions] = { val result = for { rpcBlock <- xsnService.getBlock(blockhash).toFutureOr extractionMethod <- blockService.extractionMethod(rpcBlock).toFutureOr transactions <- transactionRPCService.getTransactions(rpcBlock.transactions).toFutureOr - } yield (toPersistedBlock(rpcBlock, extractionMethod), transactions) + } yield toPersistedBlock(rpcBlock, extractionMethod).withTransactions(transactions) result.toFuture } diff --git a/server/test/com/xsn/explorer/data/LedgerPostgresDataHandlerSpec.scala b/server/test/com/xsn/explorer/data/LedgerPostgresDataHandlerSpec.scala index a33e407..0868203 100644 --- a/server/test/com/xsn/explorer/data/LedgerPostgresDataHandlerSpec.scala +++ b/server/test/com/xsn/explorer/data/LedgerPostgresDataHandlerSpec.scala @@ -21,7 +21,7 @@ class LedgerPostgresDataHandlerSpec extends PostgresDataHandlerSpec with BeforeA blockList.foreach { block => val transactions = getTransactions(block) - dataHandler.push(block, transactions) mustEqual Good(()) + dataHandler.push(block.withTransactions(transactions)) mustEqual Good(()) } } @@ -29,22 +29,22 @@ class LedgerPostgresDataHandlerSpec extends PostgresDataHandlerSpec with BeforeA blockList.drop(1).foreach { block => val transactions = getTransactions(block) - dataHandler.push(block, transactions) mustEqual Bad(PreviousBlockMissingError).accumulating + dataHandler.push(block.withTransactions(transactions)) mustEqual Bad(PreviousBlockMissingError).accumulating } } "succeed storing a repeated block by hash" in { val genesis = blockList(0) - dataHandler.push(genesis, getTransactions(genesis)) mustEqual Good(()) - dataHandler.push(genesis, getTransactions(genesis)) mustEqual Good(()) + dataHandler.push(genesis.withTransactions(getTransactions(genesis))) mustEqual Good(()) + dataHandler.push(genesis.withTransactions(getTransactions(genesis))) mustEqual Good(()) } "fail to store a repeated block by height" in { val genesis = blockList(0) - dataHandler.push(genesis, getTransactions(genesis)) mustEqual Good(()) + dataHandler.push(genesis.withTransactions(getTransactions(genesis))) mustEqual Good(()) val block = blockList(1).copy(previousBlockhash = None, height = genesis.height) - dataHandler.push(block, getTransactions(block)) mustEqual Bad(RepeatedBlockHeightError).accumulating + dataHandler.push(block.withTransactions(getTransactions(block))) mustEqual Bad(RepeatedBlockHeightError).accumulating } } @@ -62,7 +62,7 @@ class LedgerPostgresDataHandlerSpec extends PostgresDataHandlerSpec with BeforeA blockList.foreach { block => val transactions = getTransactions(block) - dataHandler.push(block, transactions) mustEqual Good(()) + dataHandler.push(block.withTransactions(transactions)) mustEqual Good(()) } blockList.reverse.foreach { block => diff --git a/server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala b/server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala index 9beb857..8eacec7 100644 --- a/server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala +++ b/server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala @@ -391,13 +391,13 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be .map(TransactionLoader.get) .map(Transaction.fromRPC) - val result = ledgerDataHandler.push(block, transactions) + val result = ledgerDataHandler.push(block.withTransactions(transactions)) result.isGood mustEqual true } private def createBlock(block: Block, transactions: List[Transaction.HasIO]) = { - val result = ledgerDataHandler.push(block, transactions) + val result = ledgerDataHandler.push(block.withTransactions(transactions)) result.isGood mustEqual true }