Browse Source

server: Refactor the TransactionCollectorService

bitcoin
Alexis Hernandez 6 years ago
parent
commit
6961c764f7
  1. 135
      server/app/com/xsn/explorer/services/TransactionCollectorService.scala
  2. 197
      server/test/com/xsn/explorer/services/TransactionCollectorServiceSpec.scala

135
server/app/com/xsn/explorer/services/TransactionCollectorService.scala

@ -1,12 +1,11 @@
package com.xsn.explorer.services
import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps
import com.alexitc.playsonify.core.{ApplicationResult, FutureApplicationResult}
import com.xsn.explorer.data.async.TransactionFutureDataHandler
import com.xsn.explorer.errors.TransactionNotFoundError
import com.xsn.explorer.models._
import com.xsn.explorer.models.values._
import com.xsn.explorer.util.Extensions.FutureOrExt
import javax.inject.Inject
import org.scalactic.{Bad, Good, One, Or}
@ -39,7 +38,7 @@ class TransactionCollectorService @Inject() (
def collect(txidList: List[TransactionId], excludedTransactions: List[TransactionId]): FutureApplicationResult[Result] = {
val futureOr = for {
rpcTransactions <- getRPCTransactions(txidList).toFutureOr
completeTransactions <- getRPCTransactionsWithValues(rpcTransactions, excludedTransactions).toFutureOr
completeTransactions <- completeValues(rpcTransactions, excludedTransactions).toFutureOr
} yield {
val result = completeTransactions.map(persisted.Transaction.fromRPC)
val contracts = result.flatMap(_._2)
@ -50,7 +49,7 @@ class TransactionCollectorService @Inject() (
futureOr.toFuture
}
private[services] def getRPCTransactionsWithValues(rpcTransactions: RPCTransactionList, excludedTransactions: List[TransactionId]): FutureApplicationResult[RPCTransactionListWithValues] = {
private[services] def completeValues(rpcTransactions: List[RPCTransaction], excludedTransactions: List[TransactionId]): FutureApplicationResult[List[RPCCompleteTransaction]] = {
val neutral: FutureApplicationResult[List[rpc.Transaction[rpc.TransactionVIN.HasValues]]] = Future.successful(Good(List.empty))
val future = rpcTransactions.foldLeft(neutral) { case (acc, tx) =>
val result = for {
@ -76,46 +75,46 @@ class TransactionCollectorService @Inject() (
* - The ones that weren't retrieved are retried sequentially using the RPC API.
*/
private[services] def getRPCTransactionVIN(vinList: List[rpc.TransactionVIN], excludedTransactions: List[TransactionId]): FutureApplicationResult[List[rpc.TransactionVIN.HasValues]] = {
val futureList = Future.sequence {
vinList
.filterNot(excludedTransactions contains _.txid)
.map { vin =>
transactionDataHandler
.getOutput(vin.txid, vin.voutIndex)
.toFutureOr
.map { output =>
vin.withValues(value = output.value, address = output.address)
}
.recoverWith(TransactionNotFoundError) {
getRPCTransactionVINWithValues(vin).toFutureOr
}
.toFuture
.map(vin -> _)
val filtered = vinList.filterNot(excludedTransactions contains _.txid)
getDBPartialVINList(filtered)
.flatMap(completeRPCVINSequentially)
}
private[services] def getDBPartialVINList(vinList: List[rpc.TransactionVIN]): Future[List[PartialTransactionVIN]] = {
val futures = for (vin <- vinList) yield {
transactionDataHandler
.getOutput(vin.txid, vin.voutIndex)
.toFutureOr
.map { output =>
vin.withValues(value = output.value, address = output.address)
}
.toFuture
.map(vin -> _)
}
val future = futureList
.flatMap { resultList =>
val neutral: FutureApplicationResult[List[rpc.TransactionVIN.HasValues]] = Future.successful(Good(List.empty))
resultList.foldLeft(neutral) {
case (acc, (vin, Bad(_))) =>
val result = for {
ready <- acc.toFutureOr
newVIN <- getRPCTransactionVINWithValues(vin).toFutureOr
} yield newVIN :: ready
Future.sequence(futures)
}
result.toFuture
private[services] def completeRPCVINSequentially(partial: List[PartialTransactionVIN]): FutureApplicationResult[List[rpc.TransactionVIN.HasValues]] = {
val neutral: FutureApplicationResult[List[rpc.TransactionVIN.HasValues]] = Future.successful(Good(List.empty))
val result = partial.foldLeft(neutral) {
case (acc, (vin, Bad(_))) =>
val result = for {
ready <- acc.toFutureOr
newVIN <- getRPCTransactionVINWithValues(vin).toFutureOr
} yield newVIN :: ready
case (acc, (_, Good(newVIN))) =>
val result = for {
ready <- acc.toFutureOr
} yield newVIN :: ready
result.toFuture
result.toFuture
}
}
case (acc, (_, Good(newVIN))) =>
val result = for {
ready <- acc.toFutureOr
} yield newVIN :: ready
future
result.toFuture
}
result
.toFutureOr
.map(_.reverse)
.toFuture
@ -126,37 +125,41 @@ class TransactionCollectorService @Inject() (
* - Try to get them all from the RPC API in parallel
* - Retry the ones that weren't retrieved in parallel by retrieving them sequentially.
*/
private[services] def getRPCTransactions(txidList: List[TransactionId]): FutureApplicationResult[RPCTransactionList] = {
val futureList = Future.sequence {
txidList.map { txid =>
for {
r <- xsnService.getTransaction(txid)
} yield txid -> r
}
private[services] def getRPCTransactions(txidList: List[TransactionId]): FutureApplicationResult[List[RPCTransaction]] = {
getPartialRPCTransactions(txidList)
.flatMap(completeRPCTransactionsSequentially)
}
private[services] def getPartialRPCTransactions(txidList: List[TransactionId]): Future[List[PartialRPCTransaction]] = {
val futures = for (txid <- txidList) yield {
for {
r <- xsnService.getTransaction(txid)
} yield txid -> r
}
val future = futureList
.flatMap { resultList =>
val neutral: FutureApplicationResult[RPCTransactionList] = Future.successful(Good(List.empty))
resultList.foldLeft(neutral) {
case (acc, (txid, Bad(_))) =>
val result = for {
ready <- acc.toFutureOr
tx <- xsnService.getTransaction(txid).toFutureOr
} yield tx :: ready
Future.sequence(futures)
}
result.toFuture
private[services] def completeRPCTransactionsSequentially(partial: List[PartialRPCTransaction]): FutureApplicationResult[List[RPCTransaction]] = {
val neutral: FutureApplicationResult[List[RPCTransaction]] = Future.successful(Good(List.empty))
val result = partial.foldLeft(neutral) {
case (acc, (txid, Bad(_))) =>
val result = for {
ready <- acc.toFutureOr
tx <- xsnService.getTransaction(txid).toFutureOr
} yield tx :: ready
case (acc, (_, Good(tx))) =>
val result = for {
ready <- acc.toFutureOr
} yield tx :: ready
result.toFuture
result.toFuture
}
}
case (acc, (_, Good(tx))) =>
val result = for {
ready <- acc.toFutureOr
} yield tx :: ready
future
result.toFuture
}
result
.toFutureOr
.map(_.reverse)
.toFuture
@ -191,6 +194,10 @@ object TransactionCollectorService {
type Result = (List[persisted.Transaction.HasIO], List[TPoSContract])
private type RPCTransactionList = List[rpc.Transaction[rpc.TransactionVIN]]
private type RPCTransactionListWithValues = List[rpc.Transaction[rpc.TransactionVIN.HasValues]]
private type RPCTransaction = rpc.Transaction[rpc.TransactionVIN]
private type RPCCompleteTransaction = rpc.Transaction[rpc.TransactionVIN.HasValues]
private type PartialTransactionVIN = (rpc.TransactionVIN, ApplicationResult[rpc.TransactionVIN.HasValues])
private type PartialRPCTransaction = (TransactionId, ApplicationResult[RPCTransaction])
}

197
server/test/com/xsn/explorer/services/TransactionCollectorServiceSpec.scala

@ -1,28 +1,194 @@
package com.xsn.explorer.services
import com.alexitc.playsonify.core.FutureApplicationResult
import com.xsn.explorer.data.TransactionBlockingDataHandler
import com.xsn.explorer.data.async.TransactionFutureDataHandler
import com.xsn.explorer.errors.TransactionNotFoundError
import com.xsn.explorer.helpers.{DataGenerator, DummyXSNService, Executors}
import com.xsn.explorer.models._
import com.xsn.explorer.models.rpc.{ScriptPubKey, Transaction, TransactionVIN}
import com.xsn.explorer.models.values._
import org.scalactic.{Bad, Good, One}
import org.scalatest.EitherValues._
import org.scalatest.MustMatchers._
import org.scalatest.WordSpec
import org.scalatest.concurrent.ScalaFutures._
import scala.concurrent.Future
class TransactionCollectorServiceSpec extends WordSpec {
lazy val service: TransactionCollectorService = ???
import Executors.globalEC
def create(
xsnService: XSNService,
transactionDataHandler: TransactionBlockingDataHandler): TransactionCollectorService = {
val futureDataHandler = new TransactionFutureDataHandler(transactionDataHandler)(Executors.databaseEC)
new TransactionCollectorService(xsnService, futureDataHandler)
}
"getRPCTransactionVINWithValues" should {
val txid = DataGenerator.randomTransactionId
val outputIndex = 1
val vin = rpc.TransactionVIN.Raw(txid, outputIndex)
val address = DataGenerator.randomAddress
"return the values" in {
pending
val expected = vin.withValues(100, address)
val xsnService = new DummyXSNService {
override def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction[TransactionVIN]] = {
val output = rpc.TransactionVOUT(100, outputIndex, Some(createScript(address)))
val tx = createTransaction(txid, List(output))
Future.successful(Good(tx))
}
}
val service = create(xsnService, null)
whenReady(service.getRPCTransactionVINWithValues(vin)) { result =>
result.toEither.right.value must be(expected)
}
}
"fail when the transaction doesn't have the referenced output" in {
pending
val xsnService = new DummyXSNService {
override def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction[TransactionVIN]] = {
val output = rpc.TransactionVOUT(100, 1 + outputIndex, Some(createScript(address)))
val tx = createTransaction(txid, List(output))
Future.successful(Good(tx))
}
}
val service = create(xsnService, null)
whenReady(service.getRPCTransactionVINWithValues(vin)) { result =>
result.toEither.left.value must be(One(TransactionNotFoundError))
}
}
"fail when the transaction doesn't exists" in {
pending
val xsnService = new DummyXSNService {
override def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction[TransactionVIN]] = {
Future.successful(Bad(TransactionNotFoundError).accumulating)
}
}
val service = create(xsnService, null)
whenReady(service.getRPCTransactionVINWithValues(vin)) { result =>
result.toEither.left.value must be(One(TransactionNotFoundError))
}
}
}
"completeRPCTransactionsSequentially" should {
"do nothing on empty list" in {
val service = create(null, null)
whenReady(service.completeRPCTransactionsSequentially(List.empty)) { result =>
result.toEither.right.value must be(empty)
}
}
"do nothing when all transactions are loaded" in {
val input = for (_ <- 1 to 10) yield {
val txid = DataGenerator.randomTransactionId
val tx = createTransaction(txid, List.empty)
txid -> Good(tx)
}
val service = create(null, null)
whenReady(service.completeRPCTransactionsSequentially(input.toList)) { result =>
result.toEither.right.value must be(input.flatMap(_._2.toOption))
}
}
"fail when a single tx can't be completed" in {
val completed = for (_ <- 1 to 10) yield {
val txid = DataGenerator.randomTransactionId
val tx = createTransaction(txid, List.empty)
txid -> Good(tx)
}
val pending = DataGenerator.randomTransactionId -> Bad(TransactionNotFoundError).accumulating
val input = (completed.take(5).toList ::: pending :: completed.drop(5).toList).reverse
val xsnService = new DummyXSNService {
override def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction[TransactionVIN]] = {
Future.successful(Bad(TransactionNotFoundError).accumulating)
}
}
val service = create(xsnService, null)
whenReady(service.completeRPCTransactionsSequentially(input)) { result =>
result.toEither.left.value must be(One(TransactionNotFoundError))
}
}
"complete the missing transactions" in {
val completed = for (_ <- 1 to 10) yield {
val txid = DataGenerator.randomTransactionId
val tx = createTransaction(txid, List.empty)
txid -> Good(tx)
}
val firstHalf = completed.take(5).toList
val secondHalf = completed.drop(5).toList
val pending1 = DataGenerator.randomTransactionId -> Bad(TransactionNotFoundError).accumulating
val pending1Tx = createTransaction(pending1._1, List.empty)
val pending2 = DataGenerator.randomTransactionId -> Bad(TransactionNotFoundError).accumulating
val pending2Tx = createTransaction(pending2._1, List.empty)
val input = firstHalf ::: List(pending1) ::: secondHalf ::: List(pending2)
val xsnService = new DummyXSNService {
override def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction[TransactionVIN]] = {
if (txid == pending1._1) {
Future.successful(Good(pending1Tx))
} else if (txid == pending2._1) {
Future.successful(Good(pending2Tx))
} else {
Future.successful(Bad(TransactionNotFoundError).accumulating)
}
}
}
val service = create(xsnService, null)
whenReady(service.completeRPCTransactionsSequentially(input)) { result =>
val expected = firstHalf.flatMap(_._2.toOption) ::: List(pending1Tx) ::: secondHalf.flatMap(_._2.toOption) ::: List(pending2Tx)
result.toEither.right.value must be(expected)
}
}
}
"getRPCTransactions" should {
"work" in {
pending
"fallback to retrieving transactions sequentally" in {
val tx = createTransaction(DataGenerator.randomTransactionId, List.empty)
val pending = createTransaction(DataGenerator.randomTransactionId, List.empty)
val xsnService: XSNService = new DummyXSNService {
var ready = Set.empty[TransactionId]
override def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction[TransactionVIN]] = {
if (txid == tx.id) {
Future.successful(Good(tx))
} else if (txid == pending.id) {
if (ready contains txid) {
Future.successful(Good(pending))
} else {
ready = ready + txid
Future.successful(Bad(TransactionNotFoundError).accumulating)
}
} else {
Future.successful(Bad(TransactionNotFoundError).accumulating)
}
}
}
val input = List(tx.id, pending.id)
val service = create(xsnService, null)
whenReady(service.getRPCTransactions(input)) { result =>
val expected = List(tx, pending)
result.toEither.right.value must be(expected)
}
}
}
@ -32,7 +198,7 @@ class TransactionCollectorServiceSpec extends WordSpec {
}
}
"getRPCTransactionsWithValues" should {
"completeValues" should {
"work" in {
pending
}
@ -43,4 +209,21 @@ class TransactionCollectorServiceSpec extends WordSpec {
pending
}
}
def createScript(address: Address) = {
ScriptPubKey("nulldata", "", HexString.from("00").get, List(address))
}
def createTransaction(txid: TransactionId, outputs: List[rpc.TransactionVOUT]) = {
rpc.Transaction(
id = txid,
size = Size(100),
blockhash = DataGenerator.randomBlockhash,
time = 0L,
blocktime = 0L,
confirmations = Confirmations(0),
vin = List.empty[rpc.TransactionVIN],
vout = outputs
)
}
}

Loading…
Cancel
Save