Browse Source

server: Update the BlockEventsProcessor flow

When a block is processed, in case it can't be fully retrieved from the
rpc server, the block is ignored.

When a block is processed, if there is an existing block having the same
height, it will be replaced by the new one.

This is a part for fixing the bug #6
scalafmt-draft
Alexis Hernandez 7 years ago
parent
commit
b4120e284b
  1. 2
      server/app/com/xsn/explorer/data/DatabaseSeeder.scala
  2. 2
      server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala
  3. 4
      server/app/com/xsn/explorer/data/async/DatabaseFutureSeeder.scala
  4. 39
      server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala
  5. 21
      server/test/com/xsn/explorer/data/BlockPostgresDataHandlerSpec.scala
  6. 128
      server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala

2
server/app/com/xsn/explorer/data/DatabaseSeeder.scala

@ -25,7 +25,7 @@ trait DatabaseSeeder[F[_]] {
* The database has some blocks but there is a rechain happening, we need to
* replace our current latest block with the new latest block.
*/
def replaceLatestBlock(command: ReplaceBlockCommand): F[Unit]
def replaceBlock(command: ReplaceBlockCommand): F[Unit]
/**
* The database has some blocks but the chain is not complete, we are inserting

2
server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala

@ -52,7 +52,7 @@ class DatabasePostgresSeeder @Inject() (
.getOrElse(throw new RuntimeException("Unable to add the new latest block"))
}
override def replaceLatestBlock(command: ReplaceBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn =>
override def replaceBlock(command: ReplaceBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn =>
val createCommand = CreateBlockCommand(command.newBlock, command.newTransactions)
val result = for {

4
server/app/com/xsn/explorer/data/async/DatabaseFutureSeeder.scala

@ -21,8 +21,8 @@ class DatabaseFutureSeeder @Inject() (
blockingSeeder.newLatestBlock(command)
}
override def replaceLatestBlock(command: DatabaseSeeder.ReplaceBlockCommand): FutureApplicationResult[Unit] = Future {
blockingSeeder.replaceLatestBlock(command)
override def replaceBlock(command: DatabaseSeeder.ReplaceBlockCommand): FutureApplicationResult[Unit] = Future {
blockingSeeder.replaceBlock(command)
}
override def insertPendingBlock(command: DatabaseSeeder.CreateBlockCommand): FutureApplicationResult[Unit] = Future {

39
server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala

@ -6,7 +6,7 @@ import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureListOps, FutureOps}
import com.xsn.explorer.data.DatabaseSeeder
import com.xsn.explorer.data.async.{BlockFutureDataHandler, DatabaseFutureSeeder}
import com.xsn.explorer.errors.BlockNotFoundError
import com.xsn.explorer.errors._
import com.xsn.explorer.models.rpc.Block
import com.xsn.explorer.models.{Blockhash, Transaction}
import com.xsn.explorer.services.{TransactionService, XSNService}
@ -52,7 +52,12 @@ class BlockEventsProcessor @Inject() (
r <- newLatestBlock(block, transactions).toFutureOr
} yield r
result.toFuture
result.toFuture.map {
case Good(r) => Good(r)
case Bad(One(BlockNotFoundError)) => Good(MissingBlockIgnored)
case Bad(One(TransactionNotFoundError)) => Good(MissingBlockIgnored)
case Bad(errors) => Bad(errors)
}
}
private def newLatestBlock(newBlock: Block, newTransactions: List[Transaction]): FutureApplicationResult[Result] = {
@ -63,7 +68,7 @@ class BlockEventsProcessor @Inject() (
newTransactions = newTransactions)
val result = for {
_ <- databaseSeeder.replaceLatestBlock(command).toFutureOr
_ <- databaseSeeder.replaceBlock(command).toFutureOr
} yield RechainDone(orphanBlock, newBlock)
result.toFuture
@ -90,6 +95,21 @@ class BlockEventsProcessor @Inject() (
.toFuture
}
def onRepeatedBlockHeight(): FutureApplicationResult[Result] = {
val result = for {
orphanBlock <- blockDataHandler.getBy(newBlock.height).toFutureOr
replaceCommand = DatabaseSeeder.ReplaceBlockCommand(
orphanBlock = orphanBlock,
newBlock = newBlock,
newTransactions = newTransactions)
_ <- databaseSeeder.replaceBlock(replaceCommand).toFutureOr
} yield ReplacedByBlockHeight
result.toFuture
}
def onMissingBlock(): FutureApplicationResult[Result] = {
blockDataHandler
.getBy(newBlock.hash)
@ -99,12 +119,13 @@ class BlockEventsProcessor @Inject() (
Future.successful { Good(ExistingBlockIgnored(newBlock)) }
case Bad(One(BlockNotFoundError)) =>
val command = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions)
val createCommand = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions)
databaseSeeder
.insertPendingBlock(command)
.toFutureOr
.map(_ => MissingBlockProcessed(newBlock))
.toFuture
.insertPendingBlock(createCommand)
.flatMap {
case Good(_) => Future.successful { Good(MissingBlockProcessed(newBlock)) }
case Bad(One(PostgresForeignKeyViolationError("height", _))) => onRepeatedBlockHeight()
}
case Bad(errors) =>
Future.successful(Bad(errors))
@ -155,4 +176,6 @@ object BlockEventsProcessor {
case class ExistingBlockIgnored(block: Block) extends Result
case class NewBlockAppended(block: Block) extends Result
case class RechainDone(orphanBlock: Block, newBlock: Block) extends Result
case object MissingBlockIgnored extends Result
case object ReplacedByBlockHeight extends Result
}

21
server/test/com/xsn/explorer/data/BlockPostgresDataHandlerSpec.scala

@ -49,7 +49,7 @@ class BlockPostgresDataHandlerSpec extends PostgresDataHandlerSpec with BeforeAn
}
}
"getBy" should {
"getBy blockhash" should {
"return a block" in {
val block = BlockLoader.get("1ca318b7a26ed67ca7c8c9b5069d653ba224bf86989125d1dfbb0973b7d6a5e0")
@ -68,6 +68,25 @@ class BlockPostgresDataHandlerSpec extends PostgresDataHandlerSpec with BeforeAn
}
}
"getBy height" should {
"return a block" in {
val block = BlockLoader.get("1ca318b7a26ed67ca7c8c9b5069d653ba224bf86989125d1dfbb0973b7d6a5e0")
dataHandler.insert(block)
val result = dataHandler.getBy(block.height)
result.isGood mustEqual true
matches(block, result.get)
}
"fail on block not found" in {
val block = BlockLoader.get("1ca318b7a26ed67ca7c8c9b5069d653ba224bf86989125d1dfbb0973b7d6a5e0")
val result = dataHandler.getBy(block.height)
result mustEqual Bad(BlockNotFoundError).accumulating
}
}
"delete" should {
"delete a block" in {
val block = BlockLoader.get("1ca318b7a26ed67ca7c8c9b5069d653ba224bf86989125d1dfbb0973b7d6a5e0")

128
server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala

@ -1,20 +1,25 @@
package com.xsn.explorer.processors
import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.models._
import com.xsn.explorer.data.anorm.dao.{BalancePostgresDAO, BlockPostgresDAO, StatisticsPostgresDAO, TransactionPostgresDAO}
import com.xsn.explorer.data.anorm.interpreters.FieldOrderingSQLInterpreter
import com.xsn.explorer.data.anorm.{BalancePostgresDataHandler, BlockPostgresDataHandler, DatabasePostgresSeeder, StatisticsPostgresDataHandler}
import com.xsn.explorer.data.async.{BlockFutureDataHandler, DatabaseFutureSeeder}
import com.xsn.explorer.data.common.PostgresDataHandlerSpec
import com.xsn.explorer.errors.{BlockNotFoundError, TransactionNotFoundError}
import com.xsn.explorer.helpers.{BlockLoader, Executors, FileBasedXSNService}
import com.xsn.explorer.models.fields.BalanceField
import com.xsn.explorer.models.rpc.Block
import com.xsn.explorer.processors.BlockEventsProcessor.{NewBlockAppended, RechainDone}
import com.xsn.explorer.models.rpc.{Block, Transaction}
import com.xsn.explorer.models.{Blockhash, TransactionId}
import com.xsn.explorer.processors.BlockEventsProcessor.{MissingBlockIgnored, NewBlockAppended, RechainDone, ReplacedByBlockHeight}
import com.xsn.explorer.services.TransactionService
import org.scalactic.Good
import org.scalactic.{Bad, Good}
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.Future
class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures with BeforeAndAfter {
lazy val dataHandler = new BlockPostgresDataHandler(database, new BlockPostgresDAO)
@ -167,6 +172,123 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures
balance.spent mustEqual BigDecimal(0)
}
}
"ignore orphan block on rare rechain events when the rpc server doesn't have the block anymore" in {
// see https://github.com/X9Developers/block-explorer/issues/6
val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7")
val block2 = BlockLoader.get("000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8")
val block3 = BlockLoader.get("00000766115b26ecbc09cd3a3db6870fdaf2f049d65a910eb2f2b48b566ca7bd")
val xsnService = new FileBasedXSNService {
override def getBlock(blockhash: Blockhash): FutureApplicationResult[Block] = {
if (blockhash == block3.hash) {
Future.successful(Bad(BlockNotFoundError).accumulating)
} else {
super.getBlock(blockhash)
}
}
}
val processor = new BlockEventsProcessor(
xsnService,
new TransactionService(xsnService)(Executors.globalEC),
new DatabaseFutureSeeder(dataSeeder)(Executors.databaseEC),
new BlockFutureDataHandler(dataHandler)(Executors.databaseEC))
List(block1, block2)
.map(_.hash)
.map(processor.newLatestBlock)
.foreach { whenReady(_) { _.isGood mustEqual true } }
/**
* When processing the latest block, a rechain event has occurred in the rpc server which leads to
* a case that we can't retrieve the block information, the block should be ignored.
*/
whenReady(processor.newLatestBlock(block3.hash)) { result =>
result mustEqual Good(MissingBlockIgnored)
val blocks = List(block1, block2)
verifyBlockchain(blocks)
}
}
"ignore orphan block on rare rechain events when the rpc server doesn't have the a transaction from the block anymore" in {
// see https://github.com/X9Developers/block-explorer/issues/6
val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7")
val block2 = BlockLoader.get("000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8")
val block3 = BlockLoader.get("00000766115b26ecbc09cd3a3db6870fdaf2f049d65a910eb2f2b48b566ca7bd")
val xsnService = new FileBasedXSNService {
override def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction] = {
if (txid == block3.transactions.last) {
Future.successful(Bad(TransactionNotFoundError).accumulating)
} else {
super.getTransaction(txid)
}
}
}
val processor = new BlockEventsProcessor(
xsnService,
new TransactionService(xsnService)(Executors.globalEC),
new DatabaseFutureSeeder(dataSeeder)(Executors.databaseEC),
new BlockFutureDataHandler(dataHandler)(Executors.databaseEC))
List(block1, block2)
.map(_.hash)
.map(processor.newLatestBlock)
.foreach { whenReady(_) { _.isGood mustEqual true } }
/**
* When processing the latest block, a rechain event has occurred in the rpc server which leads to
* a case that we can't retrieve a specific transaction, the block should be ignored.
*/
whenReady(processor.newLatestBlock(block3.hash)) { result =>
result mustEqual Good(MissingBlockIgnored)
val blocks = List(block1, block2)
verifyBlockchain(blocks)
}
}
"remove orphan block on rare rechain events when the block height already exists" in {
// see https://github.com/X9Developers/block-explorer/issues/6
val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7")
val block2 = BlockLoader.get("000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8")
val block3 = BlockLoader.get("00000766115b26ecbc09cd3a3db6870fdaf2f049d65a910eb2f2b48b566ca7bd")
.copy(height = block2.height)
.copy(previousBlockhash = block2.previousBlockhash)
val xsnService = new FileBasedXSNService {
override def getBlock(blockhash: Blockhash): FutureApplicationResult[Block] = {
if (blockhash == block3.hash) {
Future.successful(Good(block3))
} else {
super.getBlock(blockhash)
}
}
}
val processor = new BlockEventsProcessor(
xsnService,
new TransactionService(xsnService)(Executors.globalEC),
new DatabaseFutureSeeder(dataSeeder)(Executors.databaseEC),
new BlockFutureDataHandler(dataHandler)(Executors.databaseEC))
List(block1, block2)
.map(_.hash)
.map(processor.newLatestBlock)
.foreach { whenReady(_) { _.isGood mustEqual true } }
whenReady(processor.newLatestBlock(block3.hash)) { result =>
result mustEqual Good(ReplacedByBlockHeight)
val blocks = List(block1, block3)
verifyBlockchain(blocks)
// ensure block2 has been removed
dataHandler.getBy(block2.hash) mustEqual Bad(BlockNotFoundError).accumulating
}
}
}
private def verifyBlockchain(blocks: List[Block]) = {

Loading…
Cancel
Save