You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
84 lines
2.4 KiB
84 lines
2.4 KiB
7 years ago
|
package com.xsn.explorer.tasks
|
||
|
|
||
|
import javax.inject.{Inject, Singleton}
|
||
|
|
||
|
import akka.stream.Materializer
|
||
|
import akka.stream.alpakka.sqs.SqsSourceSettings
|
||
|
import akka.stream.alpakka.sqs.scaladsl.SqsSource
|
||
|
import akka.stream.scaladsl.Sink
|
||
|
import com.amazonaws.services.sqs.AmazonSQSAsync
|
||
|
import com.amazonaws.services.sqs.model.Message
|
||
|
import com.xsn.explorer.config.SeederConfig
|
||
|
import com.xsn.explorer.models.Blockhash
|
||
|
import com.xsn.explorer.processors.BlockEventsProcessor
|
||
|
import org.scalactic.{Bad, Good}
|
||
|
import org.slf4j.LoggerFactory
|
||
|
|
||
|
import scala.concurrent.ExecutionContext.Implicits.global
|
||
|
import scala.util.control.NonFatal
|
||
|
import scala.util.{Failure, Success}
|
||
|
|
||
|
@Singleton
|
||
|
class SQSSeederTask @Inject() (
|
||
|
config: SeederConfig,
|
||
|
blockEventsProcessor: BlockEventsProcessor,
|
||
|
blockSynchronizerTask: BlockSynchronizerTask)(
|
||
|
implicit sqs: AmazonSQSAsync,
|
||
|
materializer: Materializer) {
|
||
|
|
||
|
private val logger = LoggerFactory.getLogger(this.getClass)
|
||
|
|
||
|
private val settings = SqsSourceSettings.Defaults.copy(maxBatchSize = 1, maxBufferSize = 1)
|
||
|
|
||
|
if (config.isEnabled) {
|
||
|
run()
|
||
|
} else {
|
||
|
logger.info("Seeder is disabled")
|
||
|
}
|
||
|
|
||
|
def run(): Unit = {
|
||
|
logger.info("Starting seeder")
|
||
|
|
||
|
SqsSource(config.queueUrl, settings)
|
||
|
.runWith(Sink.foreach(handleMessage))
|
||
|
.onComplete {
|
||
|
case Failure(ex) =>
|
||
|
logger.error("Failed to stream SQS messages", ex)
|
||
|
|
||
|
case Success(_) =>
|
||
|
logger.info("SQS stream completed")
|
||
|
}
|
||
|
}
|
||
|
|
||
|
private def handleMessage(message: Message): Unit = {
|
||
|
def onBlockhash(blockhash: Blockhash) = {
|
||
|
val result = blockEventsProcessor.newLatestBlock(blockhash)
|
||
|
|
||
|
result.recover {
|
||
|
case NonFatal(ex) =>
|
||
|
logger.error(s"Failed to process the latest block = ${blockhash.string}", ex)
|
||
|
}
|
||
|
|
||
|
result.foreach {
|
||
|
case Bad(errors) =>
|
||
|
logger.error(s"Failed to process the latest block = ${blockhash.string}, errors = $errors")
|
||
|
|
||
|
case Good(_) =>
|
||
|
logger.info(s"Block processed successfully = ${blockhash.string}")
|
||
|
sqs.deleteMessageAsync(config.queueUrl, message.getReceiptHandle)
|
||
|
blockSynchronizerTask.sync()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
val body = message.getBody
|
||
|
Blockhash
|
||
|
.from(body)
|
||
|
.orElse {
|
||
|
logger.warn(s"Ignoring invalid message: $body")
|
||
|
sqs.deleteMessageAsync(config.queueUrl, message.getReceiptHandle)
|
||
|
None
|
||
|
}
|
||
|
.foreach(onBlockhash)
|
||
|
}
|
||
|
}
|