Browse Source

server: Optimize the ledger synchronization process

The block transactions are loaded only when they are required, this
is specially useful while synchronizing bitcoin because it reduces
the workload to the bitcoind on the initial sync.
bitcoin
Alexis Hernandez 6 years ago
parent
commit
3ea419bf85
  1. 50
      server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala

50
server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala

@ -4,10 +4,10 @@ import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureOps, OptionOps}
import com.alexitc.playsonify.core.{ApplicationResult, FutureApplicationResult}
import com.xsn.explorer.data.async.{BlockFutureDataHandler, LedgerFutureDataHandler}
import com.xsn.explorer.errors.BlockNotFoundError
import com.xsn.explorer.models.TPoSContract
import com.xsn.explorer.models._
import com.xsn.explorer.models.persisted.Block
import com.xsn.explorer.models.transformers._
import com.xsn.explorer.models.values.{Blockhash, Height}
import com.xsn.explorer.models.values._
import com.xsn.explorer.util.Extensions.FutureOrExt
import javax.inject.Inject
import org.scalactic.{Bad, Good}
@ -37,15 +37,14 @@ class LedgerSynchronizerService @Inject() (
*/
def synchronize(blockhash: Blockhash): FutureApplicationResult[Unit] = {
val result = for {
data <- getRPCBlock(blockhash).toFutureOr
data <- xsnService.getBlock(blockhash).toFutureOr
_ <- synchronize(data).toFutureOr
} yield ()
result.toFuture
}
private def synchronize(data: BlockData): FutureApplicationResult[Unit] = {
val block = data._1
private def synchronize(block: rpc.Block): FutureApplicationResult[Unit] = {
logger.info(s"Synchronize block ${block.height}, hash = ${block.hash}")
val result = for {
@ -56,8 +55,8 @@ class LedgerSynchronizerService @Inject() (
.recoverFrom(BlockNotFoundError)(None)
_ <- latestBlockMaybe
.map { latestBlock => onLatestBlock(latestBlock, data) }
.getOrElse { onEmptyLedger(data) }
.map { latestBlock => onLatestBlock(latestBlock, block) }
.getOrElse { onEmptyLedger(block) }
.toFutureOr
} yield ()
@ -69,16 +68,15 @@ class LedgerSynchronizerService @Inject() (
* 1.1. the given block is the genensis block, it is added.
* 1.2. the given block is not the genesis block, sync everything until the given block.
*/
private def onEmptyLedger(data: BlockData): FutureApplicationResult[Unit] = {
val block = data._1
private def onEmptyLedger(block: rpc.Block): FutureApplicationResult[Unit] = {
if (block.height.int == 0) {
logger.info(s"Synchronize genesis block on empty ledger, hash = ${block.hash}")
ledgerDataHandler.push(block, data._2)
appendBlock(block)
} else {
logger.info(s"Synchronize block ${block.height} on empty ledger, hash = ${block.hash}")
val result = for {
_ <- sync(0 until block.height.int).toFutureOr
_ <- synchronize(data).toFutureOr
_ <- synchronize(block).toFutureOr
} yield ()
result.toFuture
@ -93,20 +91,19 @@ class LedgerSynchronizerService @Inject() (
* 2.4. if H <= N, if the hash already exists, it is ignored.
* 2.5. if H <= N, if the hash doesn't exists, remove blocks from N to H (included), then, add the new H.
*/
private def onLatestBlock(ledgerBlock: Block, newData: BlockData): FutureApplicationResult[Unit] = {
val newBlock = newData._1
private def onLatestBlock(ledgerBlock: Block, newBlock: rpc.Block): FutureApplicationResult[Unit] = {
if (ledgerBlock.height.int + 1 == newBlock.height.int &&
newBlock.previousBlockhash.contains(ledgerBlock.hash)) {
logger.info(s"Appending block ${newBlock.height}, hash = ${newBlock.hash}")
ledgerDataHandler.push(newBlock, newData._2)
appendBlock(newBlock)
} else if (ledgerBlock.height.int + 1 == newBlock.height.int) {
logger.info(s"Reorganization to push block ${newBlock.height}, hash = ${newBlock.hash}")
val result = for {
blockhash <- newBlock.previousBlockhash.toFutureOr(BlockNotFoundError)
previousBlock <- getRPCBlock(blockhash).toFutureOr
previousBlock <- xsnService.getBlock(blockhash).toFutureOr
_ <- synchronize(previousBlock).toFutureOr
_ <- synchronize(newData).toFutureOr
_ <- synchronize(newBlock).toFutureOr
} yield ()
result.toFuture
@ -114,7 +111,7 @@ class LedgerSynchronizerService @Inject() (
logger.info(s"Filling holes to push block ${newBlock.height}, hash = ${newBlock.hash}")
val result = for {
_ <- sync(ledgerBlock.height.int + 1 until newBlock.height.int).toFutureOr
_ <- synchronize(newData).toFutureOr
_ <- synchronize(newBlock).toFutureOr
} yield ()
result.toFuture
@ -132,7 +129,7 @@ class LedgerSynchronizerService @Inject() (
.getOrElse {
val x = for {
_ <- trimTo(newBlock.height).toFutureOr
_ <- synchronize(newData).toFutureOr
_ <- synchronize(newBlock).toFutureOr
} yield ()
x.toFuture
}
@ -143,6 +140,16 @@ class LedgerSynchronizerService @Inject() (
}
}
private def appendBlock(newBlock: rpc.Block): FutureApplicationResult[Unit] = {
val result = for {
data <- getBlockData(newBlock).toFutureOr
(blockWithTransactions, tposContracts) = data
_ <- ledgerDataHandler.push(blockWithTransactions, tposContracts).toFutureOr
} yield ()
result.toFuture
}
/**
* Sync the given range to our ledger.
*/
@ -154,17 +161,16 @@ class LedgerSynchronizerService @Inject() (
val result = for {
_ <- previous.toFutureOr
blockhash <- xsnService.getBlockhash(Height(height)).toFutureOr
data <- getRPCBlock(blockhash).toFutureOr
_ <- synchronize(data).toFutureOr
block <- xsnService.getBlock(blockhash).toFutureOr
_ <- synchronize(block).toFutureOr
} yield ()
result.toFuture
}
}
private def getRPCBlock(blockhash: Blockhash): FutureApplicationResult[BlockData] = {
private def getBlockData(rpcBlock: rpc.Block): FutureApplicationResult[BlockData] = {
val result = for {
rpcBlock <- xsnService.getBlock(blockhash).toFutureOr
extractionMethod <- blockService.extractionMethod(rpcBlock).toFutureOr
data <- transactionRPCService.getTransactions(rpcBlock.transactions).toFutureOr
(transactions, contracts) = data

Loading…
Cancel
Save