Browse Source

server: Update classes related to the ledger sync process to use the Block.HasTransactions model

master
Alexis Hernandez 6 years ago
parent
commit
c921d73415
  1. 4
      server/app/com/xsn/explorer/data/LedgerDataHandler.scala
  2. 15
      server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala
  3. 6
      server/app/com/xsn/explorer/data/async/LedgerFutureDataHandler.scala
  4. 43
      server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala
  5. 14
      server/test/com/xsn/explorer/data/LedgerPostgresDataHandlerSpec.scala
  6. 4
      server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala

4
server/app/com/xsn/explorer/data/LedgerDataHandler.scala

@ -1,7 +1,7 @@
package com.xsn.explorer.data
import com.alexitc.playsonify.core.ApplicationResult
import com.xsn.explorer.models.persisted.{Block, Transaction}
import com.xsn.explorer.models.persisted.Block
import scala.language.higherKinds
@ -16,7 +16,7 @@ trait LedgerDataHandler[F[_]] {
* - The ledger is empty and the block is the genesis one.
* - The ledger has some blocks and the block goes just after the latest one.
*/
def push(block: Block, transactions: List[Transaction.HasIO]): F[Unit]
def push(block: Block.HasTransactions): F[Unit]
/**
* Remove the latest block from the ledger, it will succeed only if the ledger is not empty.

15
server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala

@ -31,12 +31,11 @@ class LedgerPostgresDataHandler @Inject() (
* to have a next block, we remove the link because that block is not stored yet.
*/
override def push(
block: Block,
transactions: List[Transaction.HasIO]): ApplicationResult[Unit] = {
block: Block.HasTransactions): ApplicationResult[Unit] = {
val result = withTransaction { implicit conn =>
val result = for {
_ <- upsertBlockCascade(block.copy(nextBlockhash = None), transactions)
_ <- upsertBlockCascade(block.asTip)
} yield ()
result
@ -67,17 +66,17 @@ class LedgerPostgresDataHandler @Inject() (
.getOrElse(throw new RuntimeException("Unable to pop block"))
}
private def upsertBlockCascade(block: Block, transactions: List[Transaction.HasIO])(implicit conn: Connection): Option[Unit] = {
private def upsertBlockCascade(block: Block.HasTransactions)(implicit conn: Connection): Option[Unit] = {
val result = for {
// block
_ <- deleteBlockCascade(block).orElse(Some(()))
_ <- blockPostgresDAO.insert(block)
_ <- deleteBlockCascade(block.block).orElse(Some(()))
_ <- blockPostgresDAO.insert(block.block)
// batch insert
_ <- transactionPostgresDAO.insert(transactions)
_ <- transactionPostgresDAO.insert(block.transactions)
// balances
balanceList = balances(transactions)
balanceList = balances(block.transactions)
_ <- insertBalanceBatch(balanceList).toList.everything
// compute aggregated amount

6
server/app/com/xsn/explorer/data/async/LedgerFutureDataHandler.scala

@ -3,7 +3,7 @@ package com.xsn.explorer.data.async
import com.alexitc.playsonify.core.FutureApplicationResult
import com.xsn.explorer.data.{LedgerBlockingDataHandler, LedgerDataHandler}
import com.xsn.explorer.executors.DatabaseExecutionContext
import com.xsn.explorer.models.persisted.{Block, Transaction}
import com.xsn.explorer.models.persisted.Block
import javax.inject.Inject
import scala.concurrent.Future
@ -13,8 +13,8 @@ class LedgerFutureDataHandler @Inject() (
implicit ec: DatabaseExecutionContext)
extends LedgerDataHandler[FutureApplicationResult] {
override def push(block: Block, transactions: List[Transaction.HasIO]): FutureApplicationResult[Unit] = Future {
blockingDataHandler.push(block, transactions)
override def push(block: Block.HasTransactions): FutureApplicationResult[Unit] = Future {
blockingDataHandler.push(block)
}
override def pop(): FutureApplicationResult[Block] = Future {

43
server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala

@ -4,7 +4,7 @@ import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureOps, OptionOps}
import com.xsn.explorer.data.async.{BlockFutureDataHandler, LedgerFutureDataHandler}
import com.xsn.explorer.errors.BlockNotFoundError
import com.xsn.explorer.models.persisted.{Block, Transaction}
import com.xsn.explorer.models.persisted.Block
import com.xsn.explorer.models.transformers._
import com.xsn.explorer.models.values.{Blockhash, Height}
import com.xsn.explorer.util.Extensions.FutureOrExt
@ -34,15 +34,14 @@ class LedgerSynchronizerService @Inject() (
*/
def synchronize(blockhash: Blockhash): FutureApplicationResult[Unit] = {
val result = for {
x <- getRPCBlock(blockhash).toFutureOr
(block, transactions) = x
_ <- synchronize(block, transactions).toFutureOr
block <- getRPCBlock(blockhash).toFutureOr
_ <- synchronize(block).toFutureOr
} yield ()
result.toFuture
}
private def synchronize(block: Block, transactions: List[Transaction.HasIO]): FutureApplicationResult[Unit] = {
private def synchronize(block: Block.HasTransactions): FutureApplicationResult[Unit] = {
logger.info(s"Synchronize block ${block.height}, hash = ${block.hash}")
val result = for {
@ -53,8 +52,8 @@ class LedgerSynchronizerService @Inject() (
.recoverFrom(BlockNotFoundError)(None)
_ <- latestBlockMaybe
.map { latestBlock => onLatestBlock(latestBlock, block, transactions) }
.getOrElse { onEmptyLedger(block, transactions) }
.map { latestBlock => onLatestBlock(latestBlock, block) }
.getOrElse { onEmptyLedger(block) }
.toFutureOr
} yield ()
@ -66,15 +65,15 @@ class LedgerSynchronizerService @Inject() (
* 1.1. the given block is the genensis block, it is added.
* 1.2. the given block is not the genesis block, sync everything until the given block.
*/
private def onEmptyLedger(block: Block, transactions: List[Transaction.HasIO]): FutureApplicationResult[Unit] = {
private def onEmptyLedger(block: Block.HasTransactions): FutureApplicationResult[Unit] = {
if (block.height.int == 0) {
logger.info(s"Synchronize genesis block on empty ledger, hash = ${block.hash}")
ledgerDataHandler.push(block, transactions)
ledgerDataHandler.push(block)
} else {
logger.info(s"Synchronize block ${block.height} on empty ledger, hash = ${block.hash}")
val result = for {
_ <- sync(0 until block.height.int).toFutureOr
_ <- synchronize(block, transactions).toFutureOr
_ <- synchronize(block).toFutureOr
} yield ()
result.toFuture
@ -89,20 +88,19 @@ class LedgerSynchronizerService @Inject() (
* 2.4. if H <= N, if the hash already exists, it is ignored.
* 2.5. if H <= N, if the hash doesn't exists, remove blocks from N to H (included), then, add the new H.
*/
private def onLatestBlock(ledgerBlock: Block, newBlock: Block, newTransactions: List[Transaction.HasIO]): FutureApplicationResult[Unit] = {
private def onLatestBlock(ledgerBlock: Block, newBlock: Block.HasTransactions): FutureApplicationResult[Unit] = {
if (ledgerBlock.height.int + 1 == newBlock.height.int &&
newBlock.previousBlockhash.contains(ledgerBlock.hash)) {
logger.info(s"Appending block ${newBlock.height}, hash = ${newBlock.hash}")
ledgerDataHandler.push(newBlock, newTransactions)
ledgerDataHandler.push(newBlock)
} else if (ledgerBlock.height.int + 1 == newBlock.height.int) {
logger.info(s"Reorganization to push block ${newBlock.height}, hash = ${newBlock.hash}")
val result = for {
blockhash <- newBlock.previousBlockhash.toFutureOr(BlockNotFoundError)
x <- getRPCBlock(blockhash).toFutureOr
(previousBlock, previousTransactions) = x
_ <- synchronize(previousBlock, previousTransactions).toFutureOr
_ <- synchronize(newBlock, newTransactions).toFutureOr
previousBlock <- getRPCBlock(blockhash).toFutureOr
_ <- synchronize(previousBlock).toFutureOr
_ <- synchronize(newBlock).toFutureOr
} yield ()
result.toFuture
@ -110,7 +108,7 @@ class LedgerSynchronizerService @Inject() (
logger.info(s"Filling holes to push block ${newBlock.height}, hash = ${newBlock.hash}")
val result = for {
_ <- sync(ledgerBlock.height.int + 1 until newBlock.height.int).toFutureOr
_ <- synchronize(newBlock, newTransactions).toFutureOr
_ <- synchronize(newBlock).toFutureOr
} yield ()
result.toFuture
@ -128,7 +126,7 @@ class LedgerSynchronizerService @Inject() (
.getOrElse {
val x = for {
_ <- trimTo(newBlock.height).toFutureOr
_ <- synchronize(newBlock, newTransactions).toFutureOr
_ <- synchronize(newBlock).toFutureOr
} yield ()
x.toFuture
}
@ -150,21 +148,20 @@ class LedgerSynchronizerService @Inject() (
val result = for {
_ <- previous.toFutureOr
blockhash <- xsnService.getBlockhash(Height(height)).toFutureOr
x <- getRPCBlock(blockhash).toFutureOr
(block, transactions) = x
_ <- synchronize(block, transactions).toFutureOr
block <- getRPCBlock(blockhash).toFutureOr
_ <- synchronize(block).toFutureOr
} yield ()
result.toFuture
}
}
private def getRPCBlock(blockhash: Blockhash): FutureApplicationResult[(Block, List[Transaction.HasIO])] = {
private def getRPCBlock(blockhash: Blockhash): FutureApplicationResult[Block.HasTransactions] = {
val result = for {
rpcBlock <- xsnService.getBlock(blockhash).toFutureOr
extractionMethod <- blockService.extractionMethod(rpcBlock).toFutureOr
transactions <- transactionRPCService.getTransactions(rpcBlock.transactions).toFutureOr
} yield (toPersistedBlock(rpcBlock, extractionMethod), transactions)
} yield toPersistedBlock(rpcBlock, extractionMethod).withTransactions(transactions)
result.toFuture
}

14
server/test/com/xsn/explorer/data/LedgerPostgresDataHandlerSpec.scala

@ -21,7 +21,7 @@ class LedgerPostgresDataHandlerSpec extends PostgresDataHandlerSpec with BeforeA
blockList.foreach { block =>
val transactions = getTransactions(block)
dataHandler.push(block, transactions) mustEqual Good(())
dataHandler.push(block.withTransactions(transactions)) mustEqual Good(())
}
}
@ -29,22 +29,22 @@ class LedgerPostgresDataHandlerSpec extends PostgresDataHandlerSpec with BeforeA
blockList.drop(1).foreach { block =>
val transactions = getTransactions(block)
dataHandler.push(block, transactions) mustEqual Bad(PreviousBlockMissingError).accumulating
dataHandler.push(block.withTransactions(transactions)) mustEqual Bad(PreviousBlockMissingError).accumulating
}
}
"succeed storing a repeated block by hash" in {
val genesis = blockList(0)
dataHandler.push(genesis, getTransactions(genesis)) mustEqual Good(())
dataHandler.push(genesis, getTransactions(genesis)) mustEqual Good(())
dataHandler.push(genesis.withTransactions(getTransactions(genesis))) mustEqual Good(())
dataHandler.push(genesis.withTransactions(getTransactions(genesis))) mustEqual Good(())
}
"fail to store a repeated block by height" in {
val genesis = blockList(0)
dataHandler.push(genesis, getTransactions(genesis)) mustEqual Good(())
dataHandler.push(genesis.withTransactions(getTransactions(genesis))) mustEqual Good(())
val block = blockList(1).copy(previousBlockhash = None, height = genesis.height)
dataHandler.push(block, getTransactions(block)) mustEqual Bad(RepeatedBlockHeightError).accumulating
dataHandler.push(block.withTransactions(getTransactions(block))) mustEqual Bad(RepeatedBlockHeightError).accumulating
}
}
@ -62,7 +62,7 @@ class LedgerPostgresDataHandlerSpec extends PostgresDataHandlerSpec with BeforeA
blockList.foreach { block =>
val transactions = getTransactions(block)
dataHandler.push(block, transactions) mustEqual Good(())
dataHandler.push(block.withTransactions(transactions)) mustEqual Good(())
}
blockList.reverse.foreach { block =>

4
server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala

@ -391,13 +391,13 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be
.map(TransactionLoader.get)
.map(Transaction.fromRPC)
val result = ledgerDataHandler.push(block, transactions)
val result = ledgerDataHandler.push(block.withTransactions(transactions))
result.isGood mustEqual true
}
private def createBlock(block: Block, transactions: List[Transaction.HasIO]) = {
val result = ledgerDataHandler.push(block, transactions)
val result = ledgerDataHandler.push(block.withTransactions(transactions))
result.isGood mustEqual true
}

Loading…
Cancel
Save