Browse Source

server: Update the SQSSeederTask to call the BlockSynchronizerTask

scalafmt-draft
Alexis Hernandez 7 years ago
parent
commit
e93ec20cdb
  1. 2
      server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala
  2. 5
      server/app/com/xsn/explorer/modules/SeederModule.scala
  3. 7
      server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala
  4. 48
      server/app/com/xsn/explorer/tasks/BlockSynchronizerTask.scala

2
server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala

@ -70,7 +70,7 @@ class DatabasePostgresSeeder @Inject() (
result result
.map(Good(_)) .map(Good(_))
.getOrElse(throw new RuntimeException("Unable to add the new latest block")) .getOrElse(throw new RuntimeException("Unable to an old block"))
} }
private def upsertBlockCascade(command: CreateBlockCommand)(implicit conn: Connection): Option[Unit] = { private def upsertBlockCascade(command: CreateBlockCommand)(implicit conn: Connection): Option[Unit] = {

5
server/app/com/xsn/explorer/modules/SeederModule.scala

@ -11,6 +11,7 @@ import com.amazonaws.services.sqs.model.Message
import com.xsn.explorer.config.SeederConfig import com.xsn.explorer.config.SeederConfig
import com.xsn.explorer.models.Blockhash import com.xsn.explorer.models.Blockhash
import com.xsn.explorer.processors.BlockEventsProcessor import com.xsn.explorer.processors.BlockEventsProcessor
import com.xsn.explorer.tasks.BlockSynchronizerTask
import org.scalactic.{Bad, Good} import org.scalactic.{Bad, Good}
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import play.api.inject.{SimpleModule, _} import play.api.inject.{SimpleModule, _}
@ -24,7 +25,8 @@ class SeederModule extends SimpleModule(bind[SQSSeederTask].toSelf.eagerly())
@Singleton @Singleton
class SQSSeederTask @Inject() ( class SQSSeederTask @Inject() (
config: SeederConfig, config: SeederConfig,
blockEventsProcessor: BlockEventsProcessor)( blockEventsProcessor: BlockEventsProcessor,
blockSynchronizerTask: BlockSynchronizerTask)(
implicit sqs: AmazonSQSAsync, implicit sqs: AmazonSQSAsync,
materializer: Materializer) { materializer: Materializer) {
@ -68,6 +70,7 @@ class SQSSeederTask @Inject() (
case Good(_) => case Good(_) =>
logger.info(s"Block processed successfully = ${blockhash.string}") logger.info(s"Block processed successfully = ${blockhash.string}")
sqs.deleteMessageAsync(config.queueUrl, message.getReceiptHandle) sqs.deleteMessageAsync(config.queueUrl, message.getReceiptHandle)
blockSynchronizerTask.sync()
} }
} }

7
server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala

@ -6,9 +6,8 @@ import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps
import com.alexitc.playsonify.core.{ApplicationResult, FutureApplicationResult} import com.alexitc.playsonify.core.{ApplicationResult, FutureApplicationResult}
import com.xsn.explorer.data.BlockBlockingDataHandler import com.xsn.explorer.data.BlockBlockingDataHandler
import com.xsn.explorer.data.anorm.DatabasePostgresSeeder import com.xsn.explorer.data.anorm.DatabasePostgresSeeder
import com.xsn.explorer.models.Blockhash import com.xsn.explorer.models.{Blockhash, Transaction}
import com.xsn.explorer.models.rpc.Block import com.xsn.explorer.models.rpc.Block
import com.xsn.explorer.models.Transaction
import com.xsn.explorer.services.XSNService import com.xsn.explorer.services.XSNService
import com.xsn.explorer.util.Extensions.FutureApplicationResultExt import com.xsn.explorer.util.Extensions.FutureApplicationResultExt
import org.scalactic.Good import org.scalactic.Good
@ -41,13 +40,13 @@ class BlockEventsProcessor @Inject() (
* *
* @param blockhash the new latest block * @param blockhash the new latest block
*/ */
def newLatestBlock(blockhash: Blockhash): FutureApplicationResult[Unit] = { def newLatestBlock(blockhash: Blockhash): FutureApplicationResult[Block] = {
val result = for { val result = for {
block <- xsnService.getBlock(blockhash).toFutureOr block <- xsnService.getBlock(blockhash).toFutureOr
rpcTransactions <- block.transactions.map(xsnService.getTransaction).toFutureOr rpcTransactions <- block.transactions.map(xsnService.getTransaction).toFutureOr
transactions = rpcTransactions.map(Transaction.fromRPC) transactions = rpcTransactions.map(Transaction.fromRPC)
r <- newLatestBlock(block, transactions).toFutureOr r <- newLatestBlock(block, transactions).toFutureOr
} yield r } yield block
result.toFuture result.toFuture
} }

48
server/app/com/xsn/explorer/tasks/BlockSynchronizerTask.scala

@ -57,7 +57,7 @@ class BlockSynchronizerTask @Inject() (
} }
private def tryToSync() = { private def tryToSync() = {
val result = for { val futureOr = for {
firstBlock <- blockDataHandler.getFirstBlock().toFutureOr firstBlock <- blockDataHandler.getFirstBlock().toFutureOr
previous <- Or.from(firstBlock.previousBlockhash, One(BlockNotFoundError)).toFutureOr previous <- Or.from(firstBlock.previousBlockhash, One(BlockNotFoundError)).toFutureOr
_ <- { _ <- {
@ -67,16 +67,38 @@ class BlockSynchronizerTask @Inject() (
} else { } else {
// sync // sync
logger.info(s"Sync required until block ${firstBlock.height.int}") logger.info(s"Sync required until block ${firstBlock.height.int}")
doSync(previous) val r = doSync(previous)
r.foreach {
case Good(_) =>
logger.info("Sync completed")
case _ => ()
}
r
} }
}.toFutureOr }.toFutureOr
} yield () } yield ()
result.toFuture val result = futureOr.toFuture
result.foreach {
case Bad(errors) =>
logger.error(s"Failed to sync blocks, errors = $errors")
case _ => ()
}
result.recover {
case NonFatal(ex) =>
logger.error(s"Failed to sync blocks", ex)
}
result
} }
private def doSync(blockhash: Blockhash): FutureApplicationResult[Unit] = { private def doSync(blockhash: Blockhash): FutureApplicationResult[Unit] = {
val futureOr = for { val result = for {
block <- xsnService.getBlock(blockhash).toFutureOr block <- xsnService.getBlock(blockhash).toFutureOr
rpcTransactions <- block.transactions.map(xsnService.getTransaction).toFutureOr rpcTransactions <- block.transactions.map(xsnService.getTransaction).toFutureOr
transactions = rpcTransactions.map(Transaction.fromRPC) transactions = rpcTransactions.map(Transaction.fromRPC)
@ -87,7 +109,7 @@ class BlockSynchronizerTask @Inject() (
_ <- block _ <- block
.previousBlockhash .previousBlockhash
.filter(_ => block.height.int >= FirstBlockHeight.int) .filter(_ => block.height.int > FirstBlockHeight.int)
.map { previous => .map { previous =>
doSync(previous) doSync(previous)
} }
@ -98,20 +120,6 @@ class BlockSynchronizerTask @Inject() (
.toFutureOr .toFutureOr
} yield () } yield ()
val result = futureOr.toFuture result.toFuture
result.foreach {
case Bad(errors) =>
logger.error(s"Failed to sync block = ${blockhash.string}, errors = $errors")
case _ => ()
}
result.recover {
case NonFatal(ex) =>
logger.error(s"Failed to sync block = ${blockhash.string}", ex)
}
result
} }
} }

Loading…
Cancel
Save