diff --git a/server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala b/server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala new file mode 100644 index 0000000..c9c5c66 --- /dev/null +++ b/server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala @@ -0,0 +1,62 @@ +package com.xsn.explorer.data.anorm + +import javax.inject.Inject + +import com.alexitc.playsonify.core.ApplicationResult +import com.xsn.explorer.data.anorm.dao.BlockPostgresDAO +import com.xsn.explorer.models.Blockhash +import com.xsn.explorer.models.rpc.Block +import org.scalactic.Good +import play.api.db.Database + +class DatabasePostgresSeeder @Inject() ( + override val database: Database, + blockPostgresDAO: BlockPostgresDAO) + extends AnormPostgresDataHandler { + + def firstBlock(block: Block): ApplicationResult[Unit] = database.withConnection { implicit conn => + val result = blockPostgresDAO.upsert(block) + + result + .map(_ => Good(())) + .getOrElse(throw new RuntimeException("Unable to add the first block")) + } + + /** + * Creates the new latest block assuming there is a previous block. + * + * @param newBlock + * @return + */ + def newLatestBlock(newBlock: Block): ApplicationResult[Unit] = withTransaction { implicit conn => + val insertedBlock = for { + _ <- blockPostgresDAO.upsert(newBlock) + } yield () + + val result = insertedBlock + .flatMap(_ => newBlock.previousBlockhash) + .flatMap { previousBlockhash => + + for { + previous <- blockPostgresDAO.getBy(previousBlockhash) + newPrevious = previous.copy(nextBlockhash = Some(newBlock.hash)) + _ <- blockPostgresDAO.upsert(newPrevious) + } yield () + } + + result + .map(Good(_)) + .getOrElse(throw new RuntimeException("Unable to add the new latest block")) + } + + def replaceLatestBlock(newBlock: Block, orphan: Blockhash): ApplicationResult[Unit] = withTransaction { implicit conn => + val result = for { + _ <- blockPostgresDAO.upsert(newBlock) + _ <- blockPostgresDAO.delete(orphan) + } yield () + + result + .map(Good(_)) + .getOrElse(throw new RuntimeException("Unable to replace latest block")) + } +} diff --git a/server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala b/server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala new file mode 100644 index 0000000..b1f6bf1 --- /dev/null +++ b/server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala @@ -0,0 +1,76 @@ +package com.xsn.explorer.processors + +import javax.inject.Inject + +import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps +import com.alexitc.playsonify.core.{ApplicationResult, FutureApplicationResult} +import com.xsn.explorer.data.BlockBlockingDataHandler +import com.xsn.explorer.data.anorm.DatabasePostgresSeeder +import com.xsn.explorer.models.Blockhash +import com.xsn.explorer.models.rpc.Block +import com.xsn.explorer.services.XSNService +import org.scalactic.Good +import org.slf4j.LoggerFactory + +import scala.concurrent.ExecutionContext.Implicits.global + +/** + * Process events related to blocks coming from the RPC server. + */ +class BlockEventsProcessor @Inject() ( + xsnService: XSNService, + databasePostgresSeeder: DatabasePostgresSeeder, + blockBlockingDataHandler: BlockBlockingDataHandler) { + + private val logger = LoggerFactory.getLogger(this.getClass) + + /** + * There is a new latest block in the blockchain, we need to sync our database. + * + * The following scenarios are handled for the new latest block: + * + * 1. It is new on our database, we just append it. + * - current blocks = A -> B, new latest block = C, new blocks = A -> B -> C + * - current blocks = empty, new latest block = A, new blocks = A + * + * 2. It is an existing block, hence, the previous from the latest one in our database . + * - current blocks = A -> B -> C, new latest block = B, new blocks = A -> B + * + * @param blockhash the new latest block + */ + def newLatestBlock(blockhash: Blockhash): FutureApplicationResult[Unit] = { + xsnService + .getBlock(blockhash) + .toFutureOr + .mapWithError { block => + scala.concurrent.blocking { + newLatestBlock(block) + } + } + .toFuture + } + + private def newLatestBlock(newBlock: Block): ApplicationResult[Unit] = { + val latestBlockResult = blockBlockingDataHandler.getLatestBlock() + + latestBlockResult + .map { latestBlock => + if (newBlock.previousBlockhash.contains(latestBlock.hash)) { + // latest block -> new block + logger.info(s"existing latest block = ${latestBlock.hash.string} -> new latest block = ${newBlock.hash.string}") + databasePostgresSeeder.newLatestBlock(newBlock) + } else if (newBlock.hash == latestBlock.hash) { + // duplicated msg + logger.info(s"ignoring duplicated latest block = ${newBlock.hash.string}") + Good(()) + } else { + logger.info(s"orphan block = ${latestBlock.hash.string}, new latest block = ${newBlock.hash.string}") + databasePostgresSeeder.replaceLatestBlock(newBlock, latestBlock.hash) + } + } + .getOrElse { + logger.info(s"first block = ${newBlock.hash.string}") + databasePostgresSeeder.firstBlock(newBlock) + } + } +} diff --git a/server/test/com/xsn/explorer/helpers/BlockLoader.scala b/server/test/com/xsn/explorer/helpers/BlockLoader.scala index 80aa665..2de69b8 100644 --- a/server/test/com/xsn/explorer/helpers/BlockLoader.scala +++ b/server/test/com/xsn/explorer/helpers/BlockLoader.scala @@ -1,5 +1,7 @@ package com.xsn.explorer.helpers +import java.io.File + import com.xsn.explorer.models.rpc.Block import play.api.libs.json.{JsValue, Json} @@ -20,4 +22,13 @@ object BlockLoader { case _ => throw new RuntimeException(s"Block $blockhash not found") } } + + def all(): List[Block] = { + val uri = getClass.getResource(s"/$BasePath") + new File(uri.getPath) + .listFiles() + .toList + .map(_.getName) + .map(get) + } } diff --git a/server/test/com/xsn/explorer/helpers/FileBasedXSNService.scala b/server/test/com/xsn/explorer/helpers/FileBasedXSNService.scala new file mode 100644 index 0000000..715471f --- /dev/null +++ b/server/test/com/xsn/explorer/helpers/FileBasedXSNService.scala @@ -0,0 +1,25 @@ +package com.xsn.explorer.helpers + +import com.alexitc.playsonify.core.FutureApplicationResult +import com.xsn.explorer.errors.BlockNotFoundError +import com.xsn.explorer.models.Blockhash +import com.xsn.explorer.models.rpc.Block +import org.scalactic.{Good, One, Or} + +import scala.concurrent.Future + +class FileBasedXSNService extends DummyXSNService { + + private lazy val blockMap = BlockLoader.all().map { block => block.hash -> block }.toMap + + override def getBlock(blockhash: Blockhash): FutureApplicationResult[Block] = { + val maybe = blockMap.get(blockhash) + val result = Or.from(maybe, One(BlockNotFoundError)) + Future.successful(result) + } + + override def getLatestBlock(): FutureApplicationResult[Block] = { + val block = blockMap.values.maxBy(_.height.int) + Future.successful(Good(block)) + } +} diff --git a/server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala b/server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala new file mode 100644 index 0000000..36359ff --- /dev/null +++ b/server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala @@ -0,0 +1,103 @@ +package com.xsn.explorer.processors + +import com.alexitc.playsonify.core.FutureApplicationResult +import com.xsn.explorer.data.anorm.dao.BlockPostgresDAO +import com.xsn.explorer.data.anorm.{BlockPostgresDataHandler, DatabasePostgresSeeder} +import com.xsn.explorer.data.common.PostgresDataHandlerSpec +import com.xsn.explorer.errors.BlockNotFoundError +import com.xsn.explorer.helpers.{BlockLoader, FileBasedXSNService} +import com.xsn.explorer.models.rpc.Block +import org.scalactic.{Bad, Good} +import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.Future + +class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures with BeforeAndAfter { + + lazy val dataHandler = new BlockPostgresDataHandler(database, new BlockPostgresDAO) + lazy val dataSeeder = new DatabasePostgresSeeder(database, new BlockPostgresDAO) + + before { + clearDatabase() + } + + "newLatestBlock" should { + "process first block" in { + val block0 = BlockLoader.get("00000c822abdbb23e28f79a49d29b41429737c6c7e15df40d1b1f1b35907ae34") + + val xsnService = new FileBasedXSNService { + override def getLatestBlock(): FutureApplicationResult[Block] = { + Future.successful(Bad(BlockNotFoundError).accumulating) + } + } + + val processor = new BlockEventsProcessor(xsnService, dataSeeder, dataHandler) + whenReady(processor.newLatestBlock(block0.hash)) { result => + result.isGood mustEqual true + } + } + + "process a new block" in { + val block0 = BlockLoader.get("00000c822abdbb23e28f79a49d29b41429737c6c7e15df40d1b1f1b35907ae34") + val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7") + val block2 = BlockLoader.get("000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8") + + List(block0, block1).foreach(dataHandler.upsert) + val xsnService = new FileBasedXSNService { + override def getLatestBlock(): FutureApplicationResult[Block] = { + val latest = block1.copy(nextBlockhash = None) + Future.successful(Good(latest).accumulating) + } + } + + val processor = new BlockEventsProcessor(xsnService, dataSeeder, dataHandler) + whenReady(processor.newLatestBlock(block2.hash)) { result => + result.isGood mustEqual true + val blocks = List(block0, block1, block2) + verifyBlockchain(blocks) + } + } + + "process a rechain" in { + val block0 = BlockLoader.get("00000c822abdbb23e28f79a49d29b41429737c6c7e15df40d1b1f1b35907ae34") + val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7") + val block2 = BlockLoader.get("000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8") + + List(block0, block1, block2).foreach(dataHandler.upsert) + val xsnService = new FileBasedXSNService { + override def getLatestBlock(): FutureApplicationResult[Block] = { + val latest = block2.copy(nextBlockhash = None) + Future.successful(Good(latest).accumulating) + } + } + + val processor = new BlockEventsProcessor(xsnService, dataSeeder, dataHandler) + whenReady(processor.newLatestBlock(block1.hash)) { result => + result.isGood mustEqual true + val blocks = List(block0, block1) + verifyBlockchain(blocks) + } + } + } + + private def verifyBlockchain(blocks: List[Block]) = { + countBlocks() mustEqual blocks.size + blocks.foreach { block => + val dbBlock = dataHandler.getBy(block.hash).get + dbBlock.previousBlockhash mustEqual block.previousBlockhash + dbBlock.nextBlockhash mustEqual block.nextBlockhash + } + } + private def clearDatabase() = { + database.withConnection { implicit conn => + _root_.anorm.SQL("""DELETE FROM blocks""").execute() + } + } + + private def countBlocks() = { + database.withConnection { implicit conn => + _root_.anorm.SQL("""SELECT COUNT(*) FROM blocks""").as(_root_.anorm.SqlParser.scalar[Int].single) + } + } +}