Browse Source

server: Update the BlockEventsProcessor to handle possible missing blocks

scalafmt-draft
Alexis Hernandez 7 years ago
parent
commit
8665da9119
  1. 47
      server/app/com/xsn/explorer/data/DatabaseSeeder.scala
  2. 32
      server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala
  3. 31
      server/app/com/xsn/explorer/data/async/DatabaseFutureSeeder.scala
  4. 5
      server/app/com/xsn/explorer/modules/DataHandlerModule.scala
  5. 104
      server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala
  6. 10
      server/app/com/xsn/explorer/tasks/BlockSynchronizerTask.scala
  7. 21
      server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala

47
server/app/com/xsn/explorer/data/DatabaseSeeder.scala

@ -0,0 +1,47 @@
package com.xsn.explorer.data
import com.alexitc.playsonify.core.ApplicationResult
import com.xsn.explorer.models.Transaction
import com.xsn.explorer.models.rpc.Block
import scala.language.higherKinds
trait DatabaseSeeder[F[_]] {
import DatabaseSeeder._
/**
* There are no blocks, we are adding the first one which could possibly
* not be the genesis block but the first one on our database.
*/
def firstBlock(command: CreateBlockCommand): F[Unit]
/**
* The database has some blocks, we are appending a block to our latest block.
*/
def newLatestBlock(command: CreateBlockCommand): F[Unit]
/**
* The database has some blocks but there is a rechain happening, we need to
* replace our current latest block with the new latest block.
*/
def replaceLatestBlock(command: ReplaceBlockCommand): F[Unit]
/**
* The database has some blocks but the chain is not complete, we are inserting
* a previous block that's missing in our chain.
*/
def insertPendingBlock(command: CreateBlockCommand): F[Unit]
}
object DatabaseSeeder {
case class CreateBlockCommand(block: Block, transactions: List[Transaction])
case class DeleteBlockCommand(block: Block, transactions: List[Transaction])
case class ReplaceBlockCommand(
orphanBlock: Block, orphanTransactions: List[Transaction],
newBlock: Block, newTransactions: List[Transaction])
}
trait DatabaseBlockingSeeder extends DatabaseSeeder[ApplicationResult]

32
server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala

