diff --git a/server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala b/server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala index e695d09..50af8e1 100644 --- a/server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala +++ b/server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala @@ -1,14 +1,13 @@ package com.xsn.explorer.services -import javax.inject.Inject - import com.alexitc.playsonify.core.FutureApplicationResult -import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureListOps, FutureOps, OptionOps} +import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureOps, OptionOps} import com.xsn.explorer.data.async.{BlockFutureDataHandler, LedgerFutureDataHandler} import com.xsn.explorer.errors.BlockNotFoundError import com.xsn.explorer.models.rpc.Block import com.xsn.explorer.models.{Blockhash, Height, Transaction} import com.xsn.explorer.util.Extensions.FutureOrExt +import javax.inject.Inject import org.scalactic.Good import org.slf4j.LoggerFactory @@ -33,7 +32,7 @@ class LedgerSynchronizerService @Inject() ( def synchronize(blockhash: Blockhash): FutureApplicationResult[Unit] = { val result = for { block <- xsnService.getBlock(blockhash).toFutureOr - transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr + transactions <- transactionService.getTransactions(block.transactions).toFutureOr _ <- synchronize(block, transactions).toFutureOr } yield () @@ -98,7 +97,7 @@ class LedgerSynchronizerService @Inject() ( val result = for { blockhash <- newBlock.previousBlockhash.toFutureOr(BlockNotFoundError) previousBlock <- xsnService.getBlock(blockhash).toFutureOr - previousTransactions <- previousBlock.transactions.map(transactionService.getTransaction).toFutureOr + previousTransactions <- transactionService.getTransactions(previousBlock.transactions).toFutureOr _ <- synchronize(previousBlock, previousTransactions).toFutureOr _ <- synchronize(newBlock, newTransactions).toFutureOr } yield () @@ -149,7 +148,7 @@ class LedgerSynchronizerService @Inject() ( _ <- previous.toFutureOr blockhash <- xsnService.getBlockhash(Height(height)).toFutureOr block <- xsnService.getBlock(blockhash).toFutureOr - transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr + transactions <- transactionService.getTransactions(block.transactions).toFutureOr _ <- synchronize(block, transactions).toFutureOr } yield () diff --git a/server/app/com/xsn/explorer/services/TransactionService.scala b/server/app/com/xsn/explorer/services/TransactionService.scala index 9922074..2be6862 100644 --- a/server/app/com/xsn/explorer/services/TransactionService.scala +++ b/server/app/com/xsn/explorer/services/TransactionService.scala @@ -1,7 +1,7 @@ package com.xsn.explorer.services import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureListOps, FutureOps, OrOps} -import com.alexitc.playsonify.core.{FutureApplicationResult, FuturePaginatedResult} +import com.alexitc.playsonify.core.{FutureApplicationResult, FutureOr, FuturePaginatedResult} import com.alexitc.playsonify.models.ordering.OrderingQuery import com.alexitc.playsonify.models.pagination.PaginatedQuery import com.alexitc.playsonify.validators.PaginatedQueryValidator @@ -10,8 +10,10 @@ import com.xsn.explorer.errors._ import com.xsn.explorer.models._ import com.xsn.explorer.models.rpc.TransactionVIN import com.xsn.explorer.parsers.TransactionOrderingParser +import com.xsn.explorer.util.Extensions.FutureOrExt import javax.inject.Inject import org.scalactic._ +import org.slf4j.LoggerFactory import play.api.libs.json.{JsObject, JsString, JsValue} import scala.concurrent.{ExecutionContext, Future} @@ -23,6 +25,8 @@ class TransactionService @Inject() ( transactionFutureDataHandler: TransactionFutureDataHandler)( implicit ec: ExecutionContext) { + private val logger = LoggerFactory.getLogger(this.getClass) + private val maxTransactionsPerQuery = 100 def getRawTransaction(txidString: String): FutureApplicationResult[JsValue] = { @@ -76,6 +80,27 @@ class TransactionService @Inject() ( result.toFuture } + def getTransactions(ids: List[TransactionId]): FutureApplicationResult[List[Transaction]] = { + def loadTransactionsSlowly(pending: List[TransactionId]): FutureOr[List[Transaction]] = pending match { + case x :: xs => + for { + tx <- getTransaction(x).toFutureOr + next <- loadTransactionsSlowly(xs) + } yield tx :: next + + case _ => Future.successful(Good(List.empty)).toFutureOr + } + + ids + .map(getTransaction) + .toFutureOr + .recoverWith(XSNWorkQueueDepthExceeded) { + logger.warn("Unable to load transaction due to server overload, loading them slowly") + loadTransactionsSlowly(ids) + } + .toFuture + } + def getTransactions( addressString: String, paginatedQuery: PaginatedQuery, diff --git a/server/app/com/xsn/explorer/util/Extensions.scala b/server/app/com/xsn/explorer/util/Extensions.scala index e8a5c31..5595d75 100644 --- a/server/app/com/xsn/explorer/util/Extensions.scala +++ b/server/app/com/xsn/explorer/util/Extensions.scala @@ -4,7 +4,7 @@ import com.alexitc.playsonify.core.FutureOr import com.alexitc.playsonify.models.ApplicationError import org.scalactic.{Bad, Good, One} -import scala.concurrent.ExecutionContext +import scala.concurrent.{ExecutionContext, Future} object Extensions { @@ -40,5 +40,15 @@ object Extensions { new FutureOr(future) } + + def recoverWith[B >: A](error: ApplicationError)(f: => FutureOr[B])(implicit ec: ExecutionContext): FutureOr[B] = { + val future = inner.toFuture.flatMap { + case Good(result) => Future.successful(Good(result)) + case Bad(One(e)) if e == error => f.toFuture + case Bad(errors) => Future.successful(Bad(errors)) + } + + new FutureOr(future) + } } } diff --git a/server/test/com/xsn/explorer/services/TransactionServiceSpec.scala b/server/test/com/xsn/explorer/services/TransactionServiceSpec.scala new file mode 100644 index 0000000..dd6279c --- /dev/null +++ b/server/test/com/xsn/explorer/services/TransactionServiceSpec.scala @@ -0,0 +1,12 @@ +package com.xsn.explorer.services + +import org.scalatest.WordSpec + +class TransactionServiceSpec extends WordSpec { + + "getTransactions" should { + "handle server overload and load transactions slowly" in { + pending + } + } +}