Browse Source

server: Add BlockEventsProcessor

scalafmt-draft
Alexis Hernandez 7 years ago
parent
commit
02b03b22c8
  1. 62
      server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala
  2. 76
      server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala
  3. 11
      server/test/com/xsn/explorer/helpers/BlockLoader.scala
  4. 25
      server/test/com/xsn/explorer/helpers/FileBasedXSNService.scala
  5. 103
      server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala

62
server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala

@ -0,0 +1,62 @@
package com.xsn.explorer.data.anorm
import javax.inject.Inject
import com.alexitc.playsonify.core.ApplicationResult
import com.xsn.explorer.data.anorm.dao.BlockPostgresDAO
import com.xsn.explorer.models.Blockhash
import com.xsn.explorer.models.rpc.Block
import org.scalactic.Good
import play.api.db.Database
/**
 * Seeds the database with blocks coming from the RPC server.
 *
 * Failures are signaled by throwing a RuntimeException, which is expected to
 * abort (and presumably roll back) the enclosing transaction where one is used
 * — TODO confirm rollback semantics of withTransaction.
 */
class DatabasePostgresSeeder @Inject() (
    override val database: Database,
    blockPostgresDAO: BlockPostgresDAO)
    extends AnormPostgresDataHandler {

  /**
   * Stores the very first block; no previous block gets linked.
   *
   * @param block the block to store
   * @return Good(()) when the block was stored
   */
  def firstBlock(block: Block): ApplicationResult[Unit] = database.withConnection { implicit conn =>
    blockPostgresDAO
      .upsert(block)
      .map(_ => Good(()))
      .getOrElse(throw new RuntimeException("Unable to add the first block"))
  }

  /**
   * Creates the new latest block assuming there is a previous block.
   *
   * The previous block gets its nextBlockhash updated to point to the new block.
   *
   * @param newBlock the block to append
   * @return Good(()) when the insert and the link update both succeed
   */
  def newLatestBlock(newBlock: Block): ApplicationResult[Unit] = withTransaction { implicit conn =>
    // Every step runs in the Option monad; a missing step aborts the transaction.
    val linked = for {
      _ <- blockPostgresDAO.upsert(newBlock)
      previousBlockhash <- newBlock.previousBlockhash
      previous <- blockPostgresDAO.getBy(previousBlockhash)
      _ <- blockPostgresDAO.upsert(previous.copy(nextBlockhash = Some(newBlock.hash)))
    } yield ()

    linked
      .map(Good(_))
      .getOrElse(throw new RuntimeException("Unable to add the new latest block"))
  }

  /**
   * Replaces the current latest block with a new one, deleting the orphan.
   *
   * @param newBlock the replacement block
   * @param orphan the hash of the block to remove
   * @return Good(()) when both the insert and the delete succeed
   */
  def replaceLatestBlock(newBlock: Block, orphan: Blockhash): ApplicationResult[Unit] = withTransaction { implicit conn =>
    val replaced = for {
      _ <- blockPostgresDAO.upsert(newBlock)
      _ <- blockPostgresDAO.delete(orphan)
    } yield ()

    replaced
      .map(Good(_))
      .getOrElse(throw new RuntimeException("Unable to replace latest block"))
  }
}

76
server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala

@ -0,0 +1,76 @@
package com.xsn.explorer.processors
import javax.inject.Inject
import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps
import com.alexitc.playsonify.core.{ApplicationResult, FutureApplicationResult}
import com.xsn.explorer.data.BlockBlockingDataHandler
import com.xsn.explorer.data.anorm.DatabasePostgresSeeder
import com.xsn.explorer.models.Blockhash
import com.xsn.explorer.models.rpc.Block
import com.xsn.explorer.services.XSNService
import org.scalactic.Good
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext.Implicits.global
/**
 * Process events related to blocks coming from the RPC server.
 */
