Browse Source

server: Small fix to the BackwardsSynchronizerTask

When running the task, we replace a block when its height already exists,
this should help on the fix for #11 and #12.
scalafmt-draft
Alexis Hernandez 7 years ago
parent
commit
faef6eadf7
  1. 65
      server/app/com/xsn/explorer/processors/BlockOps.scala
  2. 11
      server/app/com/xsn/explorer/tasks/BackwardsSynchronizerTask.scala
  3. 51
      server/test/com/xsn/explorer/processors/BlockOpsSpec.scala

65
server/app/com/xsn/explorer/processors/BlockOps.scala

@ -0,0 +1,65 @@
package com.xsn.explorer.processors
import javax.inject.Inject
import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps
import com.xsn.explorer.data.DatabaseSeeder
import com.xsn.explorer.data.async.{BlockFutureDataHandler, DatabaseFutureSeeder}
import com.xsn.explorer.errors.PostgresForeignKeyViolationError
import com.xsn.explorer.models.Transaction
import com.xsn.explorer.models.rpc.Block
import org.scalactic.{Bad, Good, One}
import scala.concurrent.{ExecutionContext, Future}
/**
* BlockOps is contains useful operations used for seeding the database, while these methods might be adequate to be on
* the [[com.xsn.explorer.services.BlockService]] class, they were added here to avoid growing that service,
* in any case, these methods are used by the seeding process.
*/
class BlockOps @Inject() (
databaseSeeder: DatabaseFutureSeeder,
blockDataHandler: BlockFutureDataHandler)(
implicit val ec: ExecutionContext) {
import BlockOps._
/**
* Creates a new block and in case of a conflict due to repeated height, the old block is replaced.
*/
def createBlock(block: Block, transactions: List[Transaction]): FutureApplicationResult[Result] = {
val command = DatabaseSeeder.CreateBlockCommand(block, transactions)
databaseSeeder
.newBlock(command)
.flatMap {
case Good(_) => Future.successful(Good(Result.BlockCreated))
case Bad(One(PostgresForeignKeyViolationError("height", _))) => onRepeatedBlockHeight(block, transactions)
case Bad(errors) => Future.successful(Bad(errors))
}
}
private def onRepeatedBlockHeight(newBlock: Block, newTransactions: List[Transaction]): FutureApplicationResult[Result] = {
val result = for {
orphanBlock <- blockDataHandler.getBy(newBlock.height).toFutureOr
replaceCommand = DatabaseSeeder.ReplaceBlockCommand(
orphanBlock = orphanBlock,
newBlock = newBlock,
newTransactions = newTransactions)
_ <- databaseSeeder.replaceBlock(replaceCommand).toFutureOr
} yield Result.BlockReplacedByHeight
result.toFuture
}
}
object BlockOps {
sealed trait Result
object Result {
case object BlockCreated extends Result
case object BlockReplacedByHeight extends Result
}
}

11
server/app/com/xsn/explorer/tasks/BackwardsSynchronizerTask.scala

