Browse Source

server: Extract the TransactionRPCService from TransactionService

prometheus-integration
Alexis Hernandez 6 years ago
parent
commit
99c0200af0
  1. 3
      server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala
  2. 159
      server/app/com/xsn/explorer/services/TransactionRPCService.scala
  3. 148
      server/app/com/xsn/explorer/services/TransactionService.scala
  4. 10
      server/app/controllers/TransactionsController.scala
  5. 3
      server/test/com/xsn/explorer/services/LedgerSynchronizerServiceSpec.scala

3
server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala

@ -16,6 +16,7 @@ import scala.concurrent.{ExecutionContext, Future}
class LedgerSynchronizerService @Inject() ( class LedgerSynchronizerService @Inject() (
xsnService: XSNService, xsnService: XSNService,
transactionService: TransactionService, transactionService: TransactionService,
transactionRPCService: TransactionRPCService,
ledgerDataHandler: LedgerFutureDataHandler, ledgerDataHandler: LedgerFutureDataHandler,
blockDataHandler: BlockFutureDataHandler)( blockDataHandler: BlockFutureDataHandler)(
implicit ec: ExecutionContext) { implicit ec: ExecutionContext) {
@ -160,7 +161,7 @@ class LedgerSynchronizerService @Inject() (
val start = System.currentTimeMillis() val start = System.currentTimeMillis()
val result = for { val result = for {
block <- xsnService.getBlock(blockhash).toFutureOr block <- xsnService.getBlock(blockhash).toFutureOr
transactions <- transactionService.getTransactions(block.transactions).toFutureOr transactions <- transactionRPCService.getTransactions(block.transactions).toFutureOr
took = System.currentTimeMillis() - start took = System.currentTimeMillis() - start
_ = logger.info(s"Retrieving block = $blockhash, took $took ms") _ = logger.info(s"Retrieving block = $blockhash, took $took ms")
} yield (block, transactions) } yield (block, transactions)

159
server/app/com/xsn/explorer/services/TransactionRPCService.scala

@ -0,0 +1,159 @@
package com.xsn.explorer.services
import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureListOps, FutureOps, OrOps}
import com.alexitc.playsonify.core.{FutureApplicationResult, FutureOr}
import com.xsn.explorer.errors.{InvalidRawTransactionError, TransactionFormatError, TransactionNotFoundError, XSNWorkQueueDepthExceeded}
import com.xsn.explorer.models.rpc.TransactionVIN
import com.xsn.explorer.models.{HexString, Transaction, TransactionDetails, TransactionId, TransactionValue}
import com.xsn.explorer.util.Extensions.FutureOrExt
import javax.inject.Inject
import org.scalactic.{Bad, Good, One, Or}
import org.slf4j.LoggerFactory
import play.api.libs.json.{JsObject, JsString, JsValue}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
class TransactionRPCService @Inject() (
xsnService: XSNService)(
implicit ec: ExecutionContext) {
private val logger = LoggerFactory.getLogger(this.getClass)
def getRawTransaction(txidString: String): FutureApplicationResult[JsValue] = {
val result = for {
txid <- {
val maybe = TransactionId.from(txidString)
Or.from(maybe, One(TransactionFormatError)).toFutureOr
}
transaction <- xsnService.getRawTransaction(txid).toFutureOr
} yield transaction
result.toFuture
}
def getTransactionDetails(txidString: String): FutureApplicationResult[TransactionDetails] = {
val result = for {
txid <- {
val maybe = TransactionId.from(txidString)
Or.from(maybe, One(TransactionFormatError)).toFutureOr
}
transaction <- xsnService.getTransaction(txid).toFutureOr
input <- transaction
.vin
.map(getTransactionValue)
.toFutureOr
} yield TransactionDetails.from(transaction, input)
result.toFuture
}
def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction] = {
val result = for {
tx <- xsnService.getTransaction(txid).toFutureOr
transactionVIN <- getTransactionVIN(tx.vin).toFutureOr
rpcTransaction = tx.copy(vin = transactionVIN)
} yield Transaction.fromRPC(rpcTransaction)
result.toFuture
}
private def getTransactionVIN(list: List[TransactionVIN]): FutureApplicationResult[List[TransactionVIN]] = {
def getVIN(vin: TransactionVIN) = {
getTransactionValue(vin)
.map {
case Good(transactionValue) =>
val newVIN = vin.copy(address = Some(transactionValue.address), value = Some(transactionValue.value))
Good(newVIN)
case Bad(e) => Bad(e)
}
}
def loadVINSequentially(pending: List[TransactionVIN]): FutureOr[List[TransactionVIN]] = pending match {
case x :: xs =>
for {
tx <- getVIN(x).toFutureOr
next <- loadVINSequentially(xs)
} yield tx :: next
case _ => Future.successful(Good(List.empty)).toFutureOr
}
list
.map(getVIN)
.toFutureOr
.toFuture
.recoverWith {
case NonFatal(ex) =>
logger.warn(s"Failed to load VIN, trying sequentially, error = ${ex.getMessage}")
loadVINSequentially(list).toFuture
}
}
def getTransactions(ids: List[TransactionId]): FutureApplicationResult[List[Transaction]] = {
def loadTransactionsSlowly(pending: List[TransactionId]): FutureOr[List[Transaction]] = pending match {
case x :: xs =>
for {
tx <- getTransaction(x).toFutureOr
next <- loadTransactionsSlowly(xs)
} yield tx :: next
case _ => Future.successful(Good(List.empty)).toFutureOr
}
ids
.map(getTransaction)
.toFutureOr
.recoverWith(XSNWorkQueueDepthExceeded) {
logger.warn("Unable to load transaction due to server overload, loading them slowly")
loadTransactionsSlowly(ids)
}
.toFuture
.recoverWith {
case NonFatal(ex) =>
logger.warn(s"Unable to load transactions due to server error, loading them sequentially, error = ${ex.getMessage}")
loadTransactionsSlowly(ids).toFuture
}
}
def sendRawTransaction(hexString: String): FutureApplicationResult[JsValue] = {
val result = for {
hex <- Or.from(HexString.from(hexString), One(InvalidRawTransactionError)).toFutureOr
_ <- xsnService.sendRawTransaction(hex).toFutureOr
} yield JsObject.empty + ("hex" -> JsString(hex.string))
result.toFuture
}
private def getTransactionValue(vin: TransactionVIN): FutureApplicationResult[TransactionValue] = {
val valueMaybe = for {
value <- vin.value
address <- vin.address
} yield TransactionValue(address, value)
valueMaybe
.map(Good(_))
.map(Future.successful)
.getOrElse {
val txid = vin.txid
val result = for {
tx <- xsnService.getTransaction(txid).toFutureOr
r <- {
val maybe = tx
.vout
.find(_.n == vin.voutIndex)
.flatMap(TransactionValue.from)
Or.from(maybe, One(TransactionNotFoundError)).toFutureOr
}
} yield r
result.toFuture
}
}
}

148
server/app/com/xsn/explorer/services/TransactionService.scala

@ -1,29 +1,24 @@
package com.xsn.explorer.services package com.xsn.explorer.services
import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureListOps, FutureOps, OrOps} import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureOps, OrOps}
import com.alexitc.playsonify.core.{FutureApplicationResult, FutureOr, FuturePaginatedResult} import com.alexitc.playsonify.core.{FutureApplicationResult, FuturePaginatedResult}
import com.alexitc.playsonify.models.ordering.{OrderingCondition, OrderingError, OrderingQuery} import com.alexitc.playsonify.models.ordering.{OrderingCondition, OrderingError, OrderingQuery}
import com.alexitc.playsonify.models.pagination.{Limit, Offset, PaginatedQuery} import com.alexitc.playsonify.models.pagination.{Limit, Offset, PaginatedQuery}
import com.alexitc.playsonify.validators.PaginatedQueryValidator import com.alexitc.playsonify.validators.PaginatedQueryValidator
import com.xsn.explorer.data.async.TransactionFutureDataHandler import com.xsn.explorer.data.async.TransactionFutureDataHandler
import com.xsn.explorer.errors._ import com.xsn.explorer.errors._
import com.xsn.explorer.models._ import com.xsn.explorer.models._
import com.xsn.explorer.models.rpc.TransactionVIN
import com.xsn.explorer.parsers.TransactionOrderingParser import com.xsn.explorer.parsers.TransactionOrderingParser
import com.xsn.explorer.util.Extensions.FutureOrExt
import io.scalaland.chimney.dsl._ import io.scalaland.chimney.dsl._
import javax.inject.Inject import javax.inject.Inject
import org.scalactic._ import org.scalactic._
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import play.api.libs.json.{JsObject, JsString, JsValue}
import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
class TransactionService @Inject() ( class TransactionService @Inject() (
paginatedQueryValidator: PaginatedQueryValidator, paginatedQueryValidator: PaginatedQueryValidator,
transactionOrderingParser: TransactionOrderingParser, transactionOrderingParser: TransactionOrderingParser,
xsnService: XSNService,
transactionFutureDataHandler: TransactionFutureDataHandler)( transactionFutureDataHandler: TransactionFutureDataHandler)(
implicit ec: ExecutionContext) { implicit ec: ExecutionContext) {
@ -31,106 +26,6 @@ class TransactionService @Inject() (
private val maxTransactionsPerQuery = 100 private val maxTransactionsPerQuery = 100
def getRawTransaction(txidString: String): FutureApplicationResult[JsValue] = {
val result = for {
txid <- {
val maybe = TransactionId.from(txidString)
Or.from(maybe, One(TransactionFormatError)).toFutureOr
}
transaction <- xsnService.getRawTransaction(txid).toFutureOr
} yield transaction
result.toFuture
}
def getTransactionDetails(txidString: String): FutureApplicationResult[TransactionDetails] = {
val result = for {
txid <- {
val maybe = TransactionId.from(txidString)
Or.from(maybe, One(TransactionFormatError)).toFutureOr
}
transaction <- xsnService.getTransaction(txid).toFutureOr
input <- transaction
.vin
.map(getTransactionValue)
.toFutureOr
} yield TransactionDetails.from(transaction, input)
result.toFuture
}
def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction] = {
val result = for {
tx <- xsnService.getTransaction(txid).toFutureOr
transactionVIN <- getTransactionVIN(tx.vin).toFutureOr
rpcTransaction = tx.copy(vin = transactionVIN)
} yield Transaction.fromRPC(rpcTransaction)
result.toFuture
}
private def getTransactionVIN(list: List[TransactionVIN]): FutureApplicationResult[List[TransactionVIN]] = {
def getVIN(vin: TransactionVIN) = {
getTransactionValue(vin)
.map {
case Good(transactionValue) =>
val newVIN = vin.copy(address = Some(transactionValue.address), value = Some(transactionValue.value))
Good(newVIN)
case Bad(e) => Bad(e)
}
}
def loadVINSequentially(pending: List[TransactionVIN]): FutureOr[List[TransactionVIN]] = pending match {
case x :: xs =>
for {
tx <- getVIN(x).toFutureOr
next <- loadVINSequentially(xs)
} yield tx :: next
case _ => Future.successful(Good(List.empty)).toFutureOr
}
list
.map(getVIN)
.toFutureOr
.toFuture
.recoverWith {
case NonFatal(ex) =>
logger.warn(s"Failed to load VIN, trying sequentially, error = ${ex.getMessage}")
loadVINSequentially(list).toFuture
}
}
def getTransactions(ids: List[TransactionId]): FutureApplicationResult[List[Transaction]] = {
def loadTransactionsSlowly(pending: List[TransactionId]): FutureOr[List[Transaction]] = pending match {
case x :: xs =>
for {
tx <- getTransaction(x).toFutureOr
next <- loadTransactionsSlowly(xs)
} yield tx :: next
case _ => Future.successful(Good(List.empty)).toFutureOr
}
ids
.map(getTransaction)
.toFutureOr
.recoverWith(XSNWorkQueueDepthExceeded) {
logger.warn("Unable to load transaction due to server overload, loading them slowly")
loadTransactionsSlowly(ids)
}
.toFuture
.recoverWith {
case NonFatal(ex) =>
logger.warn(s"Unable to load transactions due to server error, loading them sequentially, error = ${ex.getMessage}")
loadTransactionsSlowly(ids).toFuture
}
}
def getTransactions( def getTransactions(
addressString: String, addressString: String,
paginatedQuery: PaginatedQuery, paginatedQuery: PaginatedQuery,
@ -201,15 +96,6 @@ class TransactionService @Inject() (
result.toFuture result.toFuture
} }
def sendRawTransaction(hexString: String): FutureApplicationResult[JsValue] = {
val result = for {
hex <- Or.from(HexString.from(hexString), One(InvalidRawTransactionError)).toFutureOr
_ <- xsnService.sendRawTransaction(hex).toFutureOr
} yield JsObject.empty + ("hex" -> JsString(hex.string))
result.toFuture
}
def getByBlockhash(blockhashString: String, paginatedQuery: PaginatedQuery, orderingQuery: OrderingQuery): FuturePaginatedResult[TransactionWithValues] = { def getByBlockhash(blockhashString: String, paginatedQuery: PaginatedQuery, orderingQuery: OrderingQuery): FuturePaginatedResult[TransactionWithValues] = {
val result = for { val result = for {
blockhash <- Or.from(Blockhash.from(blockhashString), One(BlockhashFormatError)).toFutureOr blockhash <- Or.from(Blockhash.from(blockhashString), One(BlockhashFormatError)).toFutureOr
@ -240,34 +126,6 @@ class TransactionService @Inject() (
result.toFuture result.toFuture
} }
private def getTransactionValue(vin: TransactionVIN): FutureApplicationResult[TransactionValue] = {
val valueMaybe = for {
value <- vin.value
address <- vin.address
} yield TransactionValue(address, value)
valueMaybe
.map(Good(_))
.map(Future.successful)
.getOrElse {
val txid = vin.txid
val result = for {
tx <- xsnService.getTransaction(txid).toFutureOr
r <- {
val maybe = tx
.vout
.find(_.n == vin.voutIndex)
.flatMap(TransactionValue.from)
Or.from(maybe, One(TransactionNotFoundError)).toFutureOr
}
} yield r
result.toFuture
}
}
/** TODO: Move to another file */ /** TODO: Move to another file */
private def getOrderingConditionResult(unsafeOrderingCondition: String) = { private def getOrderingConditionResult(unsafeOrderingCondition: String) = {
val maybe = parseOrderingCondition(unsafeOrderingCondition) val maybe = parseOrderingCondition(unsafeOrderingCondition)

10
server/app/controllers/TransactionsController.scala

@ -1,26 +1,26 @@
package controllers package controllers
import com.xsn.explorer.models.request.SendRawTransactionRequest import com.xsn.explorer.models.request.SendRawTransactionRequest
import com.xsn.explorer.services.TransactionService import com.xsn.explorer.services.TransactionRPCService
import controllers.common.{MyJsonController, MyJsonControllerComponents} import controllers.common.{MyJsonController, MyJsonControllerComponents}
import javax.inject.Inject import javax.inject.Inject
class TransactionsController @Inject() ( class TransactionsController @Inject() (
transactionService: TransactionService, transactionRPCService: TransactionRPCService,
cc: MyJsonControllerComponents) cc: MyJsonControllerComponents)
extends MyJsonController(cc) { extends MyJsonController(cc) {
import Context._ import Context._
def getTransaction(txid: String) = public { _ => def getTransaction(txid: String) = public { _ =>
transactionService.getTransactionDetails(txid) transactionRPCService.getTransactionDetails(txid)
} }
def getRawTransaction(txid: String) = public { _ => def getRawTransaction(txid: String) = public { _ =>
transactionService.getRawTransaction(txid) transactionRPCService.getRawTransaction(txid)
} }
def sendRawTransaction() = publicInput { ctx: HasModel[SendRawTransactionRequest] => def sendRawTransaction() = publicInput { ctx: HasModel[SendRawTransactionRequest] =>
transactionService.sendRawTransaction(ctx.model.hex) transactionRPCService.sendRawTransaction(ctx.model.hex)
} }
} }

3
server/test/com/xsn/explorer/services/LedgerSynchronizerServiceSpec.scala

@ -221,12 +221,13 @@ class LedgerSynchronizerServiceSpec extends PostgresDataHandlerSpec with BeforeA
val transactionService = new TransactionService( val transactionService = new TransactionService(
new PaginatedQueryValidator, new PaginatedQueryValidator,
new TransactionOrderingParser, new TransactionOrderingParser,
xsnService,
new TransactionFutureDataHandler(transactionDataHandler)(Executors.databaseEC)) new TransactionFutureDataHandler(transactionDataHandler)(Executors.databaseEC))
val transactionRPCService = new TransactionRPCService(xsnService)
new LedgerSynchronizerService( new LedgerSynchronizerService(
xsnService, xsnService,
transactionService, transactionService,
transactionRPCService,
new LedgerFutureDataHandler(dataHandler)(Executors.databaseEC), new LedgerFutureDataHandler(dataHandler)(Executors.databaseEC),
new BlockFutureDataHandler(blockDataHandler)(Executors.databaseEC)) new BlockFutureDataHandler(blockDataHandler)(Executors.databaseEC))
} }

Loading…
Cancel
Save