Browse Source

server: Split the persisted transaction model

There are two models now, the one having inputs/outputs
and the one without them.
master
Alexis Hernandez 6 years ago
parent
commit
aef41d5f8e
  1. 2
      server/app/com/xsn/explorer/data/LedgerDataHandler.scala
  2. 2
      server/app/com/xsn/explorer/data/TransactionDataHandler.scala
  3. 10
      server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala
  4. 2
      server/app/com/xsn/explorer/data/anorm/TransactionPostgresDataHandler.scala
  5. 2
      server/app/com/xsn/explorer/data/anorm/dao/AddressTransactionDetailsPostgresDAO.scala
  6. 26
      server/app/com/xsn/explorer/data/anorm/dao/TransactionPostgresDAO.scala
  7. 2
      server/app/com/xsn/explorer/data/anorm/parsers/TransactionParsers.scala
  8. 4
      server/app/com/xsn/explorer/data/async/LedgerFutureDataHandler.scala
  9. 2
      server/app/com/xsn/explorer/data/async/TransactionFutureDataHandler.scala
  10. 40
      server/app/com/xsn/explorer/models/persisted/Transaction.scala
  11. 2
      server/app/com/xsn/explorer/models/rpc/Block.scala
  12. 8
      server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala
  13. 6
      server/app/com/xsn/explorer/services/TransactionRPCService.scala
  14. 2
      server/app/com/xsn/explorer/services/TransactionService.scala
  15. 34
      server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala
  16. 15
      server/test/com/xsn/explorer/helpers/DataGenerator.scala
  17. 2
      server/test/com/xsn/explorer/helpers/LedgerHelper.scala
  18. 2
      server/test/com/xsn/explorer/helpers/TransactionDummyDataHandler.scala

2
server/app/com/xsn/explorer/data/LedgerDataHandler.scala

