
server: Add the LedgerSynchronizerService (#37)

This is a simplified version of the BlockEventProcessor. It handles
the synchronization between the xsn node and our ledger database, and it
takes care of several corner cases to keep the synchronization process
flexible and linear.
Branch: prometheus-integration
Alexis Hernandez committed 7 years ago
Commit 03ed7a694b
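
For context, a caller only needs to hand a block hash to synchronize; gap filling and reorganizations are handled internally. A minimal sketch of such a caller (not part of this commit; the LedgerPollingTask name is hypothetical, while getLatestBlock and synchronize are taken from the code below):

package com.xsn.explorer.tasks

import javax.inject.Inject

import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps
import com.xsn.explorer.services.{LedgerSynchronizerService, XSNService}

import scala.concurrent.ExecutionContext

// Hypothetical caller, not part of this commit: asks the node for its latest block
// and hands the hash to the synchronizer, which fills gaps and handles reorganizations.
class LedgerPollingTask @Inject() (
    xsnService: XSNService,
    synchronizer: LedgerSynchronizerService)(
    implicit ec: ExecutionContext) {

  def run(): FutureApplicationResult[Unit] = {
    val result = for {
      latestBlock <- xsnService.getLatestBlock().toFutureOr
      _ <- synchronizer.synchronize(latestBlock.hash).toFutureOr
    } yield ()

    result.toFuture
  }
}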
Changed files:
  1. server/app/com/xsn/explorer/models/Transaction.scala (+6)
  2. server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala (+181)
  3. server/app/com/xsn/explorer/util/Extensions.scala (+18)
  4. server/test/com/xsn/explorer/services/LedgerSynchronizerServiceSpec.scala (+258)
  5. server/test/resources/blocks/0000017ee4121cd8ae22f7321041ccb953d53828824217a9dc61a1c857facf85 (+19)
  6. server/test/resources/transactions/c2c232be75be9dc0d761737173e8c3a22289d7302ac15e7924fab62f743fd6cb (+34)

server/app/com/xsn/explorer/models/Transaction.scala (+6)

@@ -22,6 +22,12 @@ object Transaction {
tposOwnerAddress: Option[Address],
tposMerchantAddress: Option[Address])
/**
* Please note that the inputs might not be accurate.
*
* If the RPC transaction is not complete, get the input value and address using
* the UTXO index or the getTransaction method from the TransactionService.
*/
def fromRPC(tx: rpc.Transaction): Transaction = {
val inputs = tx.vin.zipWithIndex.map { case (vin, index) =>
Transaction.Input(index, vin.value, vin.address)
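
For context, the caveat above means that callers needing accurate input values should go through the TransactionService rather than relying on Transaction.fromRPC alone. A minimal sketch (not part of this commit; the TransactionId type name is an assumption, only getTransaction comes from this change set):

// Hypothetical helper, not part of this commit; TransactionId is an assumed type name.
object AccurateInputs {
  import com.alexitc.playsonify.core.FutureApplicationResult
  import com.xsn.explorer.models.{Transaction, TransactionId}
  import com.xsn.explorer.services.TransactionService

  def accurateTransaction(
      txid: TransactionId,
      transactionService: TransactionService): FutureApplicationResult[Transaction] = {
    // The service resolves input values and addresses (e.g. through the UTXO index),
    // instead of copying whatever the raw RPC transaction happens to include.
    transactionService.getTransaction(txid)
  }
}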

server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala (+181)

@@ -0,0 +1,181 @@
package com.xsn.explorer.services
import javax.inject.Inject
import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureListOps, FutureOps, OptionOps}
import com.xsn.explorer.data.async.{BlockFutureDataHandler, LedgerFutureDataHandler}
import com.xsn.explorer.errors.BlockNotFoundError
import com.xsn.explorer.models.rpc.Block
import com.xsn.explorer.models.{Blockhash, Height, Transaction}
import com.xsn.explorer.util.Extensions.FutureOrExt
import org.scalactic.Good
import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, Future}
class LedgerSynchronizerService @Inject() (
xsnService: XSNService,
transactionService: TransactionService,
ledgerDataHandler: LedgerFutureDataHandler,
blockDataHandler: BlockFutureDataHandler)(
implicit ec: ExecutionContext) {
private val logger = LoggerFactory.getLogger(this.getClass)
/**
* Synchronize the given block with our ledger database.
*
* The synchronization involves complex logic in order to handle
* several corner cases; be sure to not call this method concurrently
* because the behavior is undefined.
*/
def synchronize(blockhash: Blockhash): FutureApplicationResult[Unit] = {
val result = for {
block <- xsnService.getBlock(blockhash).toFutureOr
transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr
_ <- synchronize(block, transactions).toFutureOr
} yield ()
result.toFuture
}
private def synchronize(block: Block, transactions: List[Transaction]): FutureApplicationResult[Unit] = {
logger.info(s"Synchronize block ${block.height}, hash = ${block.hash}")
val result = for {
latestBlockMaybe <- blockDataHandler
.getLatestBlock()
.toFutureOr
.map(Option.apply)
.recoverFrom(BlockNotFoundError)(None)
_ <- latestBlockMaybe
.map { latestBlock => onLatestBlock(latestBlock, block, transactions) }
.getOrElse { onEmptyLedger(block, transactions) }
.toFutureOr
} yield ()
result.toFuture
}
/**
* 1. current ledger is empty:
* 1.1. the given block is the genesis block, it is added.
* 1.2. the given block is not the genesis block, sync everything until the given block.
*/
private def onEmptyLedger(block: Block, transactions: List[Transaction]): FutureApplicationResult[Unit] = {
if (block.height.int == 0) {
logger.info(s"Synchronize genesis block on empty ledger, hash = ${block.hash}")
ledgerDataHandler.push(block, transactions)
} else {
logger.info(s"Synchronize block ${block.height} on empty ledger, hash = ${block.hash}")
val result = for {
_ <- sync(0 until block.height.int).toFutureOr
_ <- synchronize(block, transactions).toFutureOr
} yield ()
result.toFuture
}
}
/**
* 2. current ledger has blocks until N, given block height H:
* 2.1. if N+1 == H and its previous blockhash is N, it is added.
* 2.2. if N+1 == H and its previous blockhash isn't N, retrieve block N using H's previous blockhash, apply the whole process to it, then apply H.
* 2.3. if H > N+1, sync everything until H.
* 2.4. if H <= N, if the hash already exists, it is ignored.
* 2.5. if H <= N, if the hash doesn't exist, remove blocks from N to H (inclusive), then add the new H.
*/
private def onLatestBlock(ledgerBlock: Block, newBlock: Block, newTransactions: List[Transaction]): FutureApplicationResult[Unit] = {
if (ledgerBlock.height.int + 1 == newBlock.height.int &&
newBlock.previousBlockhash.contains(ledgerBlock.hash)) {
logger.info(s"Appending block ${newBlock.height}, hash = ${newBlock.hash}")
ledgerDataHandler.push(newBlock, newTransactions)
} else if (ledgerBlock.height.int + 1 == newBlock.height.int) {
logger.info(s"Reorganization to push block ${newBlock.height}, hash = ${newBlock.hash}")
val result = for {
blockhash <- newBlock.previousBlockhash.toFutureOr(BlockNotFoundError)
previousBlock <- xsnService.getBlock(blockhash).toFutureOr
previousTransactions <- previousBlock.transactions.map(transactionService.getTransaction).toFutureOr
_ <- synchronize(previousBlock, previousTransactions).toFutureOr
_ <- synchronize(newBlock, newTransactions).toFutureOr
} yield ()
result.toFuture
} else if (newBlock.height.int > ledgerBlock.height.int) {
logger.info(s"Filling holes to push block ${newBlock.height}, hash = ${newBlock.hash}")
val result = for {
_ <- sync(ledgerBlock.height.int + 1 until newBlock.height.int).toFutureOr
_ <- synchronize(newBlock, newTransactions).toFutureOr
} yield ()
result.toFuture
} else {
val result = for {
expectedBlockMaybe <- blockDataHandler
.getBy(newBlock.hash)
.toFutureOr
.map(Option.apply)
.recoverFrom(BlockNotFoundError)(None)
_ = logger.info(s"Checking possible existing block ${newBlock.height}, hash = ${newBlock.hash}, exists = ${expectedBlockMaybe.isDefined}")
_ <- expectedBlockMaybe
.map { _ => Future.successful(Good(())) }
.getOrElse {
val x = for {
_ <- trimTo(newBlock.height).toFutureOr
_ <- synchronize(newBlock, newTransactions).toFutureOr
} yield ()
x.toFuture
}
.toFutureOr
} yield ()
result.toFuture
}
}
/**
* Sync the given range to our ledger.
*/
private def sync(range: Range): FutureApplicationResult[Unit] = {
logger.info(s"Syncing block range = $range")
// TODO: check, it might be safer to use the nextBlockhash instead of the height
range.foldLeft[FutureApplicationResult[Unit]](Future.successful(Good(()))) { case (previous, height) =>
val result = for {
_ <- previous.toFutureOr
blockhash <- xsnService.getBlockhash(Height(height)).toFutureOr
block <- xsnService.getBlock(blockhash).toFutureOr
transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr
_ <- synchronize(block, transactions).toFutureOr
} yield ()
result.toFuture
}
}
/**
* Trim the ledger until the given block height; for example, if the height is 4,
* the last stored block will be 3.
*/
private def trimTo(height: Height): FutureApplicationResult[Unit] = {
val result = ledgerDataHandler
.pop()
.toFutureOr
.flatMap { block =>
logger.info(s"Trimmed block ${block.height} from the ledger")
val result = if (block.height == height) {
Future.successful(Good(()))
} else {
trimTo(height)
}
result.toFutureOr
}
result.toFuture
}
}
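
The corner cases documented in onEmptyLedger and onLatestBlock reduce to a small decision table over the ledger's latest height N and the incoming block's height H. A standalone sketch of that table (hypothetical names, not part of this commit):

object SyncDecision {
  import com.xsn.explorer.models.Blockhash

  sealed trait SyncAction
  case object Append extends SyncAction                 // 2.1: H == N+1 and the previous blockhash matches block N
  case object ReorganizeFromPrevious extends SyncAction // 2.2: H == N+1 but the previous blockhash differs
  case object FillMissingRange extends SyncAction       // 2.3: H > N+1, sync everything until H
  case object IgnoreOrTrim extends SyncAction           // 2.4/2.5: H <= N, ignore if the hash exists, otherwise trim and re-add

  def decide(
      ledgerHeight: Int,
      ledgerHash: Blockhash,
      newHeight: Int,
      newPreviousBlockhash: Option[Blockhash]): SyncAction = {
    if (newHeight == ledgerHeight + 1 && newPreviousBlockhash.contains(ledgerHash)) Append
    else if (newHeight == ledgerHeight + 1) ReorganizeFromPrevious
    else if (newHeight > ledgerHeight + 1) FillMissingRange
    else IgnoreOrTrim
  }
}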

server/app/com/xsn/explorer/util/Extensions.scala (+18)

@@ -1,5 +1,11 @@
package com.xsn.explorer.util
import com.alexitc.playsonify.core.FutureOr
import com.alexitc.playsonify.models.ApplicationError
import org.scalactic.{Bad, Good, One}
import scala.concurrent.ExecutionContext
object Extensions {
private val SatoshiScale = 100000000L
@@ -19,4 +25,16 @@ object Extensions {
}
}
}
implicit class FutureOrExt[+A](val inner: FutureOr[A]) {
def recoverFrom[B >: A](error: ApplicationError)(f: => B)(implicit ec: ExecutionContext): FutureOr[B] = {
val future = inner.toFuture.map {
case Good(result) => Good(result)
case Bad(One(e)) if e == error => Good(f)
case Bad(errors) => Bad(errors)
}
new FutureOr(future)
}
}
}
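
For reference, the synchronizer above relies on this extension to treat a missing latest block as an empty ledger rather than as a failure. A minimal sketch of that pattern (the wrapper method is hypothetical; the types and the call chain mirror LedgerSynchronizerService):

object RecoverFromUsage {
  import com.alexitc.playsonify.core.FutureApplicationResult
  import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps
  import com.xsn.explorer.data.async.BlockFutureDataHandler
  import com.xsn.explorer.errors.BlockNotFoundError
  import com.xsn.explorer.models.rpc.Block
  import com.xsn.explorer.util.Extensions.FutureOrExt

  import scala.concurrent.ExecutionContext

  // Mirrors the getLatestBlock call in LedgerSynchronizerService: a BlockNotFoundError
  // is recovered into None instead of failing the whole computation.
  def getLatestBlockMaybe(blockDataHandler: BlockFutureDataHandler)(
      implicit ec: ExecutionContext): FutureApplicationResult[Option[Block]] = {
    blockDataHandler
      .getLatestBlock()
      .toFutureOr
      .map(Option.apply)
      .recoverFrom(BlockNotFoundError)(None)
      .toFuture
  }
}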

server/test/com/xsn/explorer/services/LedgerSynchronizerServiceSpec.scala (+258)

@@ -0,0 +1,258 @@
package com.xsn.explorer.services
import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.validators.PaginatedQueryValidator
import com.xsn.explorer.data.anorm.dao.{BalancePostgresDAO, BlockPostgresDAO, TransactionPostgresDAO}
import com.xsn.explorer.data.anorm.interpreters.FieldOrderingSQLInterpreter
import com.xsn.explorer.data.anorm.{BalancePostgresDataHandler, BlockPostgresDataHandler, LedgerPostgresDataHandler, TransactionPostgresDataHandler}
import com.xsn.explorer.data.async.{BlockFutureDataHandler, LedgerFutureDataHandler, TransactionFutureDataHandler}
import com.xsn.explorer.data.common.PostgresDataHandlerSpec
import com.xsn.explorer.errors.BlockNotFoundError
import com.xsn.explorer.helpers._
import com.xsn.explorer.models.rpc.Block
import com.xsn.explorer.models.{Blockhash, Height}
import com.xsn.explorer.parsers.TransactionOrderingParser
import org.scalactic.{Bad, Good, One, Or}
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.Future
class LedgerSynchronizerServiceSpec extends PostgresDataHandlerSpec with BeforeAndAfter with ScalaFutures {
lazy val dataHandler = new LedgerPostgresDataHandler(
database,
new BlockPostgresDAO,
new TransactionPostgresDAO(new FieldOrderingSQLInterpreter),
new BalancePostgresDAO(new FieldOrderingSQLInterpreter))
lazy val transactionDataHandler = new TransactionPostgresDataHandler(
database,
new TransactionPostgresDAO(new FieldOrderingSQLInterpreter))
lazy val blockDataHandler = new BlockPostgresDataHandler(database, new BlockPostgresDAO)
val blockList = List(
BlockLoader.get("00000c822abdbb23e28f79a49d29b41429737c6c7e15df40d1b1f1b35907ae34"),
BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7"),
BlockLoader.get("000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8"),
BlockLoader.get("00000766115b26ecbc09cd3a3db6870fdaf2f049d65a910eb2f2b48b566ca7bd"),
BlockLoader.get("00000b59875e80b0afc6c657bc5318d39e03532b7d97fb78a4c7bd55c4840c32"),
BlockLoader.get("00000267225f7dba55d9a3493740e7f0dde0f28a371d2c3b42e7676b5728d020"),
BlockLoader.get("0000017ee4121cd8ae22f7321041ccb953d53828824217a9dc61a1c857facf85")
)
val genesis = blockList(0)
before {
clearDatabase()
}
"synchronize" should {
"add the genesis block to the empty ledger" in {
val synchronizer = ledgerSynchronizerService(genesis)
whenReady(synchronizer.synchronize(genesis.hash)) { result =>
result mustEqual Good(())
verifyLedger(genesis)
}
}
"add the old missing blocks while adding block N to the empty ledger" in {
val block = blockList.last
val synchronizer = ledgerSynchronizerService(blockList: _*)
whenReady(synchronizer.synchronize(block.hash)) { result =>
result mustEqual Good(())
verifyLedger(blockList: _*)
}
}
"append a block to the latest block" in {
val synchronizer = ledgerSynchronizerService(blockList: _*)
whenReady(synchronizer.synchronize(genesis.hash)) { _ mustEqual Good(()) }
blockList.drop(1).foreach { block =>
whenReady(synchronizer.synchronize(block.hash)) { _ mustEqual Good(()) }
}
verifyLedger(blockList: _*)
}
"ignore a duplicated block" in {
val synchronizer = ledgerSynchronizerService(blockList: _*)
createBlocks(synchronizer, blockList: _*)
val block = blockList(3)
whenReady(synchronizer.synchronize(block.hash)) { _ mustEqual Good(()) }
verifyLedger(blockList: _*)
}
"add the old missing blocks while adding block N to a ledger with some blocks" in {
val initialBlocks = blockList.take(3)
val synchronizer = ledgerSynchronizerService(blockList: _*)
createBlocks(synchronizer, initialBlocks: _*)
val block = blockList.last
whenReady(synchronizer.synchronize(block.hash)) { result =>
result mustEqual Good(())
verifyLedger(blockList: _*)
}
}
"handle reorganization, ledger has 3 blocks, a rechain occurs from block 2 while adding new block 3" in {
val block1 = blockList(1)
val block2 = blockList(2)
val block3 = blockList(3)
val newBlock2 = blockList(4).copy(previousBlockhash = block2.previousBlockhash, height = block2.height)
val newBlock3 = blockList(5).copy(previousBlockhash = Some(newBlock2.hash), height = Height(3))
val initialBlocks = List(genesis, block1, block2, block3)
createBlocks(ledgerSynchronizerService(initialBlocks: _*), initialBlocks: _*)
val finalBlocks = List(
genesis,
block1.copy(nextBlockhash = Some(newBlock2.hash)),
newBlock2.copy(nextBlockhash = Some(newBlock3.hash)),
newBlock3)
val synchronizer = ledgerSynchronizerService(finalBlocks: _*)
whenReady(synchronizer.synchronize(newBlock3.hash)) { result =>
result mustEqual Good(())
verifyLedger(finalBlocks: _*)
}
}
"handle reorganization, ledger has 3 blocks, a rechain occurs from block 2 while adding new block 4" in {
val block1 = blockList(1)
val block2 = blockList(2)
val block3 = blockList(3)
val newBlock2 = blockList(4).copy(previousBlockhash = block2.previousBlockhash, height = block2.height)
val newBlock3 = blockList(5).copy(previousBlockhash = Some(newBlock2.hash), height = Height(3))
val newBlock4 = blockList(6).copy(previousBlockhash = Some(newBlock3.hash), height = Height(4))
val initialBlocks = List(genesis, block1, block2, block3)
createBlocks(ledgerSynchronizerService(initialBlocks: _*), initialBlocks: _*)
val finalBlocks = List(
genesis,
block1.copy(nextBlockhash = Some(newBlock2.hash)),
newBlock2.copy(nextBlockhash = Some(newBlock3.hash)),
newBlock3.copy(nextBlockhash = Some(newBlock4.hash)),
newBlock4)
val synchronizer = ledgerSynchronizerService(finalBlocks: _*)
whenReady(synchronizer.synchronize(newBlock4.hash)) { result =>
result mustEqual Good(())
verifyLedger(finalBlocks: _*)
}
}
"handle reorganization, ledger has 6 blocks, a rechain occurs from block 2 while adding new block 2" in {
val initialBlocks = blockList.take(6)
createBlocks(ledgerSynchronizerService(initialBlocks: _*), initialBlocks: _*)
val block1 = blockList(1)
val newBlock2 = blockList.drop(6).head.copy(previousBlockhash = Some(block1.hash), height = Height(2))
val finalBlocks = List(
genesis,
block1.copy(nextBlockhash = Some(newBlock2.hash)),
newBlock2
)
val synchronizer = ledgerSynchronizerService(finalBlocks: _*)
whenReady(synchronizer.synchronize(newBlock2.hash)) { result =>
result mustEqual Good(())
verifyLedger(finalBlocks: _*)
}
}
"process a block without spent index on transactions" in {
val block = BlockLoader.get("000001ff95f22b8d82db14a5c5e9f725e8239e548be43c668766e7ddaee81924")
.copy(previousBlockhash = None, height = Height(0))
val synchronizer = ledgerSynchronizerService(block)
whenReady(synchronizer.synchronize(block.hash)) { result =>
result.isGood mustEqual true
val balanceDataHandler = new BalancePostgresDataHandler(database, new BalancePostgresDAO(new FieldOrderingSQLInterpreter))
val balance = balanceDataHandler.getBy(DataHelper.createAddress("XdJnCKYNwzCz8ATv8Eu75gonaHyfr9qXg9"))
balance.get.spent mustEqual BigDecimal("76500000.000000000000000")
}
}
}
private def verifyLedger(blocks: Block*) = {
countBlocks() mustEqual blocks.size
blocks.foreach { block =>
val dbBlock = blockDataHandler.getBy(block.hash).get
dbBlock.height mustEqual block.height
dbBlock.previousBlockhash mustEqual block.previousBlockhash
if (block == blocks.last) {
dbBlock.nextBlockhash.isEmpty mustEqual true
} else {
dbBlock.nextBlockhash mustEqual block.nextBlockhash
}
}
}
private def countBlocks() = {
database.withConnection { implicit conn =>
_root_.anorm.SQL("""SELECT COUNT(*) FROM blocks""").as(_root_.anorm.SqlParser.scalar[Int].single)
}
}
private def createBlocks(synchronizer: LedgerSynchronizerService, blocks: Block*) = {
blocks
.foreach { block =>
whenReady(synchronizer.synchronize(block.hash)) { result =>
result.isGood mustEqual true
}
}
}
private def ledgerSynchronizerService(blocks: Block*): LedgerSynchronizerService = {
val xsnService = new FileBasedXSNService {
override def getBlock(blockhash: Blockhash): FutureApplicationResult[Block] = {
blocks
.find(_.hash == blockhash)
.map { block => Future.successful(Good(XSNService.cleanGenesisBlock(block))) }
.getOrElse {
Future.successful(Bad(BlockNotFoundError).accumulating)
}
}
override def getLatestBlock(): FutureApplicationResult[Block] = {
val block = XSNService.cleanGenesisBlock(blocks.maxBy(_.height.int))
Future.successful(Good(block))
}
override def getBlockhash(height: Height): FutureApplicationResult[Blockhash] = {
val maybe = blocks.find(_.height == height).map(_.hash)
val result = Or.from(maybe, One(BlockNotFoundError))
Future.successful(result)
}
}
ledgerSynchronizerService(xsnService)
}
private def ledgerSynchronizerService(xsnService: XSNService): LedgerSynchronizerService = {
val transactionService = new TransactionService(
new PaginatedQueryValidator,
new TransactionOrderingParser,
xsnService,
new TransactionFutureDataHandler(transactionDataHandler)(Executors.databaseEC))
new LedgerSynchronizerService(
xsnService,
transactionService,
new LedgerFutureDataHandler(dataHandler)(Executors.databaseEC),
new BlockFutureDataHandler(blockDataHandler)(Executors.databaseEC))
}
}

server/test/resources/blocks/0000017ee4121cd8ae22f7321041ccb953d53828824217a9dc61a1c857facf85 (+19)

@@ -0,0 +1,19 @@
{
"hash": "0000017ee4121cd8ae22f7321041ccb953d53828824217a9dc61a1c857facf85",
"confirmations": 163684,
"size": 179,
"height": 6,
"version": 536870912,
"merkleroot": "c2c232be75be9dc0d761737173e8c3a22289d7302ac15e7924fab62f743fd6cb",
"tx": [
"c2c232be75be9dc0d761737173e8c3a22289d7302ac15e7924fab62f743fd6cb"
],
"time": 1520276361,
"mediantime": 1520276303,
"nonce": 3562,
"bits": "1e0ffff0",
"difficulty": 0.000244140625,
"chainwork": "0000000000000000000000000000000000000000000000000000000000700070",
"previousblockhash": "00000267225f7dba55d9a3493740e7f0dde0f28a371d2c3b42e7676b5728d020",
"nextblockhash": "000009b8e692cda20b4d3347a95aa9ad1aa9fba3f242e3ba2ff11929a73e3914"
}

server/test/resources/transactions/c2c232be75be9dc0d761737173e8c3a22289d7302ac15e7924fab62f743fd6cb (+34)

@@ -0,0 +1,34 @@
{
"hex": "01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff03560101ffffffff010000000000000000232103e8c52f2c5155771492907095753a43ce776e1fa7c5e769a67a9f3db4467ec029ac00000000",
"txid": "c2c232be75be9dc0d761737173e8c3a22289d7302ac15e7924fab62f743fd6cb",
"size": 98,
"version": 1,
"locktime": 0,
"vin": [
{
"coinbase": "560101",
"sequence": 4294967295
}
],
"vout": [
{
"value": 0,
"valueSat": 0,
"n": 0,
"scriptPubKey": {
"asm": "03e8c52f2c5155771492907095753a43ce776e1fa7c5e769a67a9f3db4467ec029 OP_CHECKSIG",
"hex": "2103e8c52f2c5155771492907095753a43ce776e1fa7c5e769a67a9f3db4467ec029ac",
"reqSigs": 1,
"type": "pubkey",
"addresses": [
"XdJnCKYNwzCz8ATv8Eu75gonaHyfr9qXg9"
]
}
}
],
"blockhash": "0000017ee4121cd8ae22f7321041ccb953d53828824217a9dc61a1c857facf85",
"height": 6,
"confirmations": 163685,
"time": 1520276361,
"blocktime": 1520276361
}