You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

112 lines
3.3 KiB

package com.xsn.explorer.tasks
import javax.inject.{Inject, Singleton}
import akka.stream.Materializer
import akka.stream.alpakka.sqs.SqsSourceSettings
import akka.stream.alpakka.sqs.scaladsl.SqsSource
import akka.stream.scaladsl.Sink
import com.amazonaws.services.sqs.AmazonSQSAsync
import com.amazonaws.services.sqs.model.Message
import com.xsn.explorer.config.SeederConfig
import com.xsn.explorer.models.Blockhash
import com.xsn.explorer.processors.BlockEventsProcessor
import org.scalactic.{Bad, Good}
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.control.NonFatal
import scala.util.{Failure, Success}
/**
 * Streams messages from the SQS queue at `config.queueUrl` and passes the block hash found in
 * each message body to the [[BlockEventsProcessor]].
 *
 * When `config.isEnabled` is true, the stream is started as part of construction; otherwise
 * the task only logs that it is disabled.
 *
 * @param config                    seeder configuration (enabled flag and queue url)
 * @param blockEventsProcessor      processes the block identified by each received hash
 * @param backwardsSynchronizerTask invoked with the resulting block for most processing outcomes
 * @param sqs                       client used by the SQS source and to delete handled messages
 * @param materializer              materializer used to run the stream
 */
@Singleton
class SQSSeederTask @Inject() (
    config: SeederConfig,
    blockEventsProcessor: BlockEventsProcessor,
    backwardsSynchronizerTask: BackwardsSynchronizerTask)(
    implicit sqs: AmazonSQSAsync,
    materializer: Materializer) {

  private val logger = LoggerFactory.getLogger(this.getClass)

  // Both the batch size and the buffer size of the source are limited to 1.
  private val settings = SqsSourceSettings.Defaults.copy(maxBatchSize = 1, maxBufferSize = 1)

  if (config.isEnabled) {
    run()
  } else {
    logger.info("Seeder is disabled")
  }

  /**
   * Starts consuming the queue.
   *
   * If the stream fails, a new one is started right away (there is no delay between attempts).
   * If the stream completes normally, it is not restarted.
   */
  def run(): Unit = {
    logger.info("Starting seeder")

    def startStream(): Unit = {
      SqsSource(config.queueUrl, settings)
        .runWith(Sink.foreach(handleMessage))
        .onComplete {
          case Failure(ex) =>
            logger.error("Failed to stream SQS messages, restarting seeder", ex)
            startStream()

          case Success(_) =>
            logger.info("SQS stream completed")
        }
    }

    startStream()
  }

  /**
   * Handles a single queue message.
   *
   * A body that is not a valid block hash is logged and the message is deleted from the queue;
   * a valid block hash is handed to the block processor.
   */
  private def handleMessage(message: Message): Unit = {
    val body = message.getBody

    Blockhash.from(body) match {
      case Some(blockhash) =>
        processBlockhash(blockhash, message)

      case None =>
        logger.warn(s"Ignoring invalid message: $body")
        deleteMessage(message)
    }
  }

  /**
   * Processes the given block and reacts to the outcome from a single callback, so the
   * steps for a successful block always run in the same order: log, delete the message,
   * then trigger the follow-up synchronization.
   *
   * The message is only deleted when the block was processed successfully; on errors or
   * failures it is left in the queue.
   */
  private def processBlockhash(blockhash: Blockhash, message: Message): Unit = {
    blockEventsProcessor.processBlock(blockhash).onComplete {
      case Success(Good(eventResult)) =>
        logger.info(s"Block processed successfully = ${blockhash.string}")
        deleteMessage(message)

        // An exception thrown here would otherwise be lost inside the future callback.
        try onBlockResult(eventResult)
        catch {
          case NonFatal(ex) =>
            logger.error(s"Failed to handle the result for the block = ${blockhash.string}", ex)
        }

      case Success(Bad(errors)) =>
        logger.error(s"Failed to process the latest block = ${blockhash.string}, errors = $errors")

      case Failure(ex) =>
        logger.error(s"Failed to process the latest block = ${blockhash.string}", ex)
    }
  }

  /** Requests the deletion of the message from the queue; the outcome of the request is not inspected. */
  private def deleteMessage(message: Message): Unit = {
    sqs.deleteMessageAsync(config.queueUrl, message.getReceiptHandle)
    ()
  }

  /**
   * Triggers the backwards synchronization for every outcome that carries a block to sync from;
   * `NewBlockAppended` and `MissingBlockIgnored` require no further action.
   */
  private def onBlockResult(eventResult: BlockEventsProcessor.Result) = eventResult match {
    case BlockEventsProcessor.FirstBlockCreated(block) =>
      backwardsSynchronizerTask.sync(block)

    case BlockEventsProcessor.NewBlockAppended(_) => ()

    case BlockEventsProcessor.RechainDone(_, newBlock) =>
      backwardsSynchronizerTask.sync(newBlock)

    case BlockEventsProcessor.MissingBlockProcessed(block) =>
      backwardsSynchronizerTask.sync(block)

    case BlockEventsProcessor.ExistingBlockIgnored(block) =>
      backwardsSynchronizerTask.sync(block)

    case BlockEventsProcessor.MissingBlockIgnored => ()

    case BlockEventsProcessor.ReplacedByBlockHeight(newBlock) =>
      backwardsSynchronizerTask.sync(newBlock)
  }
}