class BlockEventsProcessor @Inject() (
    xsnService: XSNService,
    databasePostgresSeeder: DatabasePostgresSeeder,
    blockBlockingDataHandler: BlockBlockingDataHandler) {

  private val logger = LoggerFactory.getLogger(this.getClass)

  /**
   * There is a new latest block in the blockchain, we need to sync our database.
   *
   * The following scenarios are handled for the new latest block:
   *
   * 1. It is new on our database, we just append it.
   *    - current blocks = A -> B, new latest block = C, new blocks = A -> B -> C
   *    - current blocks = empty, new latest block = A, new blocks = A
   *
   * 2. It is an existing block, hence, the previous one from the latest block
   *    in our database.
   *    - current blocks = A -> B -> C, new latest block = B, new blocks = A -> B
   *
   * @param blockhash the new latest block
   */
  def newLatestBlock(blockhash: Blockhash): FutureApplicationResult[Unit] = {
    val result = xsnService
      .getBlock(blockhash)
      .toFutureOr
      .mapWithError { block =>
        // The database work below is blocking, mark it as such for the pool.
        scala.concurrent.blocking {
          onBlock(block)
        }
      }

    result.toFuture
  }

  // Syncs the database once the full block has been retrieved from the RPC server.
  private def onBlock(newBlock: Block): ApplicationResult[Unit] = {
    blockBlockingDataHandler
      .getLatestBlock()
      .map { latestBlock => onBlock(newBlock, latestBlock) }
      .getOrElse {
        // NOTE(review): any getLatestBlock failure lands here, not only an
        // empty database — confirm that is the intended behavior.
        logger.info(s"first block = ${newBlock.hash.string}")
        databasePostgresSeeder.firstBlock(newBlock)
      }
  }

  // Decides how the new block relates to the latest one we already have stored.
  private def onBlock(newBlock: Block, latestBlock: Block): ApplicationResult[Unit] = {
    if (newBlock.previousBlockhash.contains(latestBlock.hash)) {
      // latest block -> new block
      logger.info(s"existing latest block = ${latestBlock.hash.string} -> new latest block = ${newBlock.hash.string}")
      databasePostgresSeeder.newLatestBlock(newBlock)
    } else if (newBlock.hash == latestBlock.hash) {
      // duplicated msg
      logger.info(s"ignoring duplicated latest block = ${newBlock.hash.string}")
      Good(())
    } else {
      // The new block does not extend our latest block: replace it (rechain).
      logger.info(s"orphan block = ${latestBlock.hash.string}, new latest block = ${newBlock.hash.string}")
      databasePostgresSeeder.replaceLatestBlock(newBlock, latestBlock.hash)
    }
  }
}

11
server/test/com/xsn/explorer/helpers/BlockLoader.scala

@ -1,5 +1,7 @@
package com.xsn.explorer.helpers
import java.io.File
import com.xsn.explorer.models.rpc.Block
import play.api.libs.json.{JsValue, Json}
@ -20,4 +22,13 @@ object BlockLoader {
case _ => throw new RuntimeException(s"Block $blockhash not found")
}
}
/**
 * Loads every block fixture available in the resources folder.
 *
 * @return the parsed blocks, in no particular order
 * @throws IllegalArgumentException when the resource folder is missing
 * @throws RuntimeException when the folder contents cannot be listed
 */
def all(): List[Block] = {
  val url = getClass.getResource(s"/$BasePath")
  require(url != null, s"Resource folder /$BasePath not found")

  // Use toURI instead of getPath: getPath keeps URL-encoded characters
  // (e.g. %20 for spaces), which produces an invalid File path.
  val files = Option(new File(url.toURI).listFiles())
    .getOrElse(throw new RuntimeException(s"Unable to list files in /$BasePath"))

  files.toList
    .map(_.getName)
    .map(get)
}
}

25
server/test/com/xsn/explorer/helpers/FileBasedXSNService.scala

@ -0,0 +1,25 @@
package com.xsn.explorer.helpers
import com.alexitc.playsonify.core.FutureApplicationResult
import com.xsn.explorer.errors.BlockNotFoundError
import com.xsn.explorer.models.Blockhash
import com.xsn.explorer.models.rpc.Block
import org.scalactic.{Good, One, Or}
import scala.concurrent.Future
/**
 * An XSN service backed by the block fixture files available on the classpath.
 */
class FileBasedXSNService extends DummyXSNService {

  // Index every available block fixture by its hash for constant-time lookups.
  private lazy val blockMap = BlockLoader.all().map(block => (block.hash, block)).toMap

  override def getBlock(blockhash: Blockhash): FutureApplicationResult[Block] = {
    val maybeBlock = blockMap.get(blockhash)
    Future.successful(Or.from(maybeBlock, One(BlockNotFoundError)))
  }

  override def getLatestBlock(): FutureApplicationResult[Block] = {
    // The block with the highest height is considered the latest one.
    val latest = blockMap.values.maxBy(_.height.int)
    Future.successful(Good(latest))
  }
}

103
server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala

