Browse Source

server: Remove legacy seeder tasks (#38)

The SQS seeder and the Poller seeder are removed, they weren't
adapted to the linear synchronization process and they are not
required anymore.

Also, all related classes and dependencies that where used by
these tasks were removed too.
prometheus-integration
Alexis Hernandez 7 years ago
parent
commit
28570f80f5
  1. 23
      server/app/com/xsn/explorer/config/SeederConfig.scala
  2. 34
      server/app/com/xsn/explorer/data/DatabaseSeeder.scala
  3. 144
      server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala
  4. 23
      server/app/com/xsn/explorer/data/async/DatabaseFutureSeeder.scala
  5. 20
      server/app/com/xsn/explorer/modules/AmazonSQSAsyncModule.scala
  6. 1
      server/app/com/xsn/explorer/modules/ConfigModule.scala
  7. 1
      server/app/com/xsn/explorer/modules/DataHandlerModule.scala
  8. 6
      server/app/com/xsn/explorer/modules/PollingSeederModule.scala
  9. 6
      server/app/com/xsn/explorer/modules/SeederModule.scala
  10. 175
      server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala
  11. 65
      server/app/com/xsn/explorer/processors/BlockOps.scala
  12. 137
      server/app/com/xsn/explorer/tasks/BackwardsSynchronizerTask.scala
  13. 64
      server/app/com/xsn/explorer/tasks/PollingSeederTask.scala
  14. 112
      server/app/com/xsn/explorer/tasks/SQSSeederTask.scala
  15. 2
      server/build.sbt
  16. 9
      server/conf/application.conf
  17. 326
      server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala
  18. 51
      server/test/com/xsn/explorer/processors/BlockOpsSpec.scala
  19. 4
      server/test/controllers/common/MyAPISpec.scala

23
server/app/com/xsn/explorer/config/SeederConfig.scala

@ -1,23 +0,0 @@
package com.xsn.explorer.config
import javax.inject.Inject
import play.api.Configuration
trait SeederConfig {
def queueUrl: String
def isEnabled: Boolean
}
class PlaySeederConfig @Inject() (configuration: Configuration) extends SeederConfig {
override def queueUrl: String = {
configuration.get[String]("seeder.queueUrl")
}
override def isEnabled: Boolean = {
configuration.get[Boolean]("seeder.enabled")
}
}

34
server/app/com/xsn/explorer/data/DatabaseSeeder.scala

@ -1,34 +0,0 @@
package com.xsn.explorer.data
import com.alexitc.playsonify.core.ApplicationResult
import com.xsn.explorer.models.Transaction
import com.xsn.explorer.models.rpc.Block
import scala.language.higherKinds
trait DatabaseSeeder[F[_]] {
import DatabaseSeeder._
/**
* The database has some blocks, we are adding a new a block.
*/
def newBlock(command: CreateBlockCommand): F[Unit]
/**
* The database has some blocks but there is a rechain happening, we need to
* replace our current latest block with the new latest block.
*/
def replaceBlock(command: ReplaceBlockCommand): F[Unit]
}
object DatabaseSeeder {
case class CreateBlockCommand(block: Block, transactions: List[Transaction])
case class ReplaceBlockCommand(
orphanBlock: Block,
newBlock: Block, newTransactions: List[Transaction])
}
trait DatabaseBlockingSeeder extends DatabaseSeeder[ApplicationResult]

144
server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala

@ -1,144 +0,0 @@
package com.xsn.explorer.data.anorm
import java.sql.Connection
import javax.inject.Inject
import com.alexitc.playsonify.core.ApplicationResult
import com.xsn.explorer.data.DatabaseBlockingSeeder
import com.xsn.explorer.data.DatabaseSeeder._
import com.xsn.explorer.data.anorm.dao.{BalancePostgresDAO, BlockPostgresDAO, TransactionPostgresDAO}
import com.xsn.explorer.models.rpc.Block
import com.xsn.explorer.models.{Address, Balance, Transaction}
import com.xsn.explorer.util.Extensions.ListOptionExt
import org.scalactic.Good
import org.slf4j.LoggerFactory
import play.api.db.Database
class DatabasePostgresSeeder @Inject() (
override val database: Database,
blockPostgresDAO: BlockPostgresDAO,
transactionPostgresDAO: TransactionPostgresDAO,
balancePostgresDAO: BalancePostgresDAO)
extends DatabaseBlockingSeeder
with AnormPostgresDataHandler {
private val logger = LoggerFactory.getLogger(this.getClass)
override def newBlock(command: CreateBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn =>
val result = for {
_ <- upsertBlockCascade(command)
} yield ()
result
.map(Good(_))
.getOrElse(throw new RuntimeException("Unable to add the new block"))
}
override def replaceBlock(command: ReplaceBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn =>
val createCommand = CreateBlockCommand(command.newBlock, command.newTransactions)
val result = for {
_ <- deleteBlockCascade(command.orphanBlock)
_ <- upsertBlockCascade(createCommand)
} yield ()
result
.map(Good(_))
.getOrElse(throw new RuntimeException("Unable to replace latest block"))
}
private def upsertBlockCascade(command: CreateBlockCommand)(implicit conn: Connection): Option[Unit] = {
val result = for {
// block
_ <- deleteBlockCascade(command.block)
.orElse(Some(()))
_ <- blockPostgresDAO.insert(command.block)
// transactions
_ <- command.transactions.map(tx => transactionPostgresDAO.upsert(tx)).everything
// balances
_ <- balances(command.transactions)
.map { b => balancePostgresDAO.upsert(b) }
.toList
.everything
} yield ()
// link previous block (if possible)
command.block.previousBlockhash.foreach { previousBlockhash =>
blockPostgresDAO
.setNextBlockhash(previousBlockhash, command.block.hash)
}
// link next block (if possible)
command.block.nextBlockhash.foreach { nextBlockhash =>
blockPostgresDAO
.setPreviousBlockhash(nextBlockhash, command.block.hash)
}
result
}
private def deleteBlockCascade(block: Block)(implicit conn: Connection): Option[Unit] = {
// transactions
val deletedTransactions = transactionPostgresDAO.deleteBy(block.hash)
for {
// block
_ <- blockPostgresDAO.delete(block.hash)
// balances
_ <- balances(deletedTransactions)
.map { b => b.copy(spent = -b.spent, received = -b.received) }
.map { b => balancePostgresDAO.upsert(b) }
.toList
.everything
} yield ()
}
private def spendMap(transactions: List[Transaction]): Map[Address, BigDecimal] = {
transactions
.map(_.inputs)
.flatMap { inputs =>
inputs.flatMap { input =>
for {
address <- input.address
value <- input.value
} yield address -> value
}
}
.groupBy(_._1)
.mapValues { list => list.map(_._2).sum }
}
private def receiveMap(transactions: List[Transaction]): Map[Address, BigDecimal] = {
transactions
.map(_.outputs)
.flatMap { outputs =>
outputs.map { output =>
output.address -> output.value
}
}
.groupBy(_._1)
.mapValues { list => list.map(_._2).sum }
}
private def balances(transactions: List[Transaction]) = {
val spentList = spendMap(transactions).map { case (address, spent) =>
Balance(address, spent = spent)
}
val receiveList = receiveMap(transactions).map { case (address, received) =>
Balance(address, received = received)
}
(spentList ++ receiveList)
.groupBy(_.address)
.mapValues { _.reduce(mergeBalances) }
.values
}
def mergeBalances(a: Balance, b: Balance): Balance = {
Balance(a.address, spent = a.spent + b.spent, received = a.received + b.received)
}
}

23
server/app/com/xsn/explorer/data/async/DatabaseFutureSeeder.scala

@ -1,23 +0,0 @@
package com.xsn.explorer.data.async
import javax.inject.Inject
import com.alexitc.playsonify.core.FutureApplicationResult
import com.xsn.explorer.data.{DatabaseBlockingSeeder, DatabaseSeeder}
import com.xsn.explorer.executors.DatabaseExecutionContext
import scala.concurrent.Future
class DatabaseFutureSeeder @Inject() (
blockingSeeder: DatabaseBlockingSeeder)(
implicit ec: DatabaseExecutionContext)
extends DatabaseSeeder[FutureApplicationResult] {
override def newBlock(command: DatabaseSeeder.CreateBlockCommand): FutureApplicationResult[Unit] = Future {
blockingSeeder.newBlock(command)
}
override def replaceBlock(command: DatabaseSeeder.ReplaceBlockCommand): FutureApplicationResult[Unit] = Future {
blockingSeeder.replaceBlock(command)
}
}

20
server/app/com/xsn/explorer/modules/AmazonSQSAsyncModule.scala

@ -1,20 +0,0 @@
package com.xsn.explorer.modules
import javax.inject.Singleton
import com.amazonaws.services.sqs.{AmazonSQSAsync, AmazonSQSAsyncClientBuilder}
import com.google.inject.{AbstractModule, Provides}
class AmazonSQSAsyncModule extends AbstractModule {
override def configure(): Unit = {
}
@Provides
@Singleton
def createSQSClient: AmazonSQSAsync = {
AmazonSQSAsyncClientBuilder
.standard()
.build()
}
}

1
server/app/com/xsn/explorer/modules/ConfigModule.scala

@ -7,7 +7,6 @@ class ConfigModule extends AbstractModule {
override def configure(): Unit = { override def configure(): Unit = {
bind(classOf[RPCConfig]).to(classOf[PlayRPCConfig]) bind(classOf[RPCConfig]).to(classOf[PlayRPCConfig])
bind(classOf[SeederConfig]).to(classOf[PlaySeederConfig])
bind(classOf[LedgerSynchronizerConfig]).to(classOf[LedgerSynchronizerPlayConfig]) bind(classOf[LedgerSynchronizerConfig]).to(classOf[LedgerSynchronizerPlayConfig])
} }
} }

