From 447e31e55c348082d3590f60e37f1675e497fe18 Mon Sep 17 00:00:00 2001 From: Alexis Hernandez Date: Sat, 19 Jan 2019 22:52:49 -0700 Subject: [PATCH] server: Insert transaction batch while synchronizing the ledger --- .../anorm/LedgerPostgresDataHandler.scala | 7 +- .../anorm/dao/TransactionPostgresDAO.scala | 139 +++++++++++++----- .../TransactionPostgresDataHandlerSpec.scala | 19 ++- 3 files changed, 120 insertions(+), 45 deletions(-) diff --git a/server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala b/server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala index fdaa579..3a17886 100644 --- a/server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala +++ b/server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala @@ -70,11 +70,8 @@ class LedgerPostgresDataHandler @Inject() ( _ <- deleteBlockCascade(block).orElse(Some(())) _ <- blockPostgresDAO.insert(block) - // transactions - _ <- transactions - .zipWithIndex - .map { case (tx, index) => transactionPostgresDAO.upsert(index, tx) } - .everything + // batch insert + _ <- transactionPostgresDAO.insert(transactions) // balances balanceList = balances(transactions) diff --git a/server/app/com/xsn/explorer/data/anorm/dao/TransactionPostgresDAO.scala b/server/app/com/xsn/explorer/data/anorm/dao/TransactionPostgresDAO.scala index 40d47e5..7c4b72a 100644 --- a/server/app/com/xsn/explorer/data/anorm/dao/TransactionPostgresDAO.scala +++ b/server/app/com/xsn/explorer/data/anorm/dao/TransactionPostgresDAO.scala @@ -19,11 +19,67 @@ class TransactionPostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderi def upsert(index: Int, transaction: Transaction)(implicit conn: Connection): Option[Transaction] = { for { partialTx <- upsertTransaction(index, transaction) - inputs <- insertInputs(transaction.id, transaction.inputs) - outputs <- insertOutputs(transaction.id, transaction.outputs) - _ <- spend(transaction.id, inputs) - _ = insertDetails(transaction) - } yield partialTx.copy(inputs = inputs, outputs = outputs) + _ <- batchInsertOutputs(transaction.outputs) + _ <- batchInsertInputs(transaction.inputs.map(transaction.id -> _)) + _ <- batchSpend(transaction.id, transaction.inputs) + _ <- batchInsertDetails(transaction) + } yield partialTx.copy(inputs = transaction.inputs, outputs = transaction.outputs) + } + + def insert(transactions: List[Transaction])(implicit conn: Connection): Option[List[Transaction]] = { + for { + r <- batchInsert(transactions) + + outputs = transactions.flatMap(_.outputs) + _ <- batchInsertOutputs(outputs) + + inputs = transactions.flatMap { tx => tx.inputs.map(tx.id -> _) } + _ <- batchInsertInputs(inputs) + } yield { + val extra = for { + tx <- transactions + _ <- batchInsertDetails(tx) + _ <- batchSpend(tx.id, tx.inputs) + } yield tx + + assert(extra.size == transactions.size, "Not all transactions were inserted properly") + + r + } + } + + private def batchInsert(transactions: List[Transaction])(implicit conn: Connection): Option[List[Transaction]] = { + transactions match { + case Nil => Some(transactions) + case _ => + val params = transactions.zipWithIndex.map { case (transaction, index) => + List( + 'txid -> transaction.id.string: NamedParameter, + 'blockhash -> transaction.blockhash.string: NamedParameter, + 'time -> transaction.time: NamedParameter, + 'size -> transaction.size.int: NamedParameter, + 'index -> index: NamedParameter) + } + + val batch = BatchSql( + """ + |INSERT INTO transactions + | (txid, blockhash, time, size, index) + |VALUES + | ({txid}, {blockhash}, {time}, {size}, {index}) + """.stripMargin, + params.head, + params.tail: _* + ) + + val success = batch.execute().forall(_ == 1) + + if (success) { + Some(transactions) + } else { + None + } + } } /** @@ -299,18 +355,17 @@ class TransactionPostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderi ).as(parseTransaction.singleOpt).flatten } - private def insertInputs( - transactionId: TransactionId, - inputs: List[Transaction.Input])( - implicit conn: Connection): Option[List[Transaction.Input]] = { + private def batchInsertInputs( + inputs: List[(TransactionId, Transaction.Input)])( + implicit conn: Connection): Option[List[(TransactionId, Transaction.Input)]] = { inputs match { case Nil => Some(inputs) case _ => - val params = inputs.map { input => + val params = inputs.map { case (txid, input) => List( - 'txid -> transactionId.string: NamedParameter, + 'txid -> txid.string: NamedParameter, 'index -> input.index: NamedParameter, 'from_txid -> input.fromTxid.string: NamedParameter, 'from_output_index -> input.fromOutputIndex: NamedParameter, @@ -339,8 +394,7 @@ class TransactionPostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderi } } - private def insertOutputs( - transactionId: TransactionId, + private def batchInsertOutputs( outputs: List[Transaction.Output])( implicit conn: Connection): Option[List[Transaction.Output]] = { @@ -349,7 +403,7 @@ class TransactionPostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderi case _ => val params = outputs.map { output => List( - 'txid -> transactionId.string: NamedParameter, + 'txid -> output.txid.string: NamedParameter, 'index -> output.index: NamedParameter, 'value -> output.value: NamedParameter, 'address -> output.address.string: NamedParameter, @@ -405,7 +459,7 @@ class TransactionPostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderi result.flatten } - private def insertDetails(transaction: Transaction)(implicit conn: Connection): List[AddressTransactionDetails] = { + private def batchInsertDetails(transaction: Transaction)(implicit conn: Connection): Option[Unit] = { val received = transaction .outputs .groupBy(_.address) @@ -418,7 +472,7 @@ class TransactionPostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderi .mapValues { inputs => inputs.map(_.value).sum } .map { case (address, value) => AddressTransactionDetails(address, transaction.id, time = transaction.time, sent = value) } - val result = (received ++ sent) + val details = (received ++ sent) .groupBy(_.address) .mapValues { case head :: list => list.foldLeft(head) { (acc, current) => @@ -426,27 +480,42 @@ class TransactionPostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderi } } .values - .map(d => insertDetails(d)) - result.toList + batchInsertDetails(details.toList) } - private def insertDetails(details: AddressTransactionDetails)(implicit conn: Connection): AddressTransactionDetails = { - SQL( - """ - |INSERT INTO address_transaction_details - | (address, txid, received, sent, time) - |VALUES - | ({address}, {txid}, {received}, {sent}, {time}) - |RETURNING address, txid, received, sent, time - """.stripMargin - ).on( - 'address -> details.address.string, - 'txid -> details.txid.string, - 'received -> details.received, - 'sent -> details.sent, - 'time -> details.time - ).as(parseAddressTransactionDetails.single) + private def batchInsertDetails(details: List[AddressTransactionDetails])(implicit conn: Connection): Option[Unit] = { + details match { + case Nil => Some(()) + case _ => + val params = details.map { d => + List( + 'address -> d.address.string: NamedParameter, + 'txid -> d.txid.string: NamedParameter, + 'received -> d.received: NamedParameter, + 'sent -> d.sent: NamedParameter, + 'time -> d.time: NamedParameter) + } + + val batch = BatchSql( + """ + |INSERT INTO address_transaction_details + | (address, txid, received, sent, time) + |VALUES + | ({address}, {txid}, {received}, {sent}, {time}) + """.stripMargin, + params.head, + params.tail: _* + ) + + val success = batch.execute().forall(_ == 1) + + if (success) { + Some(()) + } else { + None + } + } } private def deleteDetails(txid: TransactionId)(implicit conn: Connection): List[AddressTransactionDetails] = { @@ -491,7 +560,7 @@ class TransactionPostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderi ).as(parseTransactionOutput.*).flatten } - private def spend(txid: TransactionId, inputs: List[Transaction.Input])(implicit conn: Connection): Option[Unit] = { + private def batchSpend(txid: TransactionId, inputs: List[Transaction.Input])(implicit conn: Connection): Option[Unit] = { inputs match { case Nil => Option(()) case _ => diff --git a/server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala b/server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala index a4ce17e..d976152 100644 --- a/server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala +++ b/server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala @@ -326,8 +326,18 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be val transaction2 = transaction.copy( id = newTxid2, inputs = List( - Transaction.Input(fromTxid = transaction.id, fromOutputIndex = 0, index = 0, value = transaction.outputs(0).value, address = newAddress), - Transaction.Input(fromTxid = transaction.id, fromOutputIndex = 1, index = 1, value = transaction.outputs(1).value, address = newAddress) + Transaction.Input( + fromTxid = transaction.id, + fromOutputIndex = 0, + index = 0, + value = transaction.outputs(0).value, + address = newAddress), + Transaction.Input( + fromTxid = transaction.id, + fromOutputIndex = 1, + index = 1, + value = transaction.outputs(1).value, + address = newAddress) ), outputs = transaction.outputs.map(_.copy(txid = newTxid2)) ) @@ -341,7 +351,6 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be transactions = transactions.map(_.id)) createBlock(block, transactions) - val newTx = transactions(1) // check that the outputs are properly spent database.withConnection { implicit conn => @@ -355,7 +364,7 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be """.stripMargin ).as(SqlParser.str("spent_on").*) - spentOn.foreach(_ mustEqual newTx.id.string) + spentOn.foreach(_ mustEqual transaction2.id.string) } // check that the inputs are linked to the correct output @@ -366,7 +375,7 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be s""" |SELECT from_txid, from_output_index |FROM transaction_inputs - |WHERE txid = '${newTx.id.string}' + |WHERE txid = '${transaction2.id.string}' """.stripMargin )