From 03ed7a694b8efcf2e161a7d1a1f1976989340d75 Mon Sep 17 00:00:00 2001 From: Alexis Hernandez Date: Sat, 30 Jun 2018 23:25:52 -0500 Subject: [PATCH] server: Add the LedgerSynchronizerService (#37) This is a simplified version of the BlockEventProcessor, it handles the synchronization between the xsn and our ledger database, it also takes care of a lot of corner cases to keep the synchronization process flexible enough and linear. --- .../com/xsn/explorer/models/Transaction.scala | 6 + .../services/LedgerSynchronizerService.scala | 181 ++++++++++++ .../com/xsn/explorer/util/Extensions.scala | 18 ++ .../LedgerSynchronizerServiceSpec.scala | 258 ++++++++++++++++++ ...321041ccb953d53828824217a9dc61a1c857facf85 | 19 ++ ...7173e8c3a22289d7302ac15e7924fab62f743fd6cb | 34 +++ 6 files changed, 516 insertions(+) create mode 100644 server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala create mode 100644 server/test/com/xsn/explorer/services/LedgerSynchronizerServiceSpec.scala create mode 100644 server/test/resources/blocks/0000017ee4121cd8ae22f7321041ccb953d53828824217a9dc61a1c857facf85 create mode 100644 server/test/resources/transactions/c2c232be75be9dc0d761737173e8c3a22289d7302ac15e7924fab62f743fd6cb diff --git a/server/app/com/xsn/explorer/models/Transaction.scala b/server/app/com/xsn/explorer/models/Transaction.scala index 187a582..3f55441 100644 --- a/server/app/com/xsn/explorer/models/Transaction.scala +++ b/server/app/com/xsn/explorer/models/Transaction.scala @@ -22,6 +22,12 @@ object Transaction { tposOwnerAddress: Option[Address], tposMerchantAddress: Option[Address]) + /** + * Please note that the inputs might not be accurate. + * + * If the rpc transaction might not be complete, get the input value and address using + * the utxo index or the getTransaction method from the TransactionService.. + */ def fromRPC(tx: rpc.Transaction): Transaction = { val inputs = tx.vin.zipWithIndex.map { case (vin, index) => Transaction.Input(index, vin.value, vin.address) diff --git a/server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala b/server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala new file mode 100644 index 0000000..e695d09 --- /dev/null +++ b/server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala @@ -0,0 +1,181 @@ +package com.xsn.explorer.services + +import javax.inject.Inject + +import com.alexitc.playsonify.core.FutureApplicationResult +import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureListOps, FutureOps, OptionOps} +import com.xsn.explorer.data.async.{BlockFutureDataHandler, LedgerFutureDataHandler} +import com.xsn.explorer.errors.BlockNotFoundError +import com.xsn.explorer.models.rpc.Block +import com.xsn.explorer.models.{Blockhash, Height, Transaction} +import com.xsn.explorer.util.Extensions.FutureOrExt +import org.scalactic.Good +import org.slf4j.LoggerFactory + +import scala.concurrent.{ExecutionContext, Future} + +class LedgerSynchronizerService @Inject() ( + xsnService: XSNService, + transactionService: TransactionService, + ledgerDataHandler: LedgerFutureDataHandler, + blockDataHandler: BlockFutureDataHandler)( + implicit ec: ExecutionContext) { + + private val logger = LoggerFactory.getLogger(this.getClass) + + /** + * Synchronize the given block with our ledger database. + * + * The synchronization involves a very complex logic in order to handle + * several corner cases, be sure to not call this method concurrently + * because the behavior is undefined. + */ + def synchronize(blockhash: Blockhash): FutureApplicationResult[Unit] = { + val result = for { + block <- xsnService.getBlock(blockhash).toFutureOr + transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr + _ <- synchronize(block, transactions).toFutureOr + } yield () + + result.toFuture + } + + private def synchronize(block: Block, transactions: List[Transaction]): FutureApplicationResult[Unit] = { + logger.info(s"Synchronize block ${block.height}, hash = ${block.hash}") + + val result = for { + latestBlockMaybe <- blockDataHandler + .getLatestBlock() + .toFutureOr + .map(Option.apply) + .recoverFrom(BlockNotFoundError)(None) + + _ <- latestBlockMaybe + .map { latestBlock => onLatestBlock(latestBlock, block, transactions) } + .getOrElse { onEmptyLedger(block, transactions) } + .toFutureOr + } yield () + + result.toFuture + } + + /** + * 1. current ledger is empty: + * 1.1. the given block is the genensis block, it is added. + * 1.2. the given block is not the genesis block, sync everything until the given block. + */ + private def onEmptyLedger(block: Block, transactions: List[Transaction]): FutureApplicationResult[Unit] = { + if (block.height.int == 0) { + logger.info(s"Synchronize genesis block on empty ledger, hash = ${block.hash}") + ledgerDataHandler.push(block, transactions) + } else { + logger.info(s"Synchronize block ${block.height} on empty ledger, hash = ${block.hash}") + val result = for { + _ <- sync(0 until block.height.int).toFutureOr + _ <- synchronize(block, transactions).toFutureOr + } yield () + + result.toFuture + } + } + + /** + * 2. current ledger has blocks until N, given block height H: + * 2.1. if N+1 == H and its previous blockhash is N, it is added. + * 2.2. if N+1 == H and its previous blockhash isn't N, pick the expected block N from H and apply the whole process with it, then, apply H. + * 2.3. if H > N+1, sync everything until H. + * 2.4. if H <= N, if the hash already exists, it is ignored. + * 2.5. if H <= N, if the hash doesn't exists, remove blocks from N to H (included), then, add the new H. + */ + private def onLatestBlock(ledgerBlock: Block, newBlock: Block, newTransactions: List[Transaction]): FutureApplicationResult[Unit] = { + if (ledgerBlock.height.int + 1 == newBlock.height.int && + newBlock.previousBlockhash.contains(ledgerBlock.hash)) { + + logger.info(s"Appending block ${newBlock.height}, hash = ${newBlock.hash}") + ledgerDataHandler.push(newBlock, newTransactions) + } else if (ledgerBlock.height.int + 1 == newBlock.height.int) { + logger.info(s"Reorganization to push block ${newBlock.height}, hash = ${newBlock.hash}") + val result = for { + blockhash <- newBlock.previousBlockhash.toFutureOr(BlockNotFoundError) + previousBlock <- xsnService.getBlock(blockhash).toFutureOr + previousTransactions <- previousBlock.transactions.map(transactionService.getTransaction).toFutureOr + _ <- synchronize(previousBlock, previousTransactions).toFutureOr + _ <- synchronize(newBlock, newTransactions).toFutureOr + } yield () + + result.toFuture + } else if (newBlock.height.int > ledgerBlock.height.int) { + logger.info(s"Filling holes to push block ${newBlock.height}, hash = ${newBlock.hash}") + val result = for { + _ <- sync(ledgerBlock.height.int + 1 until newBlock.height.int).toFutureOr + _ <- synchronize(newBlock, newTransactions).toFutureOr + } yield () + + result.toFuture + } else { + val result = for { + expectedBlockMaybe <- blockDataHandler + .getBy(newBlock.hash) + .toFutureOr + .map(Option.apply) + .recoverFrom(BlockNotFoundError)(None) + + _ = logger.info(s"Checking possible existing block ${newBlock.height}, hash = ${newBlock.hash}, exists = ${expectedBlockMaybe.isDefined}") + _ <- expectedBlockMaybe + .map { _ => Future.successful(Good(())) } + .getOrElse { + val x = for { + _ <- trimTo(newBlock.height).toFutureOr + _ <- synchronize(newBlock, newTransactions).toFutureOr + } yield () + x.toFuture + } + .toFutureOr + } yield () + + result.toFuture + } + } + + /** + * Sync the given range to our ledger. + */ + private def sync(range: Range): FutureApplicationResult[Unit] = { + logger.info(s"Syncing block range = $range") + + // TODO: check, it might be safer to use the nextBlockhash instead of the height + range.foldLeft[FutureApplicationResult[Unit]](Future.successful(Good(()))) { case (previous, height) => + val result = for { + _ <- previous.toFutureOr + blockhash <- xsnService.getBlockhash(Height(height)).toFutureOr + block <- xsnService.getBlock(blockhash).toFutureOr + transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr + _ <- synchronize(block, transactions).toFutureOr + } yield () + + result.toFuture + } + } + + /** + * Trim the ledger until the given block height, if the height is 4, + * the last stored block will be 3. + */ + private def trimTo(height: Height): FutureApplicationResult[Unit] = { + val result = ledgerDataHandler + .pop() + .toFutureOr + .flatMap { block => + logger.info(s"Trimmed block ${block.height} from the ledger") + val result = if (block.height == height) { + Future.successful(Good(())) + } else { + trimTo(height) + } + + result.toFutureOr + } + + result.toFuture + } +} diff --git a/server/app/com/xsn/explorer/util/Extensions.scala b/server/app/com/xsn/explorer/util/Extensions.scala index 308e523..703f10f 100644 --- a/server/app/com/xsn/explorer/util/Extensions.scala +++ b/server/app/com/xsn/explorer/util/Extensions.scala @@ -1,5 +1,11 @@ package com.xsn.explorer.util +import com.alexitc.playsonify.core.FutureOr +import com.alexitc.playsonify.models.ApplicationError +import org.scalactic.{Bad, Good, One} + +import scala.concurrent.ExecutionContext + object Extensions { private val SatoshiScale = 100000000L @@ -19,4 +25,16 @@ object Extensions { } } } + + implicit class FutureOrExt[+A](val inner: FutureOr[A]) { + def recoverFrom[B >: A](error: ApplicationError)(f: => B)(implicit ec: ExecutionContext): FutureOr[B] = { + val future = inner.toFuture.map { + case Good(result) => Good(result) + case Bad(One(e)) if e == error => Good(f) + case Bad(errors) => Bad(errors) + } + + new FutureOr(future) + } + } } diff --git a/server/test/com/xsn/explorer/services/LedgerSynchronizerServiceSpec.scala b/server/test/com/xsn/explorer/services/LedgerSynchronizerServiceSpec.scala new file mode 100644 index 0000000..ffd6e9c --- /dev/null +++ b/server/test/com/xsn/explorer/services/LedgerSynchronizerServiceSpec.scala @@ -0,0 +1,258 @@ +package com.xsn.explorer.services + +import com.alexitc.playsonify.core.FutureApplicationResult +import com.alexitc.playsonify.validators.PaginatedQueryValidator +import com.xsn.explorer.data.anorm.dao.{BalancePostgresDAO, BlockPostgresDAO, TransactionPostgresDAO} +import com.xsn.explorer.data.anorm.interpreters.FieldOrderingSQLInterpreter +import com.xsn.explorer.data.anorm.{BalancePostgresDataHandler, BlockPostgresDataHandler, LedgerPostgresDataHandler, TransactionPostgresDataHandler} +import com.xsn.explorer.data.async.{BlockFutureDataHandler, LedgerFutureDataHandler, TransactionFutureDataHandler} +import com.xsn.explorer.data.common.PostgresDataHandlerSpec +import com.xsn.explorer.errors.BlockNotFoundError +import com.xsn.explorer.helpers._ +import com.xsn.explorer.models.rpc.Block +import com.xsn.explorer.models.{Blockhash, Height} +import com.xsn.explorer.parsers.TransactionOrderingParser +import org.scalactic.{Bad, Good, One, Or} +import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.Future + +class LedgerSynchronizerServiceSpec extends PostgresDataHandlerSpec with BeforeAndAfter with ScalaFutures { + + lazy val dataHandler = new LedgerPostgresDataHandler( + database, + new BlockPostgresDAO, + new TransactionPostgresDAO(new FieldOrderingSQLInterpreter), + new BalancePostgresDAO(new FieldOrderingSQLInterpreter)) + + lazy val transactionDataHandler = new TransactionPostgresDataHandler( + database, + new TransactionPostgresDAO(new FieldOrderingSQLInterpreter)) + + lazy val blockDataHandler = new BlockPostgresDataHandler(database, new BlockPostgresDAO) + + val blockList = List( + BlockLoader.get("00000c822abdbb23e28f79a49d29b41429737c6c7e15df40d1b1f1b35907ae34"), + BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7"), + BlockLoader.get("000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8"), + BlockLoader.get("00000766115b26ecbc09cd3a3db6870fdaf2f049d65a910eb2f2b48b566ca7bd"), + BlockLoader.get("00000b59875e80b0afc6c657bc5318d39e03532b7d97fb78a4c7bd55c4840c32"), + BlockLoader.get("00000267225f7dba55d9a3493740e7f0dde0f28a371d2c3b42e7676b5728d020"), + BlockLoader.get("0000017ee4121cd8ae22f7321041ccb953d53828824217a9dc61a1c857facf85") + ) + + val genesis = blockList(0) + + before { + clearDatabase() + } + + "synchronize" should { + "add the genensis block to the empty ledger" in { + val synchronizer = ledgerSynchronizerService(genesis) + + whenReady(synchronizer.synchronize(genesis.hash)) { result => + result mustEqual Good(()) + verifyLedger(genesis) + } + } + + "add the old missing blocks blocks while adding block N to the empty ledger" in { + val block = blockList.last + val synchronizer = ledgerSynchronizerService(blockList: _*) + whenReady(synchronizer.synchronize(block.hash)) { result => + result mustEqual Good(()) + verifyLedger(blockList: _*) + } + } + + "append a block to the latest block" in { + val synchronizer = ledgerSynchronizerService(blockList: _*) + + whenReady(synchronizer.synchronize(genesis.hash)) { _ mustEqual Good(()) } + + blockList.drop(1).foreach { block => + whenReady(synchronizer.synchronize(block.hash)) { _ mustEqual Good(()) } + } + + verifyLedger(blockList: _*) + } + + "ignore a duplicated block" in { + val synchronizer = ledgerSynchronizerService(blockList: _*) + + createBlocks(synchronizer, blockList: _*) + + val block = blockList(3) + whenReady(synchronizer.synchronize(block.hash)) { _ mustEqual Good(()) } + + verifyLedger(blockList: _*) + } + + "add the old missing blocks blocks while adding block N to a ledger with some blocks" in { + val initialBlocks = blockList.take(3) + val synchronizer = ledgerSynchronizerService(blockList: _*) + + createBlocks(synchronizer, initialBlocks: _*) + + val block = blockList.last + whenReady(synchronizer.synchronize(block.hash)) { result => + result mustEqual Good(()) + verifyLedger(blockList: _*) + } + } + + "handle reorganization, ledger has 3 blocks, a rechain occurs from block 2 while adding new block 3" in { + val block1 = blockList(1) + val block2 = blockList(2) + val block3 = blockList(3) + val newBlock2 = blockList(4).copy(previousBlockhash = block2.previousBlockhash, height = block2.height) + val newBlock3 = blockList(5).copy(previousBlockhash = Some(newBlock2.hash), height = Height(3)) + + val initialBlocks = List(genesis, block1, block2, block3) + createBlocks(ledgerSynchronizerService(initialBlocks: _*), initialBlocks: _*) + + val finalBlocks = List( + genesis, + block1.copy(nextBlockhash = Some(newBlock2.hash)), + newBlock2.copy(nextBlockhash = Some(newBlock3.hash)), + newBlock3) + + val synchronizer = ledgerSynchronizerService(finalBlocks: _*) + whenReady(synchronizer.synchronize(newBlock3.hash)) { result => + result mustEqual Good(()) + verifyLedger(finalBlocks: _*) + } + } + + "handle reorganization, ledger has 3 blocks, a rechain occurs from block 2 while adding new block 4" in { + val block1 = blockList(1) + val block2 = blockList(2) + val block3 = blockList(3) + val newBlock2 = blockList(4).copy(previousBlockhash = block2.previousBlockhash, height = block2.height) + val newBlock3 = blockList(5).copy(previousBlockhash = Some(newBlock2.hash), height = Height(3)) + val newBlock4 = blockList(6).copy(previousBlockhash = Some(newBlock3.hash), height = Height(4)) + + val initialBlocks = List(genesis, block1, block2, block3) + createBlocks(ledgerSynchronizerService(initialBlocks: _*), initialBlocks: _*) + + val finalBlocks = List( + genesis, + block1.copy(nextBlockhash = Some(newBlock2.hash)), + newBlock2.copy(nextBlockhash = Some(newBlock3.hash)), + newBlock3.copy(nextBlockhash = Some(newBlock4.hash)), + newBlock4) + + val synchronizer = ledgerSynchronizerService(finalBlocks: _*) + whenReady(synchronizer.synchronize(newBlock4.hash)) { result => + result mustEqual Good(()) + verifyLedger(finalBlocks: _*) + } + } + + "handle reorganization, ledger has 6 blocks, a rechain occurs from block 2 while adding new block 2" in { + val initialBlocks = blockList.take(6) + createBlocks(ledgerSynchronizerService(initialBlocks: _*), initialBlocks: _*) + + val block1 = blockList(1) + val newBlock2 = blockList.drop(6).head.copy(previousBlockhash = Some(block1.hash), height = Height(2)) + val finalBlocks = List( + genesis, + block1.copy(nextBlockhash = Some(newBlock2.hash)), + newBlock2 + ) + + val synchronizer = ledgerSynchronizerService(finalBlocks: _*) + whenReady(synchronizer.synchronize(newBlock2.hash)) { result => + result mustEqual Good(()) + verifyLedger(finalBlocks: _*) + } + } + + "process a block without spent index on transactions" in { + val block = BlockLoader.get("000001ff95f22b8d82db14a5c5e9f725e8239e548be43c668766e7ddaee81924") + .copy(previousBlockhash = None, height = Height(0)) + + val synchronizer = ledgerSynchronizerService(block) + whenReady(synchronizer.synchronize(block.hash)) { result => + result.isGood mustEqual true + + val balanceDataHandler = new BalancePostgresDataHandler(database, new BalancePostgresDAO(new FieldOrderingSQLInterpreter)) + val balance = balanceDataHandler.getBy(DataHelper.createAddress("XdJnCKYNwzCz8ATv8Eu75gonaHyfr9qXg9")) + + balance.get.spent mustEqual BigDecimal("76500000.000000000000000") + } + } + } + + private def verifyLedger(blocks: Block*) = { + countBlocks() mustEqual blocks.size + blocks.foreach { block => + val dbBlock = blockDataHandler.getBy(block.hash).get + + dbBlock.height mustEqual block.height + dbBlock.previousBlockhash mustEqual block.previousBlockhash + if (block == blocks.last) { + dbBlock.nextBlockhash.isEmpty mustEqual true + } else { + dbBlock.nextBlockhash mustEqual block.nextBlockhash + } + } + } + + private def countBlocks() = { + database.withConnection { implicit conn => + _root_.anorm.SQL("""SELECT COUNT(*) FROM blocks""").as(_root_.anorm.SqlParser.scalar[Int].single) + } + } + + private def createBlocks(synchronizer: LedgerSynchronizerService, blocks: Block*) = { + blocks + .foreach { block => + whenReady(synchronizer.synchronize(block.hash)) { result => + result.isGood mustEqual true + } + } + } + + private def ledgerSynchronizerService(blocks: Block*): LedgerSynchronizerService = { + val xsnService = new FileBasedXSNService { + override def getBlock(blockhash: Blockhash): FutureApplicationResult[Block] = { + blocks + .find(_.hash == blockhash) + .map { block => Future.successful(Good(XSNService.cleanGenesisBlock(block))) } + .getOrElse { + Future.successful(Bad(BlockNotFoundError).accumulating) + } + } + + override def getLatestBlock(): FutureApplicationResult[Block] = { + val block = XSNService.cleanGenesisBlock(blocks.maxBy(_.height.int)) + Future.successful(Good(block)) + } + + override def getBlockhash(height: Height): FutureApplicationResult[Blockhash] = { + val maybe = blocks.find(_.height == height).map(_.hash) + val result = Or.from(maybe, One(BlockNotFoundError)) + Future.successful(result) + } + } + + ledgerSynchronizerService(xsnService) + } + + private def ledgerSynchronizerService(xsnService: XSNService): LedgerSynchronizerService = { + val transactionService = new TransactionService( + new PaginatedQueryValidator, + new TransactionOrderingParser, + xsnService, + new TransactionFutureDataHandler(transactionDataHandler)(Executors.databaseEC)) + + new LedgerSynchronizerService( + xsnService, + transactionService, + new LedgerFutureDataHandler(dataHandler)(Executors.databaseEC), + new BlockFutureDataHandler(blockDataHandler)(Executors.databaseEC)) + } +} diff --git a/server/test/resources/blocks/0000017ee4121cd8ae22f7321041ccb953d53828824217a9dc61a1c857facf85 b/server/test/resources/blocks/0000017ee4121cd8ae22f7321041ccb953d53828824217a9dc61a1c857facf85 new file mode 100644 index 0000000..47a5e1b --- /dev/null +++ b/server/test/resources/blocks/0000017ee4121cd8ae22f7321041ccb953d53828824217a9dc61a1c857facf85 @@ -0,0 +1,19 @@ +{ + "hash": "0000017ee4121cd8ae22f7321041ccb953d53828824217a9dc61a1c857facf85", + "confirmations": 163684, + "size": 179, + "height": 6, + "version": 536870912, + "merkleroot": "c2c232be75be9dc0d761737173e8c3a22289d7302ac15e7924fab62f743fd6cb", + "tx": [ + "c2c232be75be9dc0d761737173e8c3a22289d7302ac15e7924fab62f743fd6cb" + ], + "time": 1520276361, + "mediantime": 1520276303, + "nonce": 3562, + "bits": "1e0ffff0", + "difficulty": 0.000244140625, + "chainwork": "0000000000000000000000000000000000000000000000000000000000700070", + "previousblockhash": "00000267225f7dba55d9a3493740e7f0dde0f28a371d2c3b42e7676b5728d020", + "nextblockhash": "000009b8e692cda20b4d3347a95aa9ad1aa9fba3f242e3ba2ff11929a73e3914" +} \ No newline at end of file diff --git a/server/test/resources/transactions/c2c232be75be9dc0d761737173e8c3a22289d7302ac15e7924fab62f743fd6cb b/server/test/resources/transactions/c2c232be75be9dc0d761737173e8c3a22289d7302ac15e7924fab62f743fd6cb new file mode 100644 index 0000000..ccdedb2 --- /dev/null +++ b/server/test/resources/transactions/c2c232be75be9dc0d761737173e8c3a22289d7302ac15e7924fab62f743fd6cb @@ -0,0 +1,34 @@ +{ + "hex": "01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff03560101ffffffff010000000000000000232103e8c52f2c5155771492907095753a43ce776e1fa7c5e769a67a9f3db4467ec029ac00000000", + "txid": "c2c232be75be9dc0d761737173e8c3a22289d7302ac15e7924fab62f743fd6cb", + "size": 98, + "version": 1, + "locktime": 0, + "vin": [ + { + "coinbase": "560101", + "sequence": 4294967295 + } + ], + "vout": [ + { + "value": 0, + "valueSat": 0, + "n": 0, + "scriptPubKey": { + "asm": "03e8c52f2c5155771492907095753a43ce776e1fa7c5e769a67a9f3db4467ec029 OP_CHECKSIG", + "hex": "2103e8c52f2c5155771492907095753a43ce776e1fa7c5e769a67a9f3db4467ec029ac", + "reqSigs": 1, + "type": "pubkey", + "addresses": [ + "XdJnCKYNwzCz8ATv8Eu75gonaHyfr9qXg9" + ] + } + } + ], + "blockhash": "0000017ee4121cd8ae22f7321041ccb953d53828824217a9dc61a1c857facf85", + "height": 6, + "confirmations": 163685, + "time": 1520276361, + "blocktime": 1520276361 +} \ No newline at end of file