1
server/app/com/xsn/explorer/modules/DataHandlerModule.scala

@ -10,7 +10,6 @@ class DataHandlerModule extends AbstractModule {
bind(classOf[BlockBlockingDataHandler]).to(classOf[BlockPostgresDataHandler]) bind(classOf[BlockBlockingDataHandler]).to(classOf[BlockPostgresDataHandler])
bind(classOf[BalanceBlockingDataHandler]).to(classOf[BalancePostgresDataHandler]) bind(classOf[BalanceBlockingDataHandler]).to(classOf[BalancePostgresDataHandler])
bind(classOf[StatisticsBlockingDataHandler]).to(classOf[StatisticsPostgresDataHandler]) bind(classOf[StatisticsBlockingDataHandler]).to(classOf[StatisticsPostgresDataHandler])
bind(classOf[DatabaseBlockingSeeder]).to(classOf[DatabasePostgresSeeder])
bind(classOf[TransactionBlockingDataHandler]).to(classOf[TransactionPostgresDataHandler]) bind(classOf[TransactionBlockingDataHandler]).to(classOf[TransactionPostgresDataHandler])
bind(classOf[LedgerBlockingDataHandler]).to(classOf[LedgerPostgresDataHandler]) bind(classOf[LedgerBlockingDataHandler]).to(classOf[LedgerPostgresDataHandler])
} }

6
server/app/com/xsn/explorer/modules/PollingSeederModule.scala

@ -1,6 +0,0 @@
package com.xsn.explorer.modules
import com.xsn.explorer.tasks.PollingSeederTask
import play.api.inject.{SimpleModule, bind}
class PollingSeederModule extends SimpleModule(bind[PollingSeederTask].toSelf.eagerly())

