Browse Source

server: Insert transaction batch while synchronizing the ledger

prometheus-integration
Alexis Hernandez 6 years ago
parent
commit
447e31e55c
  1. 7
      server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala
  2. 129
      server/app/com/xsn/explorer/data/anorm/dao/TransactionPostgresDAO.scala
  3. 19
      server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala

7
server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala

@ -70,11 +70,8 @@ class LedgerPostgresDataHandler @Inject() (
_ <- deleteBlockCascade(block).orElse(Some(())) _ <- deleteBlockCascade(block).orElse(Some(()))
_ <- blockPostgresDAO.insert(block) _ <- blockPostgresDAO.insert(block)
// transactions // batch insert
_ <- transactions _ <- transactionPostgresDAO.insert(transactions)
.zipWithIndex
.map { case (tx, index) => transactionPostgresDAO.upsert(index, tx) }
.everything
// balances // balances
balanceList = balances(transactions) balanceList = balances(transactions)

129
server/app/com/xsn/explorer/data/anorm/dao/TransactionPostgresDAO.scala

@ -19,11 +19,67 @@ class TransactionPostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderi
def upsert(index: Int, transaction: Transaction)(implicit conn: Connection): Option[Transaction] = { def upsert(index: Int, transaction: Transaction)(implicit conn: Connection): Option[Transaction] = {
for { for {
partialTx <- upsertTransaction(index, transaction) partialTx <- upsertTransaction(index, transaction)
inputs <- insertInputs(transaction.id, transaction.inputs) _ <- batchInsertOutputs(transaction.outputs)
outputs <- insertOutputs(transaction.id, transaction.outputs) _ <- batchInsertInputs(transaction.inputs.map(transaction.id -> _))
_ <- spend(transaction.id, inputs) _ <- batchSpend(transaction.id, transaction.inputs)
_ = insertDetails(transaction) _ <- batchInsertDetails(transaction)
} yield partialTx.copy(inputs = inputs, outputs = outputs) } yield partialTx.copy(inputs = transaction.inputs, outputs = transaction.outputs)
}
def insert(transactions: List[Transaction])(implicit conn: Connection): Option[List[Transaction]] = {
for {
r <- batchInsert(transactions)
outputs = transactions.flatMap(_.outputs)
_ <- batchInsertOutputs(outputs)
inputs = transactions.flatMap { tx => tx.inputs.map(tx.id -> _) }
_ <- batchInsertInputs(inputs)
} yield {
val extra = for {
tx <- transactions
_ <- batchInsertDetails(tx)
_ <- batchSpend(tx.id, tx.inputs)
} yield tx
assert(extra.size == transactions.size, "Not all transactions were inserted properly")
r
}
}
private def batchInsert(transactions: List[Transaction])(implicit conn: Connection): Option[List[Transaction]] = {
transactions match {
case Nil => Some(transactions)
case _ =>
val params = transactions.zipWithIndex.map { case (transaction, index) =>
List(
'txid -> transaction.id.string: NamedParameter,
'blockhash -> transaction.blockhash.string: NamedParameter,
'time -> transaction.time: NamedParameter,
'size -> transaction.size.int: NamedParameter,
'index -> index: NamedParameter)
}
val batch = BatchSql(
"""
|INSERT INTO transactions
| (txid, blockhash, time, size, index)
|VALUES
| ({txid}, {blockhash}, {time}, {size}, {index})
""".stripMargin,
params.head,
params.tail: _*
)
val success = batch.execute().forall(_ == 1)
if (success) {
Some(transactions)
} else {
None
}
}
} }
/** /**
@ -299,18 +355,17 @@ class TransactionPostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderi
).as(parseTransaction.singleOpt).flatten ).as(parseTransaction.singleOpt).flatten
} }
private def insertInputs( private def batchInsertInputs(
transactionId: TransactionId, inputs: List[(TransactionId, Transaction.Input)])(
inputs: List[Transaction.Input])( implicit conn: Connection): Option[List[(TransactionId, Transaction.Input)]] = {
implicit conn: Connection): Option[List[Transaction.Input]] = {
inputs match { inputs match {
case Nil => Some(inputs) case Nil => Some(inputs)
case _ => case _ =>
val params = inputs.map { input => val params = inputs.map { case (txid, input) =>
List( List(
'txid -> transactionId.string: NamedParameter, 'txid -> txid.string: NamedParameter,
'index -> input.index: NamedParameter, 'index -> input.index: NamedParameter,
'from_txid -> input.fromTxid.string: NamedParameter, 'from_txid -> input.fromTxid.string: NamedParameter,
'from_output_index -> input.fromOutputIndex: NamedParameter, 'from_output_index -> input.fromOutputIndex: NamedParameter,
@ -339,8 +394,7 @@ class TransactionPostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderi
} }
} }
private def insertOutputs( private def batchInsertOutputs(
transactionId: TransactionId,
outputs: List[Transaction.Output])( outputs: List[Transaction.Output])(
implicit conn: Connection): Option[List[Transaction.Output]] = { implicit conn: Connection): Option[List[Transaction.Output]] = {
@ -349,7 +403,7 @@ class TransactionPostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderi
case _ => case _ =>
val params = outputs.map { output => val params = outputs.map { output =>
List( List(
'txid -> transactionId.string: NamedParameter, 'txid -> output.txid.string: NamedParameter,
'index -> output.index: NamedParameter, 'index -> output.index: NamedParameter,
'value -> output.value: NamedParameter, 'value -> output.value: NamedParameter,
'address -> output.address.string: NamedParameter, 'address -> output.address.string: NamedParameter,
@ -405,7 +459,7 @@ class TransactionPostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderi
result.flatten result.flatten
} }
private def insertDetails(transaction: Transaction)(implicit conn: Connection): List[AddressTransactionDetails] = { private def batchInsertDetails(transaction: Transaction)(implicit conn: Connection): Option[Unit] = {
val received = transaction val received = transaction
.outputs .outputs
.groupBy(_.address) .groupBy(_.address)
@ -418,7 +472,7 @@ class TransactionPostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderi
.mapValues { inputs => inputs.map(_.value).sum } .mapValues { inputs => inputs.map(_.value).sum }
.map { case (address, value) => AddressTransactionDetails(address, transaction.id, time = transaction.time, sent = value) } .map { case (address, value) => AddressTransactionDetails(address, transaction.id, time = transaction.time, sent = value) }
val result = (received ++ sent) val details = (received ++ sent)
.groupBy(_.address) .groupBy(_.address)
.mapValues { .mapValues {
case head :: list => list.foldLeft(head) { (acc, current) => case head :: list => list.foldLeft(head) { (acc, current) =>
@ -426,27 +480,42 @@ class TransactionPostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderi
} }
} }
.values .values
.map(d => insertDetails(d))
result.toList batchInsertDetails(details.toList)
} }
private def insertDetails(details: AddressTransactionDetails)(implicit conn: Connection): AddressTransactionDetails = { private def batchInsertDetails(details: List[AddressTransactionDetails])(implicit conn: Connection): Option[Unit] = {
SQL( details match {
case Nil => Some(())
case _ =>
val params = details.map { d =>
List(
'address -> d.address.string: NamedParameter,
'txid -> d.txid.string: NamedParameter,
'received -> d.received: NamedParameter,
'sent -> d.sent: NamedParameter,
'time -> d.time: NamedParameter)
}
val batch = BatchSql(
""" """
|INSERT INTO address_transaction_details |INSERT INTO address_transaction_details
| (address, txid, received, sent, time) | (address, txid, received, sent, time)
|VALUES |VALUES
| ({address}, {txid}, {received}, {sent}, {time}) | ({address}, {txid}, {received}, {sent}, {time})
|RETURNING address, txid, received, sent, time """.stripMargin,
""".stripMargin params.head,
).on( params.tail: _*
'address -> details.address.string, )
'txid -> details.txid.string,
'received -> details.received, val success = batch.execute().forall(_ == 1)
'sent -> details.sent,
'time -> details.time if (success) {
).as(parseAddressTransactionDetails.single) Some(())
} else {
None
}
}
} }
private def deleteDetails(txid: TransactionId)(implicit conn: Connection): List[AddressTransactionDetails] = { private def deleteDetails(txid: TransactionId)(implicit conn: Connection): List[AddressTransactionDetails] = {
@ -491,7 +560,7 @@ class TransactionPostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderi
).as(parseTransactionOutput.*).flatten ).as(parseTransactionOutput.*).flatten
} }
private def spend(txid: TransactionId, inputs: List[Transaction.Input])(implicit conn: Connection): Option[Unit] = { private def batchSpend(txid: TransactionId, inputs: List[Transaction.Input])(implicit conn: Connection): Option[Unit] = {
inputs match { inputs match {
case Nil => Option(()) case Nil => Option(())
case _ => case _ =>

19
server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala

@ -326,8 +326,18 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be
val transaction2 = transaction.copy( val transaction2 = transaction.copy(
id = newTxid2, id = newTxid2,
inputs = List( inputs = List(
Transaction.Input(fromTxid = transaction.id, fromOutputIndex = 0, index = 0, value = transaction.outputs(0).value, address = newAddress), Transaction.Input(
Transaction.Input(fromTxid = transaction.id, fromOutputIndex = 1, index = 1, value = transaction.outputs(1).value, address = newAddress) fromTxid = transaction.id,
fromOutputIndex = 0,
index = 0,
value = transaction.outputs(0).value,
address = newAddress),
Transaction.Input(
fromTxid = transaction.id,
fromOutputIndex = 1,
index = 1,
value = transaction.outputs(1).value,
address = newAddress)
), ),
outputs = transaction.outputs.map(_.copy(txid = newTxid2)) outputs = transaction.outputs.map(_.copy(txid = newTxid2))
) )
@ -341,7 +351,6 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be
transactions = transactions.map(_.id)) transactions = transactions.map(_.id))
createBlock(block, transactions) createBlock(block, transactions)
val newTx = transactions(1)
// check that the outputs are properly spent // check that the outputs are properly spent
database.withConnection { implicit conn => database.withConnection { implicit conn =>
@ -355,7 +364,7 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be
""".stripMargin """.stripMargin
).as(SqlParser.str("spent_on").*) ).as(SqlParser.str("spent_on").*)
spentOn.foreach(_ mustEqual newTx.id.string) spentOn.foreach(_ mustEqual transaction2.id.string)
} }
// check that the inputs are linked to the correct output // check that the inputs are linked to the correct output
@ -366,7 +375,7 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be
s""" s"""
|SELECT from_txid, from_output_index |SELECT from_txid, from_output_index
|FROM transaction_inputs |FROM transaction_inputs
|WHERE txid = '${newTx.id.string}' |WHERE txid = '${transaction2.id.string}'
""".stripMargin """.stripMargin
) )

Loading…
Cancel
Save