diff --git a/server/app/com/xsn/explorer/processors/BlockOps.scala b/server/app/com/xsn/explorer/processors/BlockOps.scala new file mode 100644 index 0000000..f354de6 --- /dev/null +++ b/server/app/com/xsn/explorer/processors/BlockOps.scala @@ -0,0 +1,65 @@ +package com.xsn.explorer.processors + +import javax.inject.Inject + +import com.alexitc.playsonify.core.FutureApplicationResult +import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps +import com.xsn.explorer.data.DatabaseSeeder +import com.xsn.explorer.data.async.{BlockFutureDataHandler, DatabaseFutureSeeder} +import com.xsn.explorer.errors.PostgresForeignKeyViolationError +import com.xsn.explorer.models.Transaction +import com.xsn.explorer.models.rpc.Block +import org.scalactic.{Bad, Good, One} + +import scala.concurrent.{ExecutionContext, Future} + +/** + * BlockOps is contains useful operations used for seeding the database, while these methods might be adequate to be on + * the [[com.xsn.explorer.services.BlockService]] class, they were added here to avoid growing that service, + * in any case, these methods are used by the seeding process. + */ +class BlockOps @Inject() ( + databaseSeeder: DatabaseFutureSeeder, + blockDataHandler: BlockFutureDataHandler)( + implicit val ec: ExecutionContext) { + + import BlockOps._ + + /** + * Creates a new block and in case of a conflict due to repeated height, the old block is replaced. + */ + def createBlock(block: Block, transactions: List[Transaction]): FutureApplicationResult[Result] = { + val command = DatabaseSeeder.CreateBlockCommand(block, transactions) + databaseSeeder + .newBlock(command) + .flatMap { + case Good(_) => Future.successful(Good(Result.BlockCreated)) + case Bad(One(PostgresForeignKeyViolationError("height", _))) => onRepeatedBlockHeight(block, transactions) + case Bad(errors) => Future.successful(Bad(errors)) + } + } + + private def onRepeatedBlockHeight(newBlock: Block, newTransactions: List[Transaction]): FutureApplicationResult[Result] = { + val result = for { + orphanBlock <- blockDataHandler.getBy(newBlock.height).toFutureOr + + replaceCommand = DatabaseSeeder.ReplaceBlockCommand( + orphanBlock = orphanBlock, + newBlock = newBlock, + newTransactions = newTransactions) + + _ <- databaseSeeder.replaceBlock(replaceCommand).toFutureOr + } yield Result.BlockReplacedByHeight + + result.toFuture + } +} + +object BlockOps { + + sealed trait Result + object Result { + case object BlockCreated extends Result + case object BlockReplacedByHeight extends Result + } +} \ No newline at end of file diff --git a/server/app/com/xsn/explorer/tasks/BackwardsSynchronizerTask.scala b/server/app/com/xsn/explorer/tasks/BackwardsSynchronizerTask.scala index e194f31..4200f36 100644 --- a/server/app/com/xsn/explorer/tasks/BackwardsSynchronizerTask.scala +++ b/server/app/com/xsn/explorer/tasks/BackwardsSynchronizerTask.scala @@ -4,11 +4,11 @@ import javax.inject.{Inject, Singleton} import com.alexitc.playsonify.core.FutureApplicationResult import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureListOps, FutureOps, OrOps} -import com.xsn.explorer.data.DatabaseSeeder -import com.xsn.explorer.data.async.{BlockFutureDataHandler, DatabaseFutureSeeder} +import com.xsn.explorer.data.async.BlockFutureDataHandler import com.xsn.explorer.errors.BlockNotFoundError import com.xsn.explorer.models.rpc.Block import com.xsn.explorer.models.{Blockhash, Height} +import com.xsn.explorer.processors.BlockOps import com.xsn.explorer.services.{TransactionService, XSNService} import org.scalactic.{Bad, Good, One, Or} import org.slf4j.LoggerFactory @@ -18,7 +18,7 @@ import scala.concurrent.Future import scala.util.control.NonFatal /** - * As the dababse sync process is slow, it's useful to fill it using a fast server and then, release to the + * As the database sync process is slow, it's useful to fill it using a fast server and then, release to the * production server, in this case might have a database updated until block N while the SQS messages started * at N + X, leaving X missing blocks, this task, syncs the database from block N + X to block N. */ @@ -27,7 +27,7 @@ class BackwardsSynchronizerTask @Inject() ( xsnService: XSNService, transactionService: TransactionService, blockDataHandler: BlockFutureDataHandler, - databaseSeeder: DatabaseFutureSeeder) { + blockOps: BlockOps) { private val logger = LoggerFactory.getLogger(this.getClass) private val lock = new Object @@ -116,8 +116,7 @@ class BackwardsSynchronizerTask @Inject() ( block <- xsnService.getBlock(blockhash).toFutureOr transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr - command = DatabaseSeeder.CreateBlockCommand(block, transactions) - _ <- databaseSeeder.newBlock(command).toFutureOr + _ <- blockOps.createBlock(block, transactions).toFutureOr _ = logger.debug(s"Block ${block.height.int} saved") _ <- block diff --git a/server/test/com/xsn/explorer/processors/BlockOpsSpec.scala b/server/test/com/xsn/explorer/processors/BlockOpsSpec.scala new file mode 100644 index 0000000..ee0b8d0 --- /dev/null +++ b/server/test/com/xsn/explorer/processors/BlockOpsSpec.scala @@ -0,0 +1,51 @@ +package com.xsn.explorer.processors + +import com.xsn.explorer.data.anorm.dao.{BalancePostgresDAO, BlockPostgresDAO, TransactionPostgresDAO} +import com.xsn.explorer.data.anorm.interpreters.FieldOrderingSQLInterpreter +import com.xsn.explorer.data.anorm.{BlockPostgresDataHandler, DatabasePostgresSeeder} +import com.xsn.explorer.data.async.{BlockFutureDataHandler, DatabaseFutureSeeder} +import com.xsn.explorer.data.common.PostgresDataHandlerSpec +import com.xsn.explorer.helpers.{BlockLoader, Executors, FileBasedXSNService} +import org.scalactic.Good +import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.ScalaFutures + +class BlockOpsSpec extends PostgresDataHandlerSpec with ScalaFutures with BeforeAndAfter { + + before { + clearDatabase() + } + + lazy val dataHandler = new BlockPostgresDataHandler(database, new BlockPostgresDAO) + lazy val dataSeeder = new DatabasePostgresSeeder( + database, + new BlockPostgresDAO, + new TransactionPostgresDAO, + new BalancePostgresDAO(new FieldOrderingSQLInterpreter)) + + lazy val xsnService = new FileBasedXSNService + + lazy val blockOps = new BlockOps( + new DatabaseFutureSeeder(dataSeeder)(Executors.databaseEC), + new BlockFutureDataHandler(dataHandler)(Executors.databaseEC)) + + + val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7") + val block2 = BlockLoader.get("000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8") + + "createBlock" should { + "create a new block" in { + whenReady(blockOps.createBlock(block1, List.empty)) { result => + result mustEqual Good(BlockOps.Result.BlockCreated) + } + } + + "replace a block if the height already exist" in { + whenReady(blockOps.createBlock(block1, List.empty)) { _.isGood mustEqual true } + + whenReady(blockOps.createBlock(block2.copy(height = block1.height), List.empty)) { result => + result mustEqual Good(BlockOps.Result.BlockReplacedByHeight) + } + } + } +}