6
server/app/com/xsn/explorer/modules/SeederModule.scala

@ -1,6 +0,0 @@
package com.xsn.explorer.modules
import com.xsn.explorer.tasks.SQSSeederTask
import play.api.inject.{SimpleModule, _}
class SeederModule extends SimpleModule(bind[SQSSeederTask].toSelf.eagerly())

175
server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala

@ -1,175 +0,0 @@
package com.xsn.explorer.processors
import javax.inject.Inject
import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureListOps, FutureOps}
import com.xsn.explorer.data.DatabaseSeeder
import com.xsn.explorer.data.async.{BlockFutureDataHandler, DatabaseFutureSeeder}
import com.xsn.explorer.errors._
import com.xsn.explorer.models.rpc.Block
import com.xsn.explorer.models.{Blockhash, Transaction}
import com.xsn.explorer.services.{TransactionService, XSNService}
import org.scalactic.{Bad, Good, One}
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
/**
* Process events related to blocks coming from the RPC server.
*/
class BlockEventsProcessor @Inject() (
xsnService: XSNService,
transactionService: TransactionService,
databaseSeeder: DatabaseFutureSeeder,
blockDataHandler: BlockFutureDataHandler,
blockOps: BlockOps) {
import BlockEventsProcessor._
private val logger = LoggerFactory.getLogger(this.getClass)
/**
* There is a possible new block in the blockchain, we need to sync our database.
*
* The following scenarios are handled for the new block:
*
* 0. The block is already present, we ignore it.
*
* x. The block can not be fully retrieved from the rpc server, we ignore it.
*
* x. The first block in the database (not exactly the first block in the chain), we just add it.
* - Example: current blocks = empty, new block = A, new blocks = A
*
* x. A new block on our database having a link to our latest block in the database:
* - The new block is added.
* - The next block from our latest block is set to the next block.
* - Example: current blocks = A -> B, new block = C, new blocks = A -> B -> C
*
* 2. It is the previous block from our latest block (rechain).
* - current blocks = A -> B -> C, new latest block = B, new blocks = A -> B
*
* 3. None of previous cases, it is a block that might be missing in our chain.
*
* @param blockhash the blockhash to process
*/
def processBlock(blockhash: Blockhash): FutureApplicationResult[Result] = {
val result = for {
block <- xsnService.getBlock(blockhash).toFutureOr
transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr
r <- processBlock(block, transactions).toFutureOr
} yield r
result.toFuture.map {
case Good(r) => Good(r)
case Bad(One(BlockNotFoundError)) => Good(MissingBlockIgnored)
case Bad(One(TransactionNotFoundError)) => Good(MissingBlockIgnored)
case Bad(errors) => Bad(errors)
}
}
private def processBlock(newBlock: Block, newTransactions: List[Transaction]): FutureApplicationResult[Result] = {
def onRechain(orphanBlock: Block): FutureApplicationResult[Result] = {
val command = DatabaseSeeder.ReplaceBlockCommand(
orphanBlock = orphanBlock,
newBlock = newBlock,
newTransactions = newTransactions)
val result = for {
_ <- databaseSeeder.replaceBlock(command).toFutureOr
} yield RechainDone(orphanBlock, newBlock)
result.toFuture
}
def onFirstBlock: FutureApplicationResult[Result] = {
logger.info(s"first block = ${newBlock.hash.string}")
val command = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions)
databaseSeeder.newBlock(command)
.toFutureOr
.map(_ => FirstBlockCreated(newBlock))
.toFuture
}
def onNewBlock(latestBlock: Block): FutureApplicationResult[Result] = {
logger.info(s"existing latest block = ${latestBlock.hash.string} -> new latest block = ${newBlock.hash.string}")
val command = DatabaseSeeder.CreateBlockCommand(newBlock, newTransactions)
databaseSeeder
.newBlock(command)
.toFutureOr
.map(_ => NewBlockAppended(newBlock))
.toFuture
}
def onMissingBlock(): FutureApplicationResult[Result] = {
blockDataHandler
.getBy(newBlock.hash)
.flatMap {
case Good(_) =>
logger.info(s"The block ${newBlock.hash.string} is not missing but duplicated, ignoring")
Future.successful { Good(ExistingBlockIgnored(newBlock)) }
case Bad(One(BlockNotFoundError)) =>
blockOps
.createBlock(newBlock, newTransactions)
.map {
case Good(BlockOps.Result.BlockCreated) => Good(MissingBlockProcessed(newBlock))
case Good(BlockOps.Result.BlockReplacedByHeight) => Good(ReplacedByBlockHeight(newBlock))
case Bad(errors) => Bad(errors)
}
case Bad(errors) =>
Future.successful(Bad(errors))
}
}
val result = for {
latestBlockMaybe <- blockDataHandler
.getLatestBlock()
.map {
case Good(block) => Good(Some(block))
case Bad(One(BlockNotFoundError)) => Good(None)
case Bad(errors) => Bad(errors)
}
.toFutureOr
r <- latestBlockMaybe
.map { latestBlock =>
if (newBlock.hash == latestBlock.hash) {
// duplicated msg
logger.info(s"ignoring duplicated latest block = ${newBlock.hash.string}")
Future.successful(Good(ExistingBlockIgnored(newBlock)))
} else if (newBlock.previousBlockhash.contains(latestBlock.hash)) {
// latest block -> new block
onNewBlock(latestBlock)
} else if (latestBlock.previousBlockhash.contains(newBlock.hash)) {
logger.info(s"rechain, orphan block = ${latestBlock.hash.string}, new latest block = ${newBlock.hash.string}")
onRechain(latestBlock)
} else {
logger.info(s"Adding possible missing block = ${newBlock.hash.string}")
onMissingBlock()
}
}
.getOrElse(onFirstBlock)
.toFutureOr
} yield r
result.toFuture
}
}
object BlockEventsProcessor {
sealed trait Result
case class FirstBlockCreated(block: Block) extends Result
case class MissingBlockProcessed(block: Block) extends Result
case class ExistingBlockIgnored(block: Block) extends Result
case class NewBlockAppended(block: Block) extends Result
case class RechainDone(orphanBlock: Block, newBlock: Block) extends Result
case object MissingBlockIgnored extends Result
case class ReplacedByBlockHeight(newBlock: Block) extends Result
}