@ -4,8 +4,9 @@ import java.sql.Connection
import javax.inject.Inject
import com.alexitc.playsonify.core.ApplicationResult
import com.xsn.explorer.data.DatabaseBlockingSeeder
import com.xsn.explorer.data.DatabaseSeeder._
import com.xsn.explorer.data.anorm.dao.{BalancePostgresDAO, BlockPostgresDAO, TransactionPostgresDAO}
import com.xsn.explorer.models.rpc.Block
import com.xsn.explorer.models.{Address, Balance, Transaction}
import com.xsn.explorer.util.Extensions.ListOptionExt
import org.scalactic.Good
@ -16,11 +17,10 @@ class DatabasePostgresSeeder @Inject() (
blockPostgresDAO: BlockPostgresDAO,
transactionPostgresDAO: TransactionPostgresDAO,
addressPostgresDAO: BalancePostgresDAO)
extends AnormPostgresDataHandler {
extends DatabaseBlockingSeeder
with AnormPostgresDataHandler {
import DatabasePostgresSeeder._
def firstBlock(command: CreateBlockCommand): ApplicationResult[Unit] = database.withTransaction { implicit conn =>
override def firstBlock(command: CreateBlockCommand): ApplicationResult[Unit] = database.withTransaction { implicit conn =>
val result = upsertBlockCascade(command)
result
@ -28,13 +28,8 @@ class DatabasePostgresSeeder @Inject() (
.getOrElse(throw new RuntimeException("Unable to add the first block"))
}
/**
* Creates the new latest block assuming there is a previous block.
*
* @param command
* @return
*/
def newLatestBlock(command: CreateBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn =>
override def newLatestBlock(command: CreateBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn =>
val result = for {
// link previous block
previousBlockhash <- command.block.previousBlockhash
@ -49,7 +44,7 @@ class DatabasePostgresSeeder @Inject() (
.getOrElse(throw new RuntimeException("Unable to add the new latest block"))
}
def replaceLatestBlock(command: ReplaceBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn =>
override def replaceLatestBlock(command: ReplaceBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn =>
val deleteCommand = DeleteBlockCommand(command.orphanBlock, command.orphanTransactions)
val createCommand = CreateBlockCommand(command.newBlock, command.newTransactions)
@ -63,7 +58,7 @@ class DatabasePostgresSeeder @Inject() (
.getOrElse(throw new RuntimeException("Unable to replace latest block"))
}
def insertOldBlock(command: CreateBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn =>
override def insertPendingBlock(command: CreateBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn =>
val result = for {
_ <- upsertBlockCascade(command)
} yield ()
@ -154,12 +149,3 @@ class DatabasePostgresSeeder @Inject() (
.mapValues { list => list.map(_._2).sum }
}
}
object DatabasePostgresSeeder {
case class CreateBlockCommand(block: Block, transactions: List[Transaction])
case class DeleteBlockCommand(block: Block, transactions: List[Transaction])
case class ReplaceBlockCommand(
orphanBlock: Block, orphanTransactions: List[Transaction],
newBlock: Block, newTransactions: List[Transaction])
}

31
server/app/com/xsn/explorer/data/async/DatabaseFutureSeeder.scala

@ -0,0 +1,31 @@
package com.xsn.explorer.data.async
import javax.inject.Inject
import com.alexitc.playsonify.core.FutureApplicationResult
import com.xsn.explorer.data.{DatabaseBlockingSeeder, DatabaseSeeder}
import com.xsn.explorer.executors.DatabaseExecutionContext
import scala.concurrent.Future
class DatabaseFutureSeeder @Inject() (
blockingSeeder: DatabaseBlockingSeeder)(
implicit ec: DatabaseExecutionContext)
extends DatabaseSeeder[FutureApplicationResult] {
override def firstBlock(command: DatabaseSeeder.CreateBlockCommand): FutureApplicationResult[Unit] = Future {
blockingSeeder.firstBlock(command)
}
override def newLatestBlock(command: DatabaseSeeder.CreateBlockCommand): FutureApplicationResult[Unit] = Future {
blockingSeeder.newLatestBlock(command)
}
override def replaceLatestBlock(command: DatabaseSeeder.ReplaceBlockCommand): FutureApplicationResult[Unit] = Future {
blockingSeeder.replaceLatestBlock(command)
}
override def insertPendingBlock(command: DatabaseSeeder.CreateBlockCommand): FutureApplicationResult[Unit] = Future {
blockingSeeder.insertPendingBlock(command)
}
}

5
server/app/com/xsn/explorer/modules/DataHandlerModule.scala

@ -1,8 +1,8 @@
package com.xsn.explorer.modules
import com.google.inject.AbstractModule
import com.xsn.explorer.data.anorm.{BalancePostgresDataHandler, BlockPostgresDataHandler, StatisticsPostgresDataHandler}
import com.xsn.explorer.data.{BalanceBlockingDataHandler, BlockBlockingDataHandler, StatisticsBlockingDataHandler}
import com.xsn.explorer.data.anorm.{BalancePostgresDataHandler, BlockPostgresDataHandler, DatabasePostgresSeeder, StatisticsPostgresDataHandler}
import com.xsn.explorer.data.{BalanceBlockingDataHandler, BlockBlockingDataHandler, DatabaseBlockingSeeder, StatisticsBlockingDataHandler}
class DataHandlerModule extends AbstractModule {
@ -10,5 +10,6 @@ class DataHandlerModule extends AbstractModule {
bind(classOf[BlockBlockingDataHandler]).to(classOf[BlockPostgresDataHandler])
bind(classOf[BalanceBlockingDataHandler]).to(classOf[BalancePostgresDataHandler])
bind(classOf[StatisticsBlockingDataHandler]).to(classOf[StatisticsPostgresDataHandler])
bind(classOf[DatabaseBlockingSeeder]).to(classOf[DatabasePostgresSeeder])
}
}

104
server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala

@ -2,15 +2,16 @@ package com.xsn.explorer.processors
import javax.inject.Inject
import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps
import com.alexitc.playsonify.core.{ApplicationResult, FutureApplicationResult}
import com.xsn.explorer.data.BlockBlockingDataHandler
import com.xsn.explorer.data.anorm.DatabasePostgresSeeder
import com.xsn.explorer.models.{Blockhash, Transaction}
import com.xsn.explorer.data.DatabaseSeeder
import com.xsn.explorer.data.async.{BlockFutureDataHandler, DatabaseFutureSeeder}
import com.xsn.explorer.errors.BlockNotFoundError
import com.xsn.explorer.models.rpc.Block
import com.xsn.explorer.models.{Blockhash, Transaction}
import com.xsn.explorer.services.XSNService
import com.xsn.explorer.util.Extensions.FutureApplicationResultExt
import org.scalactic.Good
import org.scalactic.{Bad, Good, One}
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext.Implicits.global
@ -21,8 +22,8 @@ import scala.concurrent.Future
*/
class BlockEventsProcessor @Inject() (
xsnService: XSNService,
databasePostgresSeeder: DatabasePostgresSeeder,
blockBlockingDataHandler: BlockBlockingDataHandler) {
databaseSeeder: DatabaseFutureSeeder,
blockDataHandler: BlockFutureDataHandler) {
private val logger = LoggerFactory.getLogger(this.getClass)
@ -31,13 +32,15 @@ class BlockEventsProcessor @Inject() (
*
* The following scenarios are handled for the new latest block:
*
* 1. It is new on our database, we just append it.
* 1. It is new on our database, we just append it (possibly first block).
* - current blocks = A -> B, new latest block = C, new blocks = A -> B -> C
* - current blocks = empty, new latest block = A, new blocks = A
*
* 2. It is an existing block, hence, the previous from the latest one in our database .
* 2. It is the previous block from our latest block (rechain).
* - current blocks = A -> B -> C, new latest block = B, new blocks = A -> B
*
* 3. None of previous cases, it is a block that might be missing in our chain.
*
* @param blockhash the new latest block
*/
def newLatestBlock(blockhash: Blockhash): FutureApplicationResult[Block] = {
@ -56,64 +59,69 @@ class BlockEventsProcessor @Inject() (
val result = for {
orphanRPCTransactions <- orphanBlock.transactions.map(xsnService.getTransaction).toFutureOr
orphanTransactions = orphanRPCTransactions.map(Transaction.fromRPC)
} yield {
val command = DatabasePostgresSeeder.ReplaceBlockCommand(
command = DatabaseSeeder.ReplaceBlockCommand(
orphanBlock = orphanBlock,
orphanTransactions = orphanTransactions,
newBlock = newBlock,
newTransactions = newTransactions)
_ <- databaseSeeder.replaceLatestBlock(command).toFutureOr
} yield ()
scala.concurrent.blocking {
databasePostgresSeeder.replaceLatestBlock(command)
}
}
result
.mapWithError(identity)
.toFuture
result.toFuture
}
def onFirstBlock: FutureApplicationResult[Unit] = {
logger.info(s"first block = ${newBlock.hash.string}")
val command = DatabasePostgresSeeder.CreateBlockCommand(newBlock, newTransactions)
def unsafe: ApplicationResult[Unit] = scala.concurrent.blocking {
databasePostgresSeeder.firstBlock(command)
}
Future(unsafe)
val command = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions)
databaseSeeder.firstBlock(command)
}
def onNewBlock(latestBlock: Block): FutureApplicationResult[Unit] = {
logger.info(s"existing latest block = ${latestBlock.hash.string} -> new latest block = ${newBlock.hash.string}")
val command = DatabasePostgresSeeder.CreateBlockCommand(newBlock, newTransactions)
def unsafe = scala.concurrent.blocking {
databasePostgresSeeder.newLatestBlock(command)
}
Future(unsafe)
val command = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions)
databaseSeeder.newLatestBlock(command)
}
val latestBlockResult = scala.concurrent.blocking {
blockBlockingDataHandler.getLatestBlock()
def onMissingBlock(): FutureApplicationResult[Unit] = {
val command = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions)
databaseSeeder.insertPendingBlock(command)
}
latestBlockResult
.map { latestBlock =>
if (newBlock.hash == latestBlock.hash) {
// duplicated msg
logger.info(s"ignoring duplicated latest block = ${newBlock.hash.string}")
Future.successful(Good(()))
} else if (newBlock.previousBlockhash.contains(latestBlock.hash)) {
// latest block -> new block
onNewBlock(latestBlock)
} else {
logger.info(s"rechain, orphan block = ${latestBlock.hash.string}, new latest block = ${newBlock.hash.string}")
onRechain(latestBlock)
val result = for {
latestBlockMaybe <- blockDataHandler
.getLatestBlock()
.map {
case Good(block) => Good(Some(block))
case Bad(One(BlockNotFoundError)) => Good(None)
case Bad(errors) => Bad(errors)
}
}
.getOrElse(onFirstBlock)
.toFutureOr
_ <- latestBlockMaybe
.map { latestBlock =>
if (newBlock.hash == latestBlock.hash) {
// duplicated msg
logger.info(s"ignoring duplicated latest block = ${newBlock.hash.string}")
Future.successful(Good(()))
} else if (newBlock.previousBlockhash.contains(latestBlock.hash)) {
// latest block -> new block
onNewBlock(latestBlock)
} else if (latestBlock.previousBlockhash.contains(newBlock.hash)) {
logger.info(s"rechain, orphan block = ${latestBlock.hash.string}, new latest block = ${newBlock.hash.string}")
onRechain(latestBlock)
} else {
logger.info(s"Adding possible missing block = ${newBlock.hash.string}")
onMissingBlock()
}
}
.getOrElse(onFirstBlock)
.toFutureOr
} yield ()
result.toFuture
}
}

10
server/app/com/xsn/explorer/tasks/BlockSynchronizerTask.scala

@ -4,8 +4,8 @@ import javax.inject.{Inject, Singleton}
import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureOps, OrOps}
import com.xsn.explorer.data.anorm.DatabasePostgresSeeder
import com.xsn.explorer.data.async.BlockFutureDataHandler
import com.xsn.explorer.data.DatabaseSeeder
import com.xsn.explorer.data.async.{BlockFutureDataHandler, DatabaseFutureSeeder}
import com.xsn.explorer.errors.BlockNotFoundError
import com.xsn.explorer.models.{Blockhash, Height, Transaction}
import com.xsn.explorer.services.XSNService
@ -26,7 +26,7 @@ import scala.util.control.NonFatal
class BlockSynchronizerTask @Inject() (
xsnService: XSNService,
blockDataHandler: BlockFutureDataHandler,
databasePostgresSeeder: DatabasePostgresSeeder) {
databaseSeeder: DatabaseFutureSeeder) {
private val logger = LoggerFactory.getLogger(this.getClass)
private val lock = new Object
@ -103,8 +103,8 @@ class BlockSynchronizerTask @Inject() (
rpcTransactions <- block.transactions.map(xsnService.getTransaction).toFutureOr
transactions = rpcTransactions.map(Transaction.fromRPC)
command = DatabasePostgresSeeder.CreateBlockCommand(block, transactions)
_ <- scala.concurrent.blocking { databasePostgresSeeder.insertOldBlock(command) }.toFutureOr
command = DatabaseSeeder.CreateBlockCommand(block, transactions)
_ <- databaseSeeder.insertPendingBlock(command).toFutureOr
_ = logger.debug(s"Block ${block.height.int} saved")
_ <- block

21
server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala

@ -2,7 +2,9 @@ package com.xsn.explorer.processors
import com.xsn.explorer.data.anorm.dao.{BalancePostgresDAO, BlockPostgresDAO, TransactionPostgresDAO}
import com.xsn.explorer.data.anorm.{BlockPostgresDataHandler, DatabasePostgresSeeder}
import com.xsn.explorer.data.async.{BlockFutureDataHandler, DatabaseFutureSeeder}
import com.xsn.explorer.data.common.PostgresDataHandlerSpec
import com.xsn.explorer.helpers.Executors._
import com.xsn.explorer.helpers.{BlockLoader, FileBasedXSNService}
import com.xsn.explorer.models.rpc.Block
import org.scalatest.BeforeAndAfter
@ -18,7 +20,10 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures
new BalancePostgresDAO)
lazy val xsnService = new FileBasedXSNService
lazy val processor = new BlockEventsProcessor(xsnService, dataSeeder, dataHandler)
lazy val processor = new BlockEventsProcessor(
xsnService,
new DatabaseFutureSeeder(dataSeeder),
new BlockFutureDataHandler(dataHandler))
before {
clearDatabase()
@ -69,6 +74,20 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures
verifyBlockchain(blocks)
}
}
"process a missing block" in {
val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7")
val block2 = BlockLoader.get("000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8")
val block3 = BlockLoader.get("00000766115b26ecbc09cd3a3db6870fdaf2f049d65a910eb2f2b48b566ca7bd")
List(block2, block3).map(dataHandler.upsert).foreach(_.isGood mustEqual true)
whenReady(processor.newLatestBlock(block1.hash)) { result =>
result.isGood mustEqual true
val blocks = List(block1, block2, block3)
verifyBlockchain(blocks)
}
}
}
private def verifyBlockchain(blocks: List[Block]) = {

Loading…
Cancel
Save