@ -4,11 +4,11 @@ import javax.inject.{Inject, Singleton}
import com.alexitc.playsonify.core.FutureApplicationResult import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureListOps, FutureOps, OrOps} import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureListOps, FutureOps, OrOps}
import com.xsn.explorer.data.DatabaseSeeder import com.xsn.explorer.data.async.BlockFutureDataHandler
import com.xsn.explorer.data.async.{BlockFutureDataHandler, DatabaseFutureSeeder}
import com.xsn.explorer.errors.BlockNotFoundError import com.xsn.explorer.errors.BlockNotFoundError
import com.xsn.explorer.models.rpc.Block import com.xsn.explorer.models.rpc.Block
import com.xsn.explorer.models.{Blockhash, Height} import com.xsn.explorer.models.{Blockhash, Height}
import com.xsn.explorer.processors.BlockOps
import com.xsn.explorer.services.{TransactionService, XSNService} import com.xsn.explorer.services.{TransactionService, XSNService}
import org.scalactic.{Bad, Good, One, Or} import org.scalactic.{Bad, Good, One, Or}
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
@ -18,7 +18,7 @@ import scala.concurrent.Future
import scala.util.control.NonFatal import scala.util.control.NonFatal
/** /**
* As the dababse sync process is slow, it's useful to fill it using a fast server and then, release to the * As the database sync process is slow, it's useful to fill it using a fast server and then, release to the
* production server, in this case might have a database updated until block N while the SQS messages started * production server, in this case might have a database updated until block N while the SQS messages started
* at N + X, leaving X missing blocks, this task, syncs the database from block N + X to block N. * at N + X, leaving X missing blocks, this task, syncs the database from block N + X to block N.
*/ */
@ -27,7 +27,7 @@ class BackwardsSynchronizerTask @Inject() (
xsnService: XSNService, xsnService: XSNService,
transactionService: TransactionService, transactionService: TransactionService,
blockDataHandler: BlockFutureDataHandler, blockDataHandler: BlockFutureDataHandler,
databaseSeeder: DatabaseFutureSeeder) { blockOps: BlockOps) {
private val logger = LoggerFactory.getLogger(this.getClass) private val logger = LoggerFactory.getLogger(this.getClass)
private val lock = new Object private val lock = new Object
@ -116,8 +116,7 @@ class BackwardsSynchronizerTask @Inject() (
block <- xsnService.getBlock(blockhash).toFutureOr block <- xsnService.getBlock(blockhash).toFutureOr
transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr
command = DatabaseSeeder.CreateBlockCommand(block, transactions) _ <- blockOps.createBlock(block, transactions).toFutureOr
_ <- databaseSeeder.newBlock(command).toFutureOr
_ = logger.debug(s"Block ${block.height.int} saved") _ = logger.debug(s"Block ${block.height.int} saved")
_ <- block _ <- block

51
server/test/com/xsn/explorer/processors/BlockOpsSpec.scala

@ -0,0 +1,51 @@
package com.xsn.explorer.processors
import com.xsn.explorer.data.anorm.dao.{BalancePostgresDAO, BlockPostgresDAO, TransactionPostgresDAO}
import com.xsn.explorer.data.anorm.interpreters.FieldOrderingSQLInterpreter
import com.xsn.explorer.data.anorm.{BlockPostgresDataHandler, DatabasePostgresSeeder}
import com.xsn.explorer.data.async.{BlockFutureDataHandler, DatabaseFutureSeeder}
import com.xsn.explorer.data.common.PostgresDataHandlerSpec
import com.xsn.explorer.helpers.{BlockLoader, Executors, FileBasedXSNService}
import org.scalactic.Good
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.ScalaFutures
class BlockOpsSpec extends PostgresDataHandlerSpec with ScalaFutures with BeforeAndAfter {
before {
clearDatabase()
}
lazy val dataHandler = new BlockPostgresDataHandler(database, new BlockPostgresDAO)
lazy val dataSeeder = new DatabasePostgresSeeder(
database,
new BlockPostgresDAO,
new TransactionPostgresDAO,
new BalancePostgresDAO(new FieldOrderingSQLInterpreter))
lazy val xsnService = new FileBasedXSNService
lazy val blockOps = new BlockOps(
new DatabaseFutureSeeder(dataSeeder)(Executors.databaseEC),
new BlockFutureDataHandler(dataHandler)(Executors.databaseEC))
val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7")
val block2 = BlockLoader.get("000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8")
"createBlock" should {
"create a new block" in {
whenReady(blockOps.createBlock(block1, List.empty)) { result =>
result mustEqual Good(BlockOps.Result.BlockCreated)
}
}
"replace a block if the height already exist" in {
whenReady(blockOps.createBlock(block1, List.empty)) { _.isGood mustEqual true }
whenReady(blockOps.createBlock(block2.copy(height = block1.height), List.empty)) { result =>
result mustEqual Good(BlockOps.Result.BlockReplacedByHeight)
}
}
}
}
Loading…
Cancel
Save