65
server/app/com/xsn/explorer/processors/BlockOps.scala

@ -1,65 +0,0 @@
package com.xsn.explorer.processors
import javax.inject.Inject
import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps
import com.xsn.explorer.data.DatabaseSeeder
import com.xsn.explorer.data.async.{BlockFutureDataHandler, DatabaseFutureSeeder}
import com.xsn.explorer.errors.PostgresForeignKeyViolationError
import com.xsn.explorer.models.Transaction
import com.xsn.explorer.models.rpc.Block
import org.scalactic.{Bad, Good, One}
import scala.concurrent.{ExecutionContext, Future}
/**
* BlockOps is contains useful operations used for seeding the database, while these methods might be adequate to be on
* the [[com.xsn.explorer.services.BlockService]] class, they were added here to avoid growing that service,
* in any case, these methods are used by the seeding process.
*/
class BlockOps @Inject() (
databaseSeeder: DatabaseFutureSeeder,
blockDataHandler: BlockFutureDataHandler)(
implicit val ec: ExecutionContext) {
import BlockOps._
/**
* Creates a new block and in case of a conflict due to repeated height, the old block is replaced.
*/
def createBlock(block: Block, transactions: List[Transaction]): FutureApplicationResult[Result] = {
val command = DatabaseSeeder.CreateBlockCommand(block, transactions)
databaseSeeder
.newBlock(command)
.flatMap {
case Good(_) => Future.successful(Good(Result.BlockCreated))
case Bad(One(PostgresForeignKeyViolationError("height", _))) => onRepeatedBlockHeight(block, transactions)
case Bad(errors) => Future.successful(Bad(errors))
}
}
private def onRepeatedBlockHeight(newBlock: Block, newTransactions: List[Transaction]): FutureApplicationResult[Result] = {
val result = for {
orphanBlock <- blockDataHandler.getBy(newBlock.height).toFutureOr
replaceCommand = DatabaseSeeder.ReplaceBlockCommand(
orphanBlock = orphanBlock,
newBlock = newBlock,
newTransactions = newTransactions)
_ <- databaseSeeder.replaceBlock(replaceCommand).toFutureOr
} yield Result.BlockReplacedByHeight
result.toFuture
}
}
object BlockOps {
sealed trait Result
object Result {
case object BlockCreated extends Result
case object BlockReplacedByHeight extends Result
}
}

137
server/app/com/xsn/explorer/tasks/BackwardsSynchronizerTask.scala

@ -1,137 +0,0 @@
package com.xsn.explorer.tasks
import javax.inject.{Inject, Singleton}
import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureListOps, FutureOps, OrOps}
import com.xsn.explorer.data.async.BlockFutureDataHandler
import com.xsn.explorer.errors.BlockNotFoundError
import com.xsn.explorer.models.rpc.Block
import com.xsn.explorer.models.{Blockhash, Height}
import com.xsn.explorer.processors.BlockOps
import com.xsn.explorer.services.{TransactionService, XSNService}
import org.scalactic.{Bad, Good, One, Or}
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.control.NonFatal
/**
* As the database sync process is slow, it's useful to fill it using a fast server and then, release to the
* production server, in this case might have a database updated until block N while the SQS messages started
* at N + X, leaving X missing blocks, this task, syncs the database from block N + X to block N.
*/
@Singleton
class BackwardsSynchronizerTask @Inject() (
xsnService: XSNService,
transactionService: TransactionService,
blockDataHandler: BlockFutureDataHandler,
blockOps: BlockOps) {
private val logger = LoggerFactory.getLogger(this.getClass)
private val lock = new Object
/**
* TODO: Until https://github.com/X9Developers/XSN/issues/32 is fixed, ignore the genesis block.
*/
private val FirstBlockHeight = Height(1)
private var running = false
/**
*
* @param block a block that is stored in our database
*/
def sync(block: Block): Unit = {
if (running) {
()
} else {
lock.synchronized {
if (running) {
()
} else {
running = true
tryToSync(block).onComplete { _ =>
running = false
}
}
}
}
}
private def tryToSync(block: Block) = {
val futureOr = for {
previous <- Or.from(block.previousBlockhash, One(BlockNotFoundError)).toFutureOr
_ <- {
if (block.height == FirstBlockHeight) {
// no sync required
Future.successful(Good(()))
} else {
// sync
logger.info(s"Sync might be required from block ${block.height.int}")
val r = doSync(previous)
r.foreach {
case Good(_) =>
logger.info("Sync completed")
case _ => ()
}
r
}
}.toFutureOr
} yield ()
val result = futureOr.toFuture
result.foreach {
case Bad(errors) =>
logger.error(s"Failed to sync blocks, errors = $errors")
case _ => ()
}
result.recover {
case NonFatal(ex) =>
logger.error(s"Failed to sync blocks", ex)
}
result
}
private def checkAndSync(blockhash: Blockhash): FutureApplicationResult[Unit] = {
blockDataHandler.getBy(blockhash).flatMap {
case Bad(One(BlockNotFoundError)) => doSync(blockhash)
case Bad(errors) => Future.successful(Bad(errors))
case Good(_) =>
logger.debug("No more blocks to sync")
Future.successful(Good(()))
}
}
private def doSync(blockhash: Blockhash): FutureApplicationResult[Unit] = {
val result = for {
block <- xsnService.getBlock(blockhash).toFutureOr
transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr
_ <- blockOps.createBlock(block, transactions).toFutureOr
_ = logger.debug(s"Block ${block.height.int} saved")
_ <- block
.previousBlockhash
.filter(_ => block.height.int > FirstBlockHeight.int)
.map { previous =>
checkAndSync(previous)
}
.getOrElse {
logger.debug(s"No more blocks to sync")
Future.successful(Good(()))
}
.toFutureOr
} yield ()
result.toFuture
}
}

