Browse Source

server: Refactor DatabaseSeeder and BlockEventsProcessor

scalafmt-draft
Alexis Hernandez 7 years ago
parent
commit
0317a4d933
  1. 16
      server/app/com/xsn/explorer/data/DatabaseSeeder.scala
  2. 32
      server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala
  3. 12
      server/app/com/xsn/explorer/data/async/DatabaseFutureSeeder.scala
  4. 32
      server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala
  5. 2
      server/app/com/xsn/explorer/tasks/BackwardsSynchronizerTask.scala
  6. 2
      server/app/com/xsn/explorer/tasks/FirstBlockSynchronizerTask.scala
  7. 2
      server/app/com/xsn/explorer/tasks/SQSSeederTask.scala
  8. 30
      server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala

16
server/app/com/xsn/explorer/data/DatabaseSeeder.scala

@ -11,27 +11,15 @@ trait DatabaseSeeder[F[_]] {
import DatabaseSeeder._ import DatabaseSeeder._
/** /**
* There are no blocks, we are adding the first one which could possibly * The database has some blocks, we are adding a new a block.
* not be the genesis block but the first one on our database.
*/ */
def firstBlock(command: CreateBlockCommand): F[Unit] def newBlock(command: CreateBlockCommand): F[Unit]
/**
* The database has some blocks, we are appending a block to our latest block.
*/
def newLatestBlock(command: CreateBlockCommand): F[Unit]
/** /**
* The database has some blocks but there is a rechain happening, we need to * The database has some blocks but there is a rechain happening, we need to
* replace our current latest block with the new latest block. * replace our current latest block with the new latest block.
*/ */
def replaceBlock(command: ReplaceBlockCommand): F[Unit] def replaceBlock(command: ReplaceBlockCommand): F[Unit]
/**
* The database has some blocks but the chain is not complete, we are inserting
* a previous block that's missing in our chain.
*/
def insertPendingBlock(command: CreateBlockCommand): F[Unit]
} }
object DatabaseSeeder { object DatabaseSeeder {

32
server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala

@ -24,32 +24,24 @@ class DatabasePostgresSeeder @Inject() (
private val logger = LoggerFactory.getLogger(this.getClass) private val logger = LoggerFactory.getLogger(this.getClass)
override def firstBlock(command: CreateBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn => override def newBlock(command: CreateBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn =>
val result = upsertBlockCascade(command) // link previous block (if possible)
command.block.previousBlockhash.foreach { previousBlockhash =>
result blockPostgresDAO
.map(_ => Good(()))
.getOrElse(throw new RuntimeException("Unable to add the first block"))
}
override def newLatestBlock(command: CreateBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn =>
val result = for {
// link previous block
previousBlockhash <- command.block.previousBlockhash
_ <- blockPostgresDAO
.setNextBlockhash(previousBlockhash, command.block.hash) .setNextBlockhash(previousBlockhash, command.block.hash)
.orElse { .orElse {
logger.warn(s"Failed to link previous block = ${previousBlockhash.string} to ${command.block.hash.string} because it wasn't found") logger.warn(s"Failed to link previous block = ${previousBlockhash.string} to ${command.block.hash.string} because it wasn't found")
None None
} }
}
val result = for {
_ <- upsertBlockCascade(command) _ <- upsertBlockCascade(command)
} yield () } yield ()
result result
.map(Good(_)) .map(Good(_))
.getOrElse(throw new RuntimeException("Unable to add the new latest block")) .getOrElse(throw new RuntimeException("Unable to add the new block"))
} }
override def replaceBlock(command: ReplaceBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn => override def replaceBlock(command: ReplaceBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn =>
@ -65,16 +57,6 @@ class DatabasePostgresSeeder @Inject() (
.getOrElse(throw new RuntimeException("Unable to replace latest block")) .getOrElse(throw new RuntimeException("Unable to replace latest block"))
} }
override def insertPendingBlock(command: CreateBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn =>
val result = for {
_ <- upsertBlockCascade(command)
} yield ()
result
.map(Good(_))
.getOrElse(throw new RuntimeException("Unable to an old block"))
}
private def upsertBlockCascade(command: CreateBlockCommand)(implicit conn: Connection): Option[Unit] = { private def upsertBlockCascade(command: CreateBlockCommand)(implicit conn: Connection): Option[Unit] = {
for { for {
// block // block

12
server/app/com/xsn/explorer/data/async/DatabaseFutureSeeder.scala

@ -13,19 +13,11 @@ class DatabaseFutureSeeder @Inject() (
implicit ec: DatabaseExecutionContext) implicit ec: DatabaseExecutionContext)
extends DatabaseSeeder[FutureApplicationResult] { extends DatabaseSeeder[FutureApplicationResult] {
override def firstBlock(command: DatabaseSeeder.CreateBlockCommand): FutureApplicationResult[Unit] = Future { override def newBlock(command: DatabaseSeeder.CreateBlockCommand): FutureApplicationResult[Unit] = Future {
blockingSeeder.firstBlock(command) blockingSeeder.newBlock(command)
}
override def newLatestBlock(command: DatabaseSeeder.CreateBlockCommand): FutureApplicationResult[Unit] = Future {
blockingSeeder.newLatestBlock(command)
} }
override def replaceBlock(command: DatabaseSeeder.ReplaceBlockCommand): FutureApplicationResult[Unit] = Future { override def replaceBlock(command: DatabaseSeeder.ReplaceBlockCommand): FutureApplicationResult[Unit] = Future {
blockingSeeder.replaceBlock(command) blockingSeeder.replaceBlock(command)
} }
override def insertPendingBlock(command: DatabaseSeeder.CreateBlockCommand): FutureApplicationResult[Unit] = Future {
blockingSeeder.insertPendingBlock(command)
}
} }

32
server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala

@ -30,26 +30,34 @@ class BlockEventsProcessor @Inject() (
private val logger = LoggerFactory.getLogger(this.getClass) private val logger = LoggerFactory.getLogger(this.getClass)
/** /**
* There is a new latest block in the blockchain, we need to sync our database. * There is a possible new block in the blockchain, we need to sync our database.
* *
* The following scenarios are handled for the new latest block: * The following scenarios are handled for the new block:
* *
* 1. It is new on our database, we just append it (possibly first block). * 0. The block is already present, we ignore it.
* - current blocks = A -> B, new latest block = C, new blocks = A -> B -> C *
* - current blocks = empty, new latest block = A, new blocks = A * x. The block can not be fully retrieved from the rpc server, we ignore it.
*
* x. The first block in the database (not exactly the first block in the chain), we just add it.
* - Example: current blocks = empty, new block = A, new blocks = A
*
* x. A new block on our database having a link to our latest block in the database:
* - The new block is added.
* - The next block from our latest block is set to the next block.
* - Example: current blocks = A -> B, new block = C, new blocks = A -> B -> C
* *
* 2. It is the previous block from our latest block (rechain). * 2. It is the previous block from our latest block (rechain).
* - current blocks = A -> B -> C, new latest block = B, new blocks = A -> B * - current blocks = A -> B -> C, new latest block = B, new blocks = A -> B
* *
* 3. None of previous cases, it is a block that might be missing in our chain. * 3. None of previous cases, it is a block that might be missing in our chain.
* *
* @param blockhash the new latest block * @param blockhash the blockhash to process
*/ */
def newLatestBlock(blockhash: Blockhash): FutureApplicationResult[Result] = { def processBlock(blockhash: Blockhash): FutureApplicationResult[Result] = {
val result = for { val result = for {
block <- xsnService.getBlock(blockhash).toFutureOr block <- xsnService.getBlock(blockhash).toFutureOr
transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr
r <- newLatestBlock(block, transactions).toFutureOr r <- processBlock(block, transactions).toFutureOr
} yield r } yield r
result.toFuture.map { result.toFuture.map {
@ -60,7 +68,7 @@ class BlockEventsProcessor @Inject() (
} }
} }
private def newLatestBlock(newBlock: Block, newTransactions: List[Transaction]): FutureApplicationResult[Result] = { private def processBlock(newBlock: Block, newTransactions: List[Transaction]): FutureApplicationResult[Result] = {
def onRechain(orphanBlock: Block): FutureApplicationResult[Result] = { def onRechain(orphanBlock: Block): FutureApplicationResult[Result] = {
val command = DatabaseSeeder.ReplaceBlockCommand( val command = DatabaseSeeder.ReplaceBlockCommand(
orphanBlock = orphanBlock, orphanBlock = orphanBlock,
@ -78,7 +86,7 @@ class BlockEventsProcessor @Inject() (
logger.info(s"first block = ${newBlock.hash.string}") logger.info(s"first block = ${newBlock.hash.string}")
val command = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions) val command = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions)
databaseSeeder.firstBlock(command) databaseSeeder.newBlock(command)
.toFutureOr .toFutureOr
.map(_ => FirstBlockCreated(newBlock)) .map(_ => FirstBlockCreated(newBlock))
.toFuture .toFuture
@ -89,7 +97,7 @@ class BlockEventsProcessor @Inject() (
val command = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions) val command = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions)
databaseSeeder databaseSeeder
.newLatestBlock(command) .newBlock(command)
.toFutureOr .toFutureOr
.map(_ => NewBlockAppended(newBlock)) .map(_ => NewBlockAppended(newBlock))
.toFuture .toFuture
@ -121,7 +129,7 @@ class BlockEventsProcessor @Inject() (
case Bad(One(BlockNotFoundError)) => case Bad(One(BlockNotFoundError)) =>
val createCommand = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions) val createCommand = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions)
databaseSeeder databaseSeeder
.insertPendingBlock(createCommand) .newBlock(createCommand)
.flatMap { .flatMap {
case Good(_) => Future.successful { Good(MissingBlockProcessed(newBlock)) } case Good(_) => Future.successful { Good(MissingBlockProcessed(newBlock)) }
case Bad(One(PostgresForeignKeyViolationError("height", _))) => onRepeatedBlockHeight() case Bad(One(PostgresForeignKeyViolationError("height", _))) => onRepeatedBlockHeight()

2
server/app/com/xsn/explorer/tasks/BackwardsSynchronizerTask.scala

@ -117,7 +117,7 @@ class BackwardsSynchronizerTask @Inject() (
transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr
command = DatabaseSeeder.CreateBlockCommand(block, transactions) command = DatabaseSeeder.CreateBlockCommand(block, transactions)
_ <- databaseSeeder.insertPendingBlock(command).toFutureOr _ <- databaseSeeder.newBlock(command).toFutureOr
_ = logger.debug(s"Block ${block.height.int} saved") _ = logger.debug(s"Block ${block.height.int} saved")
_ <- block _ <- block

2
server/app/com/xsn/explorer/tasks/FirstBlockSynchronizerTask.scala

@ -103,7 +103,7 @@ class FirstBlockSynchronizerTask @Inject() (
transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr
command = DatabaseSeeder.CreateBlockCommand(block, transactions) command = DatabaseSeeder.CreateBlockCommand(block, transactions)
_ <- databaseSeeder.insertPendingBlock(command).toFutureOr _ <- databaseSeeder.newBlock(command).toFutureOr
_ = logger.debug(s"Block ${block.height.int} saved") _ = logger.debug(s"Block ${block.height.int} saved")
_ <- block _ <- block

2
server/app/com/xsn/explorer/tasks/SQSSeederTask.scala

@ -58,7 +58,7 @@ class SQSSeederTask @Inject() (
private def handleMessage(message: Message): Unit = { private def handleMessage(message: Message): Unit = {
def onBlockhash(blockhash: Blockhash) = { def onBlockhash(blockhash: Blockhash) = {
val result = blockEventsProcessor.newLatestBlock(blockhash) val result = blockEventsProcessor.processBlock(blockhash)
result.recover { result.recover {
case NonFatal(ex) => case NonFatal(ex) =>

30
server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala

@ -45,7 +45,7 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures
// see https://github.com/X9Developers/XSN/issues/32 // see https://github.com/X9Developers/XSN/issues/32
val block0 = BlockLoader.get("00000c822abdbb23e28f79a49d29b41429737c6c7e15df40d1b1f1b35907ae34") val block0 = BlockLoader.get("00000c822abdbb23e28f79a49d29b41429737c6c7e15df40d1b1f1b35907ae34")
whenReady(processor.newLatestBlock(block0.hash)) { result => whenReady(processor.processBlock(block0.hash)) { result =>
result mustEqual Good(MissingBlockIgnored) result mustEqual Good(MissingBlockIgnored)
} }
} }
@ -53,7 +53,7 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures
"process first block" in { "process first block" in {
val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7") val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7")
whenReady(processor.newLatestBlock(block1.hash)) { result => whenReady(processor.processBlock(block1.hash)) { result =>
result.isGood mustEqual true result.isGood mustEqual true
} }
} }
@ -65,7 +65,7 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures
List(block1, block2).map(dataHandler.insert).foreach(_.isGood mustEqual true) List(block1, block2).map(dataHandler.insert).foreach(_.isGood mustEqual true)
whenReady(processor.newLatestBlock(block3.hash)) { result => whenReady(processor.processBlock(block3.hash)) { result =>
result mustEqual Good(NewBlockAppended(block3)) result mustEqual Good(NewBlockAppended(block3))
val blocks = List(block1, block2, block3) val blocks = List(block1, block2, block3)
verifyBlockchain(blocks) verifyBlockchain(blocks)
@ -79,7 +79,7 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures
List(block1, block2, block3).map(dataHandler.insert).foreach(_.isGood mustEqual true) List(block1, block2, block3).map(dataHandler.insert).foreach(_.isGood mustEqual true)
whenReady(processor.newLatestBlock(block2.hash)) { whenReady(processor.processBlock(block2.hash)) {
case Good(RechainDone(orphanBlock, newBlock)) => case Good(RechainDone(orphanBlock, newBlock)) =>
orphanBlock.hash mustEqual block3.hash orphanBlock.hash mustEqual block3.hash
newBlock.hash mustEqual block2.hash newBlock.hash mustEqual block2.hash
@ -98,7 +98,7 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures
List(block2, block3).map(dataHandler.insert).foreach(_.isGood mustEqual true) List(block2, block3).map(dataHandler.insert).foreach(_.isGood mustEqual true)
whenReady(processor.newLatestBlock(block1.hash)) { result => whenReady(processor.processBlock(block1.hash)) { result =>
result.isGood mustEqual true result.isGood mustEqual true
val blocks = List(block1, block2, block3) val blocks = List(block1, block2, block3)
verifyBlockchain(blocks) verifyBlockchain(blocks)
@ -112,7 +112,7 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures
List(block1, block2, block3).map(dataHandler.insert).foreach(_.isGood mustEqual true) List(block1, block2, block3).map(dataHandler.insert).foreach(_.isGood mustEqual true)
whenReady(processor.newLatestBlock(block1.hash)) { result => whenReady(processor.processBlock(block1.hash)) { result =>
result.isGood mustEqual true result.isGood mustEqual true
val blocks = List(block1, block2, block3) val blocks = List(block1, block2, block3)
verifyBlockchain(blocks) verifyBlockchain(blocks)
@ -127,7 +127,7 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures
"process a block without spent index on transactions" in { "process a block without spent index on transactions" in {
val block = BlockLoader.get("000001ff95f22b8d82db14a5c5e9f725e8239e548be43c668766e7ddaee81924") val block = BlockLoader.get("000001ff95f22b8d82db14a5c5e9f725e8239e548be43c668766e7ddaee81924")
whenReady(processor.newLatestBlock(block.hash)) { result => whenReady(processor.processBlock(block.hash)) { result =>
result.isGood mustEqual true result.isGood mustEqual true
val balanceDataHandler = new BalancePostgresDataHandler(database, new BalancePostgresDAO(new FieldOrderingSQLInterpreter)) val balanceDataHandler = new BalancePostgresDataHandler(database, new BalancePostgresDAO(new FieldOrderingSQLInterpreter))
@ -149,10 +149,10 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures
List(block1, block2) List(block1, block2)
.map(_.hash) .map(_.hash)
.map(processor.newLatestBlock) .map(processor.processBlock)
.foreach { whenReady(_) { _.isGood mustEqual true } } .foreach { whenReady(_) { _.isGood mustEqual true } }
whenReady(processor.newLatestBlock(block1.hash)) { result => whenReady(processor.processBlock(block1.hash)) { result =>
result.isGood mustEqual true result.isGood mustEqual true
val blocks = List(block1) val blocks = List(block1)
verifyBlockchain(blocks) verifyBlockchain(blocks)
@ -199,14 +199,14 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures
List(block1, block2) List(block1, block2)
.map(_.hash) .map(_.hash)
.map(processor.newLatestBlock) .map(processor.processBlock)
.foreach { whenReady(_) { _.isGood mustEqual true } } .foreach { whenReady(_) { _.isGood mustEqual true } }
/** /**
* When processing the latest block, a rechain event has occurred in the rpc server which leads to * When processing the latest block, a rechain event has occurred in the rpc server which leads to
* a case that we can't retrieve the block information, the block should be ignored. * a case that we can't retrieve the block information, the block should be ignored.
*/ */
whenReady(processor.newLatestBlock(block3.hash)) { result => whenReady(processor.processBlock(block3.hash)) { result =>
result mustEqual Good(MissingBlockIgnored) result mustEqual Good(MissingBlockIgnored)
val blocks = List(block1, block2) val blocks = List(block1, block2)
verifyBlockchain(blocks) verifyBlockchain(blocks)
@ -238,14 +238,14 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures
List(block1, block2) List(block1, block2)
.map(_.hash) .map(_.hash)
.map(processor.newLatestBlock) .map(processor.processBlock)
.foreach { whenReady(_) { _.isGood mustEqual true } } .foreach { whenReady(_) { _.isGood mustEqual true } }
/** /**
* When processing the latest block, a rechain event has occurred in the rpc server which leads to * When processing the latest block, a rechain event has occurred in the rpc server which leads to
* a case that we can't retrieve a specific transaction, the block should be ignored. * a case that we can't retrieve a specific transaction, the block should be ignored.
*/ */
whenReady(processor.newLatestBlock(block3.hash)) { result => whenReady(processor.processBlock(block3.hash)) { result =>
result mustEqual Good(MissingBlockIgnored) result mustEqual Good(MissingBlockIgnored)
val blocks = List(block1, block2) val blocks = List(block1, block2)
verifyBlockchain(blocks) verifyBlockchain(blocks)
@ -278,10 +278,10 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures
List(block1, block2) List(block1, block2)
.map(_.hash) .map(_.hash)
.map(processor.newLatestBlock) .map(processor.processBlock)
.foreach { whenReady(_) { _.isGood mustEqual true } } .foreach { whenReady(_) { _.isGood mustEqual true } }
whenReady(processor.newLatestBlock(block3.hash)) { result => whenReady(processor.processBlock(block3.hash)) { result =>
result mustEqual Good(ReplacedByBlockHeight) result mustEqual Good(ReplacedByBlockHeight)
val blocks = List(block1, block3) val blocks = List(block1, block3)
verifyBlockchain(blocks) verifyBlockchain(blocks)

Loading…
Cancel
Save