@ -16,7 +16,7 @@ trait LedgerDataHandler[F[_]] {
* - The ledger is empty and the block is the genesis one.
* - The ledger has some blocks and the block goes just after the latest one.
*/
def push(block: Block, transactions: List[Transaction]): F[Unit]
def push(block: Block, transactions: List[Transaction.HasIO]): F[Unit]
/**
* Remove the latest block from the ledger, it will succeed only if the ledger is not empty.

2
server/app/com/xsn/explorer/data/TransactionDataHandler.scala

@ -17,7 +17,7 @@ trait TransactionDataHandler[F[_]] {
paginatedQuery: PaginatedQuery,
ordering: FieldOrdering[TransactionField]): F[PaginatedResult[TransactionWithValues]]
def getBy(address: Address, limit: Limit, lastSeenTxid: Option[TransactionId], orderingCondition: OrderingCondition): F[List[Transaction]]
def getBy(address: Address, limit: Limit, lastSeenTxid: Option[TransactionId], orderingCondition: OrderingCondition): F[List[Transaction.HasIO]]
def getUnspentOutputs(address: Address): F[List[Transaction.Output]]

10
server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala

@ -32,7 +32,7 @@ class LedgerPostgresDataHandler @Inject() (
*/
override def push(
block: Block,
transactions: List[Transaction]): ApplicationResult[Unit] = {
transactions: List[Transaction.HasIO]): ApplicationResult[Unit] = {
val result = withTransaction { implicit conn =>
val result = for {
@ -67,7 +67,7 @@ class LedgerPostgresDataHandler @Inject() (
.getOrElse(throw new RuntimeException("Unable to pop block"))
}
private def upsertBlockCascade(block: Block, transactions: List[Transaction])(implicit conn: Connection): Option[Unit] = {
private def upsertBlockCascade(block: Block, transactions: List[Transaction.HasIO])(implicit conn: Connection): Option[Unit] = {
val result = for {
// block
_ <- deleteBlockCascade(block).orElse(Some(()))
@ -119,7 +119,7 @@ class LedgerPostgresDataHandler @Inject() (
balanceList.map { b => balancePostgresDAO.upsert(b) }
}
private def spendMap(transactions: List[Transaction]): Map[Address, BigDecimal] = {
private def spendMap(transactions: List[Transaction.HasIO]): Map[Address, BigDecimal] = {
transactions
.map(_.inputs)
.flatMap { inputs =>
@ -129,7 +129,7 @@ class LedgerPostgresDataHandler @Inject() (
.mapValues { list => list.map(_._2).sum }
}
private def receiveMap(transactions: List[Transaction]): Map[Address, BigDecimal] = {
private def receiveMap(transactions: List[Transaction.HasIO]): Map[Address, BigDecimal] = {
transactions
.map(_.outputs)
.flatMap { outputs =>
@ -141,7 +141,7 @@ class LedgerPostgresDataHandler @Inject() (
.mapValues { list => list.map(_._2).sum }
}
private def balances(transactions: List[Transaction]) = {
private def balances(transactions: List[Transaction.HasIO]) = {
val spentList = spendMap(transactions).map { case (address, spent) =>
Balance(address, spent = spent)
}

2
server/app/com/xsn/explorer/data/anorm/TransactionPostgresDataHandler.scala

@ -36,7 +36,7 @@ class TransactionPostgresDataHandler @Inject() (
address: Address,
limit: Limit,
lastSeenTxid: Option[TransactionId],
orderingCondition: OrderingCondition): ApplicationResult[List[Transaction]] = withConnection { implicit conn =>
orderingCondition: OrderingCondition): ApplicationResult[List[Transaction.HasIO]] = withConnection { implicit conn =>
val transactions = lastSeenTxid
.map { transactionPostgresDAO.getBy(address, _, limit, orderingCondition) }

2
server/app/com/xsn/explorer/data/anorm/dao/AddressTransactionDetailsPostgresDAO.scala

@ -9,7 +9,7 @@ import com.xsn.explorer.models.values.TransactionId
class AddressTransactionDetailsPostgresDAO {
def batchInsertDetails(transaction: Transaction)(implicit conn: Connection): Option[Unit] = {
def batchInsertDetails(transaction: Transaction.HasIO)(implicit conn: Connection): Option[Unit] = {
val received = transaction
.outputs
.groupBy(_.address)

26
server/app/com/xsn/explorer/data/anorm/dao/TransactionPostgresDAO.scala

@ -25,19 +25,19 @@ class TransactionPostgresDAO @Inject() (
/**
* NOTE: Ensure the connection has an open transaction.
*/
def upsert(index: Int, transaction: Transaction)(implicit conn: Connection): Option[Transaction] = {
def upsert(index: Int, transaction: Transaction.HasIO)(implicit conn: Connection): Option[Transaction.HasIO] = {
for {
partialTx <- upsertTransaction(index, transaction)
partialTx <- upsertTransaction(index, transaction.transaction)
_ <- transactionOutputDAO.batchInsertOutputs(transaction.outputs)
_ <- transactionInputDAO.batchInsertInputs(transaction.inputs.map(transaction.id -> _))
_ <- transactionOutputDAO.batchSpend(transaction.id, transaction.inputs)
_ <- addressTransactionDetailsDAO.batchInsertDetails(transaction)
} yield partialTx.copy(inputs = transaction.inputs, outputs = transaction.outputs)
} yield Transaction.HasIO(partialTx, inputs = transaction.inputs, outputs = transaction.outputs)
}
def insert(transactions: List[Transaction])(implicit conn: Connection): Option[List[Transaction]] = {
def insert(transactions: List[Transaction.HasIO])(implicit conn: Connection): Option[List[Transaction]] = {
for {
r <- batchInsert(transactions)
r <- batchInsert(transactions.map(_.transaction))
outputs = transactions.flatMap(_.outputs)
_ <- transactionOutputDAO.batchInsertOutputs(outputs)
@ -51,13 +51,13 @@ class TransactionPostgresDAO @Inject() (
}
}
private def insertDetails(transactions: List[Transaction])(implicit conn: Connection): Unit = {
private def insertDetails(transactions: List[Transaction.HasIO])(implicit conn: Connection): Unit = {
val detailsResult = transactions.map(addressTransactionDetailsDAO.batchInsertDetails)
assert(detailsResult.forall(_.isDefined), "Inserting address details batch failed")
}
private def spend(transactions: List[Transaction])(implicit conn: Connection): Unit = {
private def spend(transactions: List[Transaction.HasIO])(implicit conn: Connection): Unit = {
val spendResult = transactions.map { tx => transactionOutputDAO.batchSpend(tx.id, tx.inputs) }
assert(spendResult.forall(_.isDefined), "Spending inputs batch failed")
@ -99,7 +99,7 @@ class TransactionPostgresDAO @Inject() (
/**
* NOTE: Ensure the connection has an open transaction.
*/
def deleteBy(blockhash: Blockhash)(implicit conn: Connection): List[Transaction] = {
def deleteBy(blockhash: Blockhash)(implicit conn: Connection): List[Transaction.HasIO] = {
val expectedTransactions = SQL(
"""
|SELECT txid, blockhash, time, size
@ -116,7 +116,7 @@ class TransactionPostgresDAO @Inject() (
val outputs = transactionOutputDAO.deleteOutputs(tx.id)
val _ = addressTransactionDetailsDAO.deleteDetails(tx.id)
tx.copy(inputs = inputs, outputs = outputs)
Transaction.HasIO(tx, inputs = inputs, outputs = outputs)
}
val deletedTransactions = SQL(
@ -138,7 +138,7 @@ class TransactionPostgresDAO @Inject() (
/**
* Get the transactions by the given address (sorted by time).
*/
def getBy(address: Address, limit: Limit, orderingCondition: OrderingCondition)(implicit conn: Connection): List[Transaction] = {
def getBy(address: Address, limit: Limit, orderingCondition: OrderingCondition)(implicit conn: Connection): List[Transaction.HasIO] = {
val order = toSQL(orderingCondition)
val transactions = SQL(
@ -159,7 +159,7 @@ class TransactionPostgresDAO @Inject() (
} yield {
val inputs = transactionInputDAO.getInputs(tx.id, address)
val outputs = transactionOutputDAO.getOutputs(tx.id, address)
tx.copy(inputs = inputs, outputs = outputs)
Transaction.HasIO(tx, inputs = inputs, outputs = outputs)
}
}
@ -174,7 +174,7 @@ class TransactionPostgresDAO @Inject() (
lastSeenTxid: TransactionId,
limit: Limit,
orderingCondition: OrderingCondition)(
implicit conn: Connection): List[Transaction] = {
implicit conn: Connection): List[Transaction.HasIO] = {
val order = toSQL(orderingCondition)
val comparator = orderingCondition match {
@ -208,7 +208,7 @@ class TransactionPostgresDAO @Inject() (
} yield {
val inputs = transactionInputDAO.getInputs(tx.id, address)
val outputs = transactionOutputDAO.getOutputs(tx.id, address)
tx.copy(inputs = inputs, outputs = outputs)
Transaction.HasIO(tx, inputs = inputs, outputs = outputs)
}
}

2
server/app/com/xsn/explorer/data/anorm/parsers/TransactionParsers.scala

@ -28,7 +28,7 @@ object TransactionParsers {
case txidMaybe ~ blockhash ~ time ~ size =>
for {
txid <- txidMaybe
} yield Transaction(txid, blockhash, time, size, List.empty, List.empty)
} yield Transaction(txid, blockhash, time, size)
}
val parseTransactionWithValues = (

4
server/app/com/xsn/explorer/data/async/LedgerFutureDataHandler.scala

@ -1,10 +1,10 @@
package com.xsn.explorer.data.async
import javax.inject.Inject
import com.alexitc.playsonify.core.FutureApplicationResult
import com.xsn.explorer.data.{LedgerBlockingDataHandler, LedgerDataHandler}
import com.xsn.explorer.executors.DatabaseExecutionContext
import com.xsn.explorer.models.persisted.{Block, Transaction}
import javax.inject.Inject
import scala.concurrent.Future
@ -13,7 +13,7 @@ class LedgerFutureDataHandler @Inject() (
implicit ec: DatabaseExecutionContext)
extends LedgerDataHandler[FutureApplicationResult] {
override def push(block: Block, transactions: List[Transaction]): FutureApplicationResult[Unit] = Future {
override def push(block: Block, transactions: List[Transaction.HasIO]): FutureApplicationResult[Unit] = Future {
blockingDataHandler.push(block, transactions)
}

2
server/app/com/xsn/explorer/data/async/TransactionFutureDataHandler.scala

@ -30,7 +30,7 @@ class TransactionFutureDataHandler @Inject() (
address: Address,
limit: Limit,
lastSeenTxid: Option[TransactionId],
orderingCondition: OrderingCondition): FutureApplicationResult[List[Transaction]] = Future {
orderingCondition: OrderingCondition): FutureApplicationResult[List[Transaction.HasIO]] = Future {
blockingDataHandler.getBy(address, limit, lastSeenTxid, orderingCondition)
}

40
server/app/com/xsn/explorer/models/persisted/Transaction.scala

@ -1,21 +1,13 @@
package com.xsn.explorer.models.persisted
import com.xsn.explorer.models.values._
import com.xsn.explorer.models.rpc
import com.xsn.explorer.models.values._
case class Transaction(
id: TransactionId,
blockhash: Blockhash,
time: Long,
size: Size,
inputs: List[Transaction.Input],
outputs: List[Transaction.Output]) {
require(
outputs.forall(_.txid == id),
"There are outputs that having a different txid"
)
}
size: Size)
object Transaction {
@ -38,13 +30,29 @@ object Transaction {
tposOwnerAddress: Option[Address],
tposMerchantAddress: Option[Address])
case class HasIO(
transaction: Transaction,
inputs: List[Transaction.Input],
outputs: List[Transaction.Output]) {
require(
outputs.forall(_.txid == transaction.id),
"There are outputs that having a different txid"
)
def id: TransactionId = transaction.id
def time: Long = transaction.time
def blockhash: Blockhash = transaction.blockhash
def size: Size = transaction.size
}
/**
* Please note that the inputs might not be accurate.
*
* If the rpc transaction might not be complete, get the input value and address using
* the utxo index or the getTransaction method from the TransactionService..
* the utxo index or the getTransaction method from the TransactionService.
*/
def fromRPC(tx: rpc.Transaction): Transaction = {
def fromRPC(tx: rpc.Transaction): HasIO = {
val inputs = tx.vin.zipWithIndex.flatMap { case (vin, index) =>
for {
value <- vin.value
@ -61,13 +69,13 @@ object Transaction {
} yield Transaction.Output(tx.id, vout.n, vout.value, address, script, tposAddresses.map(_._1), tposAddresses.map(_._2))
}
Transaction(
val transaction = Transaction(
id = tx.id,
blockhash = tx.blockhash,
time = tx.time,
size = tx.size,
inputs = inputs,
outputs = outputs
size = tx.size
)
HasIO(transaction, inputs, outputs)
}
}

2
server/app/com/xsn/explorer/models/rpc/Block.scala

@ -1,6 +1,5 @@
package com.xsn.explorer.models.rpc
import com.xsn.explorer.models._
import com.xsn.explorer.models.values._
import play.api.libs.functional.syntax._
import play.api.libs.json._
@ -46,6 +45,7 @@ case class Block(
}
object Block {
implicit val reads: Reads[Block] = {
val builder = (__ \ 'hash).read[Blockhash] and
(__ \ 'previousblockhash).readNullable[Blockhash] and

8
server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala

@ -41,7 +41,7 @@ class LedgerSynchronizerService @Inject() (
result.toFuture
}
private def synchronize(block: Block, transactions: List[Transaction]): FutureApplicationResult[Unit] = {
private def synchronize(block: Block, transactions: List[Transaction.HasIO]): FutureApplicationResult[Unit] = {
logger.info(s"Synchronize block ${block.height}, hash = ${block.hash}")
val result = for {
@ -65,7 +65,7 @@ class LedgerSynchronizerService @Inject() (
* 1.1. the given block is the genensis block, it is added.
* 1.2. the given block is not the genesis block, sync everything until the given block.
*/
private def onEmptyLedger(block: Block, transactions: List[Transaction]): FutureApplicationResult[Unit] = {
private def onEmptyLedger(block: Block, transactions: List[Transaction.HasIO]): FutureApplicationResult[Unit] = {
if (block.height.int == 0) {
logger.info(s"Synchronize genesis block on empty ledger, hash = ${block.hash}")
ledgerDataHandler.push(block, transactions)
@ -88,7 +88,7 @@ class LedgerSynchronizerService @Inject() (
* 2.4. if H <= N, if the hash already exists, it is ignored.
* 2.5. if H <= N, if the hash doesn't exists, remove blocks from N to H (included), then, add the new H.
*/
private def onLatestBlock(ledgerBlock: Block, newBlock: Block, newTransactions: List[Transaction]): FutureApplicationResult[Unit] = {
private def onLatestBlock(ledgerBlock: Block, newBlock: Block, newTransactions: List[Transaction.HasIO]): FutureApplicationResult[Unit] = {
if (ledgerBlock.height.int + 1 == newBlock.height.int &&
newBlock.previousBlockhash.contains(ledgerBlock.hash)) {
@ -158,7 +158,7 @@ class LedgerSynchronizerService @Inject() (
}
}
private def getRPCBlock(blockhash: Blockhash): FutureApplicationResult[(Block, List[Transaction])] = {
private def getRPCBlock(blockhash: Blockhash): FutureApplicationResult[(Block, List[Transaction.HasIO])] = {
val result = for {
rpcBlock <- xsnService.getBlock(blockhash).toFutureOr
transactions <- transactionRPCService.getTransactions(rpcBlock.transactions).toFutureOr

6
server/app/com/xsn/explorer/services/TransactionRPCService.scala

@ -53,7 +53,7 @@ class TransactionRPCService @Inject() (
result.toFuture
}
def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction] = {
def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction.HasIO] = {
val result = for {
tx <- xsnService.getTransaction(txid).toFutureOr
transactionVIN <- getTransactionVIN(tx.vin).toFutureOr
@ -96,8 +96,8 @@ class TransactionRPCService @Inject() (
}
}
def getTransactions(ids: List[TransactionId]): FutureApplicationResult[List[Transaction]] = {
def loadTransactionsSlowly(pending: List[TransactionId]): FutureOr[List[Transaction]] = pending match {
def getTransactions(ids: List[TransactionId]): FutureApplicationResult[List[Transaction.HasIO]] = {
def loadTransactionsSlowly(pending: List[TransactionId]): FutureOr[List[Transaction.HasIO]] = pending match {
case x :: xs =>
for {
tx <- getTransaction(x).toFutureOr

2
server/app/com/xsn/explorer/services/TransactionService.scala

@ -84,7 +84,7 @@ class TransactionService @Inject() (
output.into[LightWalletTransaction.Output].transform
}
tx
tx.transaction
.into[LightWalletTransaction]
.withFieldConst(_.inputs, inputs)
.withFieldConst(_.outputs, outputs)

34
server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala

@ -123,7 +123,7 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be
result.data.size mustEqual transactions.size
}
def testOrdering[B](field: TransactionField, orderingCondition: OrderingCondition)(lt: (Transaction, Transaction) => Boolean) = {
def testOrdering[B](field: TransactionField, orderingCondition: OrderingCondition)(lt: (Transaction.HasIO, Transaction.HasIO) => Boolean) = {
createBlock(block, transactions)
val ordering = FieldOrdering(field, orderingCondition)
@ -212,11 +212,12 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be
)
val transactions = List.fill(4)(randomTransactionId).zip(List(321L, 320L, 319L, 319L)).map { case (txid, time) =>
Transaction(
txid,
blockhash,
time,
Size(1000),
Transaction.HasIO(
Transaction(
txid,
blockhash,
time,
Size(1000)),
inputs,
outputs.map(_.copy(txid = txid)))
}
@ -246,7 +247,7 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be
}
}
def matchOnlyData(expected: Transaction, actual: Transaction) = {
def matchOnlyData(expected: Transaction.HasIO, actual: Transaction.HasIO) = {
actual.copy(inputs = List.empty, outputs = List.empty) mustEqual expected.copy(inputs = List.empty, outputs = List.empty)
}
@ -308,18 +309,19 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be
None, None)
)
val transaction = Transaction(
newTxid,
blockhash,
321,
Size(1000),
val transaction = Transaction.HasIO(
Transaction(
newTxid,
blockhash,
321,
Size(1000)),
inputs,
outputs)
val newTxid2 = randomTransactionId
val newAddress = randomAddress
val transaction2 = transaction.copy(
id = newTxid2,
transaction = transaction.transaction.copy(id = newTxid2),
inputs = List(
Transaction.Input(
fromTxid = transaction.id,
@ -394,7 +396,7 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be
result.isGood mustEqual true
}
private def createBlock(block: Block, transactions: List[Transaction]) = {
private def createBlock(block: Block, transactions: List[Transaction.HasIO]) = {
val result = ledgerDataHandler.push(block, transactions)
result.isGood mustEqual true
@ -411,7 +413,7 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be
}
}
private def prepareTransaction(transaction: Transaction) = {
private def prepareTransaction(transaction: Transaction.HasIO) = {
try {
upsertTransaction(transaction)
} catch {
@ -419,7 +421,7 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be
}
}
private def upsertTransaction(transaction: Transaction) = {
private def upsertTransaction(transaction: Transaction.HasIO) = {
database.withConnection { implicit conn =>
val maybe = transactionPostgresDAO.upsert(1, transaction)
Or.from(maybe, One(TransactionNotFoundError))

15
server/test/com/xsn/explorer/helpers/DataGenerator.scala

@ -112,13 +112,14 @@ trait DataGenerator {
def randomTransaction(
blockhash: Blockhash,
id: TransactionId = randomTransactionId,
utxos: List[Transaction.Output]): Transaction = {
Transaction(
id = id,
blockhash = blockhash,
time = java.lang.System.currentTimeMillis(),
Size(1000),
utxos: List[Transaction.Output]): Transaction.HasIO = {
Transaction.HasIO(
Transaction(
id = id,
blockhash = blockhash,
time = java.lang.System.currentTimeMillis(),
Size(1000)),
createInputs(utxos),
randomOutputs().map(_.copy(txid = id))
)

2
server/test/com/xsn/explorer/helpers/LedgerHelper.scala

@ -15,7 +15,7 @@ object LedgerHelper {
BlockLoader.getRPC("0000017ee4121cd8ae22f7321041ccb953d53828824217a9dc61a1c857facf85")
)
def getTransactions(block: rpc.Block): List[Transaction] = {
def getTransactions(block: rpc.Block): List[Transaction.HasIO] = {
block
.transactions
.map(_.string)

2
server/test/com/xsn/explorer/helpers/TransactionDummyDataHandler.scala

@ -14,7 +14,7 @@ class TransactionDummyDataHandler extends TransactionBlockingDataHandler {
override def getBy(address: Address, paginatedQuery: PaginatedQuery, ordering: FieldOrdering[TransactionField]): ApplicationResult[PaginatedResult[TransactionWithValues]] = ???
override def getBy(address: Address, limit: pagination.Limit, lastSeenTxid: Option[TransactionId], orderingCondition: OrderingCondition): ApplicationResult[List[Transaction]] = ???
override def getBy(address: Address, limit: pagination.Limit, lastSeenTxid: Option[TransactionId], orderingCondition: OrderingCondition): ApplicationResult[List[Transaction.HasIO]] = ???
override def getUnspentOutputs(address: Address): ApplicationResult[List[Transaction.Output]] = ???

Loading…
Cancel
Save