64
server/app/com/xsn/explorer/tasks/PollingSeederTask.scala

@ -1,64 +0,0 @@
package com.xsn.explorer.tasks
import javax.inject.Inject
import akka.actor.ActorSystem
import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps
import com.xsn.explorer.processors.BlockEventsProcessor
import com.xsn.explorer.services.XSNService
import org.scalactic.Bad
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
import scala.util.control.NonFatal
class PollingSeederTask @Inject() (
actorSystem: ActorSystem,
xsnService: XSNService,
blockEventsProcessor: BlockEventsProcessor,
backwardsSynchronizerTask: BackwardsSynchronizerTask)(
implicit ec: ExecutionContext) {
private val logger = LoggerFactory.getLogger(this.getClass)
start()
def start() = {
logger.info("Starting polling seeder task")
actorSystem.scheduler.schedule(initialDelay = 15.seconds, interval = 1.minute)(run)
}
private def run(): Unit = {
val result = for {
block <- xsnService.getLatestBlock().toFutureOr
result <- blockEventsProcessor.processBlock(block.hash).toFutureOr
} yield onBlockResult(result)
val _ = result
.toFuture
.map {
case Bad(errors) => logger.error(s"Failed to sync latest block, errors = $errors")
case _ => ()
}
.recover {
case NonFatal(ex) => logger.error("Failed to sync latest block", ex)
}
}
private def onBlockResult(eventResult: BlockEventsProcessor.Result) = eventResult match {
case BlockEventsProcessor.FirstBlockCreated(block) =>
backwardsSynchronizerTask.sync(block)
case BlockEventsProcessor.RechainDone(_, newBlock) =>
backwardsSynchronizerTask.sync(newBlock)
case BlockEventsProcessor.MissingBlockProcessed(block) =>
backwardsSynchronizerTask.sync(block)
case BlockEventsProcessor.ReplacedByBlockHeight(newBlock) =>
backwardsSynchronizerTask.sync(newBlock)
case _ => ()
}
}

112
server/app/com/xsn/explorer/tasks/SQSSeederTask.scala

@ -1,112 +0,0 @@
package com.xsn.explorer.tasks
import javax.inject.{Inject, Singleton}
import akka.stream.Materializer
import akka.stream.alpakka.sqs.SqsSourceSettings
import akka.stream.alpakka.sqs.scaladsl.SqsSource
import akka.stream.scaladsl.Sink
import com.amazonaws.services.sqs.AmazonSQSAsync
import com.amazonaws.services.sqs.model.Message
import com.xsn.explorer.config.SeederConfig
import com.xsn.explorer.models.Blockhash
import com.xsn.explorer.processors.BlockEventsProcessor
import org.scalactic.{Bad, Good}
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.control.NonFatal
import scala.util.{Failure, Success}
@Singleton
class SQSSeederTask @Inject() (
config: SeederConfig,
blockEventsProcessor: BlockEventsProcessor,
backwardsSynchronizerTask: BackwardsSynchronizerTask)(
implicit sqs: AmazonSQSAsync,
materializer: Materializer) {
private val logger = LoggerFactory.getLogger(this.getClass)
private val settings = SqsSourceSettings.Defaults.copy(maxBatchSize = 1, maxBufferSize = 1)
if (config.isEnabled) {
run()
} else {
logger.info("Seeder is disabled")
}
def run(): Unit = {
logger.info("Starting seeder")
def f(): Unit = {
SqsSource(config.queueUrl, settings)
.runWith(Sink.foreach(handleMessage))
.onComplete {
case Failure(ex) =>
logger.error("Failed to stream SQS messages, restarting seeder", ex)
f()
case Success(_) =>
logger.info("SQS stream completed")
}
}
f()
}
private def handleMessage(message: Message): Unit = {
def onBlockhash(blockhash: Blockhash) = {
val result = blockEventsProcessor.processBlock(blockhash)
result.recover {
case NonFatal(ex) =>
logger.error(s"Failed to process the latest block = ${blockhash.string}", ex)
}
result.foreach {
case Bad(errors) =>
logger.error(s"Failed to process the latest block = ${blockhash.string}, errors = $errors")
case Good(_) =>
logger.info(s"Block processed successfully = ${blockhash.string}")
sqs.deleteMessageAsync(config.queueUrl, message.getReceiptHandle)
}
result.foreach {
case Good(eventResult) => onBlockResult(eventResult)
case _ => ()
}
}
val body = message.getBody
Blockhash
.from(body)
.orElse {
logger.warn(s"Ignoring invalid message: $body")
sqs.deleteMessageAsync(config.queueUrl, message.getReceiptHandle)
None
}
.foreach(onBlockhash)
}
private def onBlockResult(eventResult: BlockEventsProcessor.Result) = eventResult match {
case BlockEventsProcessor.FirstBlockCreated(block) =>
backwardsSynchronizerTask.sync(block)
case BlockEventsProcessor.NewBlockAppended(_) => ()
case BlockEventsProcessor.RechainDone(_, newBlock) =>
backwardsSynchronizerTask.sync(newBlock)
case BlockEventsProcessor.MissingBlockProcessed(block) =>
backwardsSynchronizerTask.sync(block)
case BlockEventsProcessor.ExistingBlockIgnored(block) =>
backwardsSynchronizerTask.sync(block)
case BlockEventsProcessor.MissingBlockIgnored => ()
case BlockEventsProcessor.ReplacedByBlockHeight(newBlock) =>
backwardsSynchronizerTask.sync(newBlock)
}
}

