Browse Source

server: Be resilient while loading a batch of transactions

Instead of failing to load a batch of transactions, we try to load
them sequencially to avoid overloading the RPC API.

This allows a smoother synchronization process.
prometheus-integration
Alexis Hernandez 6 years ago
parent
commit
c8b71046a3
  1. 11
      server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala
  2. 27
      server/app/com/xsn/explorer/services/TransactionService.scala
  3. 12
      server/app/com/xsn/explorer/util/Extensions.scala
  4. 12
      server/test/com/xsn/explorer/services/TransactionServiceSpec.scala

11
server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala

@ -1,14 +1,13 @@
package com.xsn.explorer.services
import javax.inject.Inject
import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureListOps, FutureOps, OptionOps}
import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureOps, OptionOps}
import com.xsn.explorer.data.async.{BlockFutureDataHandler, LedgerFutureDataHandler}
import com.xsn.explorer.errors.BlockNotFoundError
import com.xsn.explorer.models.rpc.Block
import com.xsn.explorer.models.{Blockhash, Height, Transaction}
import com.xsn.explorer.util.Extensions.FutureOrExt
import javax.inject.Inject
import org.scalactic.Good
import org.slf4j.LoggerFactory
@ -33,7 +32,7 @@ class LedgerSynchronizerService @Inject() (
def synchronize(blockhash: Blockhash): FutureApplicationResult[Unit] = {
val result = for {
block <- xsnService.getBlock(blockhash).toFutureOr
transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr
transactions <- transactionService.getTransactions(block.transactions).toFutureOr
_ <- synchronize(block, transactions).toFutureOr
} yield ()
@ -98,7 +97,7 @@ class LedgerSynchronizerService @Inject() (
val result = for {
blockhash <- newBlock.previousBlockhash.toFutureOr(BlockNotFoundError)
previousBlock <- xsnService.getBlock(blockhash).toFutureOr
previousTransactions <- previousBlock.transactions.map(transactionService.getTransaction).toFutureOr
previousTransactions <- transactionService.getTransactions(previousBlock.transactions).toFutureOr
_ <- synchronize(previousBlock, previousTransactions).toFutureOr
_ <- synchronize(newBlock, newTransactions).toFutureOr
} yield ()
@ -149,7 +148,7 @@ class LedgerSynchronizerService @Inject() (
_ <- previous.toFutureOr
blockhash <- xsnService.getBlockhash(Height(height)).toFutureOr
block <- xsnService.getBlock(blockhash).toFutureOr
transactions <- block.transactions.map(transactionService.getTransaction).toFutureOr
transactions <- transactionService.getTransactions(block.transactions).toFutureOr
_ <- synchronize(block, transactions).toFutureOr
} yield ()

27
server/app/com/xsn/explorer/services/TransactionService.scala

@ -1,7 +1,7 @@
package com.xsn.explorer.services
import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureListOps, FutureOps, OrOps}
import com.alexitc.playsonify.core.{FutureApplicationResult, FuturePaginatedResult}
import com.alexitc.playsonify.core.{FutureApplicationResult, FutureOr, FuturePaginatedResult}
import com.alexitc.playsonify.models.ordering.OrderingQuery
import com.alexitc.playsonify.models.pagination.PaginatedQuery
import com.alexitc.playsonify.validators.PaginatedQueryValidator
@ -10,8 +10,10 @@ import com.xsn.explorer.errors._
import com.xsn.explorer.models._
import com.xsn.explorer.models.rpc.TransactionVIN
import com.xsn.explorer.parsers.TransactionOrderingParser
import com.xsn.explorer.util.Extensions.FutureOrExt
import javax.inject.Inject
import org.scalactic._
import org.slf4j.LoggerFactory
import play.api.libs.json.{JsObject, JsString, JsValue}
import scala.concurrent.{ExecutionContext, Future}
@ -23,6 +25,8 @@ class TransactionService @Inject() (
transactionFutureDataHandler: TransactionFutureDataHandler)(
implicit ec: ExecutionContext) {
private val logger = LoggerFactory.getLogger(this.getClass)
private val maxTransactionsPerQuery = 100
def getRawTransaction(txidString: String): FutureApplicationResult[JsValue] = {
@ -76,6 +80,27 @@ class TransactionService @Inject() (
result.toFuture
}
def getTransactions(ids: List[TransactionId]): FutureApplicationResult[List[Transaction]] = {
def loadTransactionsSlowly(pending: List[TransactionId]): FutureOr[List[Transaction]] = pending match {
case x :: xs =>
for {
tx <- getTransaction(x).toFutureOr
next <- loadTransactionsSlowly(xs)
} yield tx :: next
case _ => Future.successful(Good(List.empty)).toFutureOr
}
ids
.map(getTransaction)
.toFutureOr
.recoverWith(XSNWorkQueueDepthExceeded) {
logger.warn("Unable to load transaction due to server overload, loading them slowly")
loadTransactionsSlowly(ids)
}
.toFuture
}
def getTransactions(
addressString: String,
paginatedQuery: PaginatedQuery,

12
server/app/com/xsn/explorer/util/Extensions.scala

@ -4,7 +4,7 @@ import com.alexitc.playsonify.core.FutureOr
import com.alexitc.playsonify.models.ApplicationError
import org.scalactic.{Bad, Good, One}
import scala.concurrent.ExecutionContext
import scala.concurrent.{ExecutionContext, Future}
object Extensions {
@ -40,5 +40,15 @@ object Extensions {
new FutureOr(future)
}
def recoverWith[B >: A](error: ApplicationError)(f: => FutureOr[B])(implicit ec: ExecutionContext): FutureOr[B] = {
val future = inner.toFuture.flatMap {
case Good(result) => Future.successful(Good(result))
case Bad(One(e)) if e == error => f.toFuture
case Bad(errors) => Future.successful(Bad(errors))
}
new FutureOr(future)
}
}
}

12
server/test/com/xsn/explorer/services/TransactionServiceSpec.scala

@ -0,0 +1,12 @@
package com.xsn.explorer.services
import org.scalatest.WordSpec
class TransactionServiceSpec extends WordSpec {
"getTransactions" should {
"handle server overload and load transactions slowly" in {
pending
}
}
}
Loading…
Cancel
Save