@ -0,0 +1,103 @@
package com.xsn.explorer.processors
import com.alexitc.playsonify.core.FutureApplicationResult
import com.xsn.explorer.data.anorm.dao.BlockPostgresDAO
import com.xsn.explorer.data.anorm.{BlockPostgresDataHandler, DatabasePostgresSeeder}
import com.xsn.explorer.data.common.PostgresDataHandlerSpec
import com.xsn.explorer.errors.BlockNotFoundError
import com.xsn.explorer.helpers.{BlockLoader, FileBasedXSNService}
import com.xsn.explorer.models.rpc.Block
import org.scalactic.{Bad, Good}
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.Future
class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures with BeforeAndAfter {

  lazy val dataHandler = new BlockPostgresDataHandler(database, new BlockPostgresDAO)
  lazy val dataSeeder = new DatabasePostgresSeeder(database, new BlockPostgresDAO)

  // Every test starts from an empty blocks table.
  before {
    clearDatabase()
  }

  "newLatestBlock" should {
    "process first block" in {
      val block0 = BlockLoader.get("00000c822abdbb23e28f79a49d29b41429737c6c7e15df40d1b1f1b35907ae34")

      // NOTE(review): BlockEventsProcessor reads the latest block through
      // blockBlockingDataHandler (dataHandler below), not through xsnService,
      // so this override appears unused by the code under test — confirm.
      val xsnService = new FileBasedXSNService {
        override def getLatestBlock(): FutureApplicationResult[Block] = {
          Future.successful(Bad(BlockNotFoundError).accumulating)
        }
      }

      val processor = new BlockEventsProcessor(xsnService, dataSeeder, dataHandler)

      // With an empty database the processor takes the first-block path.
      whenReady(processor.newLatestBlock(block0.hash)) { result =>
        result.isGood mustEqual true
      }
    }

    "process a new block" in {
      // Presumably the fixtures form the chain block0 -> block1 -> block2 —
      // verify against the resource files.
      val block0 = BlockLoader.get("00000c822abdbb23e28f79a49d29b41429737c6c7e15df40d1b1f1b35907ae34")
      val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7")
      val block2 = BlockLoader.get("000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8")

      // Seed the database so that block1 is the stored latest block.
      List(block0, block1).foreach(dataHandler.upsert)

      // NOTE(review): as in the first test, this override appears unused by
      // the processor, which queries the database for the latest block.
      val xsnService = new FileBasedXSNService {
        override def getLatestBlock(): FutureApplicationResult[Block] = {
          val latest = block1.copy(nextBlockhash = None)
          Future.successful(Good(latest).accumulating)
        }
      }

      val processor = new BlockEventsProcessor(xsnService, dataSeeder, dataHandler)

      // block2 should be appended, producing the chain block0 -> block1 -> block2.
      whenReady(processor.newLatestBlock(block2.hash)) { result =>
        result.isGood mustEqual true

        val blocks = List(block0, block1, block2)
        verifyBlockchain(blocks)
      }
    }

    "process a rechain" in {
      val block0 = BlockLoader.get("00000c822abdbb23e28f79a49d29b41429737c6c7e15df40d1b1f1b35907ae34")
      val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7")
      val block2 = BlockLoader.get("000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8")

      // Seed the database so that block2 is the stored latest block.
      List(block0, block1, block2).foreach(dataHandler.upsert)

      val xsnService = new FileBasedXSNService {
        override def getLatestBlock(): FutureApplicationResult[Block] = {
          val latest = block2.copy(nextBlockhash = None)
          Future.successful(Good(latest).accumulating)
        }
      }

      val processor = new BlockEventsProcessor(xsnService, dataSeeder, dataHandler)

      // Announcing block1 (already stored) should drop block2 as an orphan,
      // leaving block0 -> block1 as the chain.
      whenReady(processor.newLatestBlock(block1.hash)) { result =>
        result.isGood mustEqual true

        val blocks = List(block0, block1)
        verifyBlockchain(blocks)
      }
    }
  }

  // Checks that exactly the given blocks are stored and properly linked.
  private def verifyBlockchain(blocks: List[Block]) = {
    countBlocks() mustEqual blocks.size

    blocks.foreach { block =>
      val dbBlock = dataHandler.getBy(block.hash).get
      dbBlock.previousBlockhash mustEqual block.previousBlockhash
      dbBlock.nextBlockhash mustEqual block.nextBlockhash
    }
  }

  // Removes every stored block; called before each test.
  private def clearDatabase() = {
    database.withConnection { implicit conn =>
      _root_.anorm.SQL("""DELETE FROM blocks""").execute()
    }
  }

  // Counts the rows currently stored in the blocks table.
  private def countBlocks() = {
    database.withConnection { implicit conn =>
      _root_.anorm.SQL("""SELECT COUNT(*) FROM blocks""").as(_root_.anorm.SqlParser.scalar[Int].single)
    }
  }
}
Loading…
Cancel
Save