2
server/build.sbt

@ -42,8 +42,6 @@ libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.25"
libraryDependencies += "ch.qos.logback" % "logback-core" % "1.2.3" libraryDependencies += "ch.qos.logback" % "logback-core" % "1.2.3"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3" libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-sqs" % "0.18"
libraryDependencies ++= Seq( libraryDependencies ++= Seq(
"com.beachape" %% "enumeratum" % "1.5.13" "com.beachape" %% "enumeratum" % "1.5.13"
) )

9
server/conf/application.conf

@ -31,14 +31,6 @@ rpc {
password = ${?XSN_RPC_PASSWORD} password = ${?XSN_RPC_PASSWORD}
} }
seeder {
enabled = true
queueUrl = "https://sqs.us-east-2.amazonaws.com/984148963792/blocks-local.fifo"
enabled = ${?XSN_SEEDER_ENABLED}
queueUrl = ${?XSN_SEEDER_QUEUE_URL}
}
synchronizer { synchronizer {
enabled = true enabled = true
initialDelay = "10 seconds" initialDelay = "10 seconds"
@ -47,7 +39,6 @@ synchronizer {
enabled = ${?XSN_SYNCHRONIZER_ENABLED} enabled = ${?XSN_SYNCHRONIZER_ENABLED}
} }
play.modules.enabled += "com.xsn.explorer.modules.AmazonSQSAsyncModule"
play.modules.enabled += "com.xsn.explorer.modules.DataHandlerModule" play.modules.enabled += "com.xsn.explorer.modules.DataHandlerModule"
play.modules.enabled += "com.xsn.explorer.modules.ConfigModule" play.modules.enabled += "com.xsn.explorer.modules.ConfigModule"
play.modules.enabled += "com.xsn.explorer.modules.ExecutorsModule" play.modules.enabled += "com.xsn.explorer.modules.ExecutorsModule"

326
server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala

@ -1,326 +0,0 @@
package com.xsn.explorer.processors
import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.models._
import com.alexitc.playsonify.validators.PaginatedQueryValidator
import com.xsn.explorer.data.anorm._
import com.xsn.explorer.data.anorm.dao.{BalancePostgresDAO, BlockPostgresDAO, StatisticsPostgresDAO, TransactionPostgresDAO}
import com.xsn.explorer.data.anorm.interpreters.FieldOrderingSQLInterpreter
import com.xsn.explorer.data.async.{BlockFutureDataHandler, DatabaseFutureSeeder, TransactionFutureDataHandler}
import com.xsn.explorer.data.common.PostgresDataHandlerSpec
import com.xsn.explorer.errors.{BlockNotFoundError, TransactionNotFoundError}
import com.xsn.explorer.helpers.{BlockLoader, Executors, FileBasedXSNService}
import com.xsn.explorer.models.fields.BalanceField
import com.xsn.explorer.models.rpc.{Block, Transaction}
import com.xsn.explorer.models.{Blockhash, TransactionId}
import com.xsn.explorer.parsers.TransactionOrderingParser
import com.xsn.explorer.processors.BlockEventsProcessor._
import com.xsn.explorer.services.{TransactionService, XSNService}
import org.scalactic.{Bad, Good}
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.Future
class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures with BeforeAndAfter {
lazy val dataHandler = new BlockPostgresDataHandler(database, new BlockPostgresDAO)
lazy val transactionDataHandler = new TransactionPostgresDataHandler(
database,
new TransactionPostgresDAO(new FieldOrderingSQLInterpreter))
lazy val xsnService = new FileBasedXSNService
lazy val processor = blockEventsProcessor(xsnService)
val blockList = List(
BlockLoader.get("00000c822abdbb23e28f79a49d29b41429737c6c7e15df40d1b1f1b35907ae34"),
BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7"),
BlockLoader.get("000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8"),
BlockLoader.get("00000766115b26ecbc09cd3a3db6870fdaf2f049d65a910eb2f2b48b566ca7bd")
)
before {
clearDatabase()
}
"newLatestBlock" should {
"process genesis block" in {
val block = blockList.head
whenReady(processor.processBlock(block.hash)) { result =>
result mustEqual Good(FirstBlockCreated(block))
}
}
"process a new block" in {
val block0 = blockList(0)
val block1 = blockList(1)
val block2 = blockList(2)
createBlocks(processor, block0, block1)
whenReady(processor.processBlock(block2.hash)) { result =>
result mustEqual Good(NewBlockAppended(block2))
val blocks = List(block0, block1, block2)
verifyBlockchain(blocks)
}
}
"process a rechain" in {
val block0 = blockList(0)
val block1 = blockList(1)
val block2 = blockList(2)
createBlocks(processor, block0, block1, block2)
whenReady(processor.processBlock(block1.hash)) {
case Good(RechainDone(orphanBlock, newBlock)) =>
orphanBlock.hash mustEqual block2.hash
newBlock.hash mustEqual block1.hash
val blocks = List(block0, block1)
verifyBlockchain(blocks)
case e => fail()
}
}
"process a repeated block without affecting the balances" in {
val block0 = blockList(0)
val block1 = blockList(1)
val block2 = blockList(2)
createBlocks(processor, block0, block1, block2)
val statsDataHandler = new StatisticsPostgresDataHandler(database, new StatisticsPostgresDAO)
val expectedStats = statsDataHandler.getStatistics().get
whenReady(processor.processBlock(block0.hash)) { result =>
result.isGood mustEqual true
val blocks = List(block0, block1, block2)
verifyBlockchain(blocks)
val stats = statsDataHandler.getStatistics().get
stats.totalSupply mustEqual expectedStats.totalSupply
stats.circulatingSupply mustEqual expectedStats.circulatingSupply
}
}
"process a block without spent index on transactions" in {
val block = BlockLoader.get("000001ff95f22b8d82db14a5c5e9f725e8239e548be43c668766e7ddaee81924")
.copy(previousBlockhash = None)
val processor = blockEventsProcessor(block)
whenReady(processor.processBlock(block.hash)) { result =>
result.isGood mustEqual true
val balanceDataHandler = new BalancePostgresDataHandler(database, new BalancePostgresDAO(new FieldOrderingSQLInterpreter))
val balance = balanceDataHandler.get(
PaginatedQuery(Offset(0), Limit(100)),
FieldOrdering(BalanceField.Available, OrderingCondition.DescendingOrder))
.get
.data
.find(_.address.string == "XdJnCKYNwzCz8ATv8Eu75gonaHyfr9qXg9")
.get
balance.spent mustEqual BigDecimal("76500000.000000000000000")
}
}
"process a rechain without corrupting the balances table" in {
val block0 = blockList(0)
val block1 = blockList(1)
val block2 = blockList(2)
createBlocks(processor, block0, block1, block2)
whenReady(processor.processBlock(block1.hash)) { result =>
result.isGood mustEqual true
val blocks = List(block0, block1)
verifyBlockchain(blocks)
val balanceDataHandler = new BalancePostgresDataHandler(database, new BalancePostgresDAO(new FieldOrderingSQLInterpreter))
val balances = balanceDataHandler.get(
PaginatedQuery(Offset(0), Limit(100)),
FieldOrdering(BalanceField.Available, OrderingCondition.DescendingOrder))
.get
.data
val balance = balances
.find(_.address.string == "XdJnCKYNwzCz8ATv8Eu75gonaHyfr9qXg9")
.get
balance.received mustEqual BigDecimal(0)
balance.spent mustEqual BigDecimal(0)
}
}
"ignore orphan block on rare rechain events when the rpc server doesn't have the block anymore" in {
// see https://github.com/X9Developers/block-explorer/issues/6
val block0 = blockList(0)
val block1 = blockList(1)
val block2 = blockList(2)
val block3 = blockList(3)
val xsnService = new FileBasedXSNService {
override def getBlock(blockhash: Blockhash): FutureApplicationResult[Block] = {
if (blockhash == block3.hash) {
Future.successful(Bad(BlockNotFoundError).accumulating)
} else {
super.getBlock(blockhash)
}
}
}
val processor = blockEventsProcessor(xsn = xsnService)
createBlocks(processor, block0, block1, block2)
/**
* When processing the latest block, a rechain event has occurred in the rpc server which leads to
* a case that we can't retrieve the block information, the block should be ignored.
*/
whenReady(processor.processBlock(block3.hash)) { result =>
result mustEqual Good(MissingBlockIgnored)
val blocks = List(block0, block1, block2)
verifyBlockchain(blocks)
}
}
"ignore orphan block on rare rechain events when the rpc server doesn't have the a transaction from the block anymore" in {
// see https://github.com/X9Developers/block-explorer/issues/6
val block0 = blockList(0)
val block1 = blockList(1)
val block2 = blockList(2)
val block3 = blockList(3)
val xsnService = new FileBasedXSNService {
override def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction] = {
if (txid == block3.transactions.last) {
Future.successful(Bad(TransactionNotFoundError).accumulating)
} else {
super.getTransaction(txid)
}
}
}
val processor = blockEventsProcessor(xsn = xsnService)
createBlocks(processor, block0, block1, block2)
/**
* When processing the latest block, a rechain event has occurred in the rpc server which leads to
* a case that we can't retrieve a specific transaction, the block should be ignored.
*/
whenReady(processor.processBlock(block3.hash)) { result =>
result mustEqual Good(MissingBlockIgnored)
val blocks = List(block0, block1, block2)
verifyBlockchain(blocks)
}
}
"remove orphan block on rare rechain events when the block height already exists" in {
// see https://github.com/X9Developers/block-explorer/issues/6
val block0 = blockList(0)
val block1 = blockList(1)
val block2 = blockList(2)
val block3 = blockList(3)
.copy(height = block2.height)
.copy(previousBlockhash = block2.previousBlockhash)
val processor = blockEventsProcessor(block3)
createBlocks(processor, block0, block1, block2)
whenReady(processor.processBlock(block3.hash)) { result =>
result mustEqual Good(ReplacedByBlockHeight(block3))
val blocks = List(block0, block1.copy(nextBlockhash = Some(block3.hash)), block3)
verifyBlockchain(blocks)
// ensure block2 has been removed
dataHandler.getBy(block2.hash) mustEqual Bad(BlockNotFoundError).accumulating
}
}
"keep the correct previous_blockhash on rare events" in {
// see https://github.com/X9Developers/block-explorer/issues/12
val block0 = blockList(0)
val block1 = blockList(1)
val block2 = blockList(2)
val block3 = blockList(3)
val rareBlock3 = block3.copy(previousBlockhash = block2.previousBlockhash)
val processor = blockEventsProcessor(rareBlock3)
createBlocks(processor, block0, block1, rareBlock3)
whenReady(processor.processBlock(block2.hash)) { result =>
result mustEqual Good(MissingBlockProcessed(block2))
val blocks = List(block0, block1, block2, block3)
verifyBlockchain(blocks)
}
}
}
private def verifyBlockchain(blocks: List[Block]) = {
countBlocks() mustEqual blocks.size
blocks.foreach { block =>
val dbBlock = dataHandler.getBy(block.hash).get
dbBlock.previousBlockhash mustEqual block.previousBlockhash
dbBlock.nextBlockhash mustEqual block.nextBlockhash
}
}
private def countBlocks() = {
database.withConnection { implicit conn =>
_root_.anorm.SQL("""SELECT COUNT(*) FROM blocks""").as(_root_.anorm.SqlParser.scalar[Int].single)
}
}
private def createBlocks(processor: BlockEventsProcessor, blocks: Block*) = {
blocks
.map(_.hash)
.foreach { block =>
whenReady(processor.processBlock(block)) { result =>
result.isGood mustEqual true
}
}
}
private def blockEventsProcessor(blocks: Block*): BlockEventsProcessor = {
val xsn = new FileBasedXSNService {
override def getBlock(blockhash: Blockhash): FutureApplicationResult[Block] = {
blocks
.find(_.hash == blockhash)
.map { block => Future.successful(Good(block)) }
.getOrElse(super.getBlock(blockhash))
}
}
blockEventsProcessor(xsn)
}
private def blockEventsProcessor(xsn: XSNService): BlockEventsProcessor = {
val dataSeeder = new DatabasePostgresSeeder(
database,
new BlockPostgresDAO,
new TransactionPostgresDAO(new FieldOrderingSQLInterpreter),
new BalancePostgresDAO(new FieldOrderingSQLInterpreter))
val blockOps = new BlockOps(
new DatabaseFutureSeeder(dataSeeder)(Executors.databaseEC),
new BlockFutureDataHandler(dataHandler)(Executors.databaseEC))
val transactionService = new TransactionService(
new PaginatedQueryValidator,
new TransactionOrderingParser,
xsn,
new TransactionFutureDataHandler(transactionDataHandler)(Executors.databaseEC))(Executors.globalEC)
new BlockEventsProcessor(
xsn,
transactionService,
new DatabaseFutureSeeder(dataSeeder)(Executors.databaseEC),
new BlockFutureDataHandler(dataHandler)(Executors.databaseEC),
blockOps)
}
}

