Browse Source

server: Update BlockEventsProcessor to return detailed results

scalafmt-draft
Alexis Hernandez 7 years ago
parent
commit
78dbc3d160
  1. 51
      server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala
  2. 20
      server/app/com/xsn/explorer/tasks/SQSSeederTask.scala

51
server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala

@ -26,6 +26,8 @@ class BlockEventsProcessor @Inject() (
databaseSeeder: DatabaseFutureSeeder,
blockDataHandler: BlockFutureDataHandler) {
import BlockEventsProcessor._
private val logger = LoggerFactory.getLogger(this.getClass)
/**
@ -44,12 +46,12 @@ class BlockEventsProcessor @Inject() (
*
* @param blockhash the new latest block
*/
def newLatestBlock(blockhash: Blockhash): FutureApplicationResult[Block] = {
def newLatestBlock(blockhash: Blockhash): FutureApplicationResult[Result] = {
val result = for {
block <- xsnService.getBlock(blockhash).toFutureOr
transactions <- block.transactions.map(getTransaction).toFutureOr
r <- newLatestBlock(block, transactions).toFutureOr
} yield block
} yield r
result.toFuture
}
@ -74,8 +76,8 @@ class BlockEventsProcessor @Inject() (
result.toFuture
}
private def newLatestBlock(newBlock: Block, newTransactions: List[Transaction]): FutureApplicationResult[Unit] = {
def onRechain(orphanBlock: Block): FutureApplicationResult[Unit] = {
private def newLatestBlock(newBlock: Block, newTransactions: List[Transaction]): FutureApplicationResult[Result] = {
def onRechain(orphanBlock: Block): FutureApplicationResult[Result] = {
val result = for {
orphanTransactions <- orphanBlock.transactions.map(getTransaction).toFutureOr
@ -85,36 +87,47 @@ class BlockEventsProcessor @Inject() (
newBlock = newBlock,
newTransactions = newTransactions)
_ <- databaseSeeder.replaceLatestBlock(command).toFutureOr
} yield ()
} yield RechainDone(orphanBlock, newBlock)
result.toFuture
}
def onFirstBlock: FutureApplicationResult[Unit] = {
def onFirstBlock: FutureApplicationResult[Result] = {
logger.info(s"first block = ${newBlock.hash.string}")
val command = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions)
databaseSeeder.firstBlock(command)
.toFutureOr
.map(_ => FirstBlockCreated(newBlock))
.toFuture
}
def onNewBlock(latestBlock: Block): FutureApplicationResult[Unit] = {
def onNewBlock(latestBlock: Block): FutureApplicationResult[Result] = {
logger.info(s"existing latest block = ${latestBlock.hash.string} -> new latest block = ${newBlock.hash.string}")
val command = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions)
databaseSeeder.newLatestBlock(command)
databaseSeeder
.newLatestBlock(command)
.toFutureOr
.map(_ => NewBlockAppended(newBlock))
.toFuture
}
def onMissingBlock(): FutureApplicationResult[Unit] = {
def onMissingBlock(): FutureApplicationResult[Result] = {
blockDataHandler
.getBy(newBlock.hash)
.flatMap {
case Good(_) =>
logger.info(s"The block ${newBlock.hash.string} is not missing but duplicated, ignoring")
Future.successful { Good(()) }
Future.successful { Good(ExistingBlockIgnored(newBlock)) }
case Bad(One(BlockNotFoundError)) =>
val command = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions)
databaseSeeder.insertPendingBlock(command)
databaseSeeder
.insertPendingBlock(command)
.toFutureOr
.map(_ => MissingBlockProcessed(newBlock))
.toFuture
case Bad(errors) =>
Future.successful(Bad(errors))
@ -131,12 +144,12 @@ class BlockEventsProcessor @Inject() (
}
.toFutureOr
_ <- latestBlockMaybe
r <- latestBlockMaybe
.map { latestBlock =>
if (newBlock.hash == latestBlock.hash) {
// duplicated msg
logger.info(s"ignoring duplicated latest block = ${newBlock.hash.string}")
Future.successful(Good(()))
Future.successful(Good(ExistingBlockIgnored(newBlock)))
} else if (newBlock.previousBlockhash.contains(latestBlock.hash)) {
// latest block -> new block
onNewBlock(latestBlock)
@ -151,8 +164,18 @@ class BlockEventsProcessor @Inject() (
}
.getOrElse(onFirstBlock)
.toFutureOr
} yield ()
} yield r
result.toFuture
}
}
object BlockEventsProcessor {
sealed trait Result
case class FirstBlockCreated(block: Block) extends Result
case class MissingBlockProcessed(block: Block) extends Result
case class ExistingBlockIgnored(block: Block) extends Result
case class NewBlockAppended(block: Block) extends Result
case class RechainDone(orphanBlock: Block, newBlock: Block) extends Result
}

20
server/app/com/xsn/explorer/tasks/SQSSeederTask.scala

@ -66,7 +66,11 @@ class SQSSeederTask @Inject() (
case Good(_) =>
logger.info(s"Block processed successfully = ${blockhash.string}")
sqs.deleteMessageAsync(config.queueUrl, message.getReceiptHandle)
blockSynchronizerTask.sync()
}
result.foreach {
case Good(eventResult) => onBlockResult(eventResult)
case _ => ()
}
}
@ -80,4 +84,18 @@ class SQSSeederTask @Inject() (
}
.foreach(onBlockhash)
}
private def onBlockResult(eventResult: BlockEventsProcessor.Result) = eventResult match {
case BlockEventsProcessor.FirstBlockCreated(block) =>
blockSynchronizerTask.sync()
case BlockEventsProcessor.NewBlockAppended(block) =>
blockSynchronizerTask.sync()
case BlockEventsProcessor.RechainDone(orphanBlock, newBlock) =>
blockSynchronizerTask.sync()
case BlockEventsProcessor.MissingBlockProcessed(block) => ()
case BlockEventsProcessor.ExistingBlockIgnored(block) => ()
}
}

Loading…
Cancel
Save