51
server/test/com/xsn/explorer/processors/BlockOpsSpec.scala

@ -1,51 +0,0 @@
package com.xsn.explorer.processors
import com.xsn.explorer.data.anorm.dao.{BalancePostgresDAO, BlockPostgresDAO, TransactionPostgresDAO}
import com.xsn.explorer.data.anorm.interpreters.FieldOrderingSQLInterpreter
import com.xsn.explorer.data.anorm.{BlockPostgresDataHandler, DatabasePostgresSeeder}
import com.xsn.explorer.data.async.{BlockFutureDataHandler, DatabaseFutureSeeder}
import com.xsn.explorer.data.common.PostgresDataHandlerSpec
import com.xsn.explorer.helpers.{BlockLoader, Executors, FileBasedXSNService}
import org.scalactic.Good
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.ScalaFutures
class BlockOpsSpec extends PostgresDataHandlerSpec with ScalaFutures with BeforeAndAfter {
before {
clearDatabase()
}
lazy val dataHandler = new BlockPostgresDataHandler(database, new BlockPostgresDAO)
lazy val dataSeeder = new DatabasePostgresSeeder(
database,
new BlockPostgresDAO,
new TransactionPostgresDAO(new FieldOrderingSQLInterpreter),
new BalancePostgresDAO(new FieldOrderingSQLInterpreter))
lazy val xsnService = new FileBasedXSNService
lazy val blockOps = new BlockOps(
new DatabaseFutureSeeder(dataSeeder)(Executors.databaseEC),
new BlockFutureDataHandler(dataHandler)(Executors.databaseEC))
val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7")
val block2 = BlockLoader.get("000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8")
"createBlock" should {
"create a new block" in {
whenReady(blockOps.createBlock(block1.copy(previousBlockhash = None), List.empty)) { result =>
result mustEqual Good(BlockOps.Result.BlockCreated)
}
}
"replace a block if the height already exist" in {
whenReady(blockOps.createBlock(block1.copy(previousBlockhash = None), List.empty)) { _.isGood mustEqual true }
whenReady(blockOps.createBlock(block2.copy(height = block1.height, previousBlockhash = None), List.empty)) { result =>
result mustEqual Good(BlockOps.Result.BlockReplacedByHeight)
}
}
}
}

4
server/test/controllers/common/MyAPISpec.scala

@ -1,7 +1,7 @@
package controllers.common package controllers.common
import com.alexitc.playsonify.test.PlayAPISpec import com.alexitc.playsonify.test.PlayAPISpec
import com.xsn.explorer.modules.{PollerSynchronizerModule, PollingSeederModule, SeederModule} import com.xsn.explorer.modules.PollerSynchronizerModule
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import play.api.db.{DBApi, Database, Databases} import play.api.db.{DBApi, Database, Databases}
import play.api.inject.bind import play.api.inject.bind
@ -46,8 +46,6 @@ trait MyAPISpec extends PlayAPISpec {
override val guiceApplicationBuilder: GuiceApplicationBuilder = GuiceApplicationBuilder(loadConfiguration = loadConfigWithoutEvolutions) override val guiceApplicationBuilder: GuiceApplicationBuilder = GuiceApplicationBuilder(loadConfiguration = loadConfigWithoutEvolutions)
.in(Mode.Test) .in(Mode.Test)
.disable(classOf[SeederModule])
.disable(classOf[PollingSeederModule])
.disable(classOf[PollerSynchronizerModule]) .disable(classOf[PollerSynchronizerModule])
.overrides(bind[Database].to(dummyDB)) .overrides(bind[Database].to(dummyDB))
.overrides(bind[DBApi].to(dummyDBApi)) .overrides(bind[DBApi].to(dummyDBApi))

Loading…
Cancel
Save