From 8f0645b30a75b931ba923b6a14e2708c7de7b069 Mon Sep 17 00:00:00 2001 From: Alexis Hernandez Date: Fri, 18 Jan 2019 22:42:18 -0700 Subject: [PATCH] server: Batch update while spending outputs on TransactionPostgresDAO --- .../anorm/dao/TransactionPostgresDAO.scala | 60 ++++++++----- .../TransactionPostgresDataHandlerSpec.scala | 89 ++++++++++++++++++- 2 files changed, 123 insertions(+), 26 deletions(-) diff --git a/server/app/com/xsn/explorer/data/anorm/dao/TransactionPostgresDAO.scala b/server/app/com/xsn/explorer/data/anorm/dao/TransactionPostgresDAO.scala index e29ee6a..05e2eff 100644 --- a/server/app/com/xsn/explorer/data/anorm/dao/TransactionPostgresDAO.scala +++ b/server/app/com/xsn/explorer/data/anorm/dao/TransactionPostgresDAO.scala @@ -27,29 +27,6 @@ class TransactionPostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderi } yield partialTx.copy(inputs = inputs, outputs = outputs) } - private def spend(txid: TransactionId, inputs: List[Transaction.Input])(implicit conn: Connection): Option[Unit] = { - val result = inputs.flatMap { input => spend(txid, input) } - Option(result) - .filter(_.size == inputs.size) - .map(_ => ()) - } - - private def spend(txid: TransactionId, input: Transaction.Input)(implicit conn: Connection): Option[Transaction.Output] = { - SQL( - """ - |UPDATE transaction_outputs - |SET spent_on = {spent_on} - |WHERE txid = {output_txid} AND - | index = {output_index} - |RETURNING txid, index, hex_script, value, address, tpos_owner_address, tpos_merchant_address - """.stripMargin - ).on( - 'spent_on -> txid.string, - 'output_txid -> input.fromTxid.string, - 'output_index -> input.fromOutputIndex - ).as(parseTransactionOutput.single) - } - /** * NOTE: Ensure the connection has an open transaction. */ @@ -539,6 +516,43 @@ class TransactionPostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderi ).as(parseTransactionOutput.*).flatten } + private def spend(txid: TransactionId, inputs: List[Transaction.Input])(implicit conn: Connection): Option[Unit] = { + inputs match { + case Nil => Option(()) + case _ => + val txidArray = inputs + .map { input => s"'${input.fromTxid.string}'" } + .mkString("[", ",", "]") + + val indexArray = inputs.map(_.fromOutputIndex).mkString("[", ",", "]") + + // Note: the TransactionId must meet a safe format, this approach speeds up the inserts + val result = SQL( + s""" + |UPDATE transaction_outputs t + |SET spent_on = tmp.spent_on + |FROM ( + | WITH CTE AS ( + | SELECT '${txid.string}' AS spent_on + | ) + | SELECT spent_on, txid, index + | FROM CTE CROSS JOIN (SELECT + | UNNEST(array$indexArray) AS index, + | UNNEST(array$txidArray) AS txid) x + |) AS tmp + |WHERE t.txid = tmp.txid AND + | t.index = tmp.index + """.stripMargin + ).executeUpdate() + + if (result == inputs.size) { + Option(()) + } else { + None + } + } + } + private def toSQL(condition: OrderingCondition): String = condition match { case OrderingCondition.AscendingOrder => "ASC" case OrderingCondition.DescendingOrder => "DESC" diff --git a/server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala b/server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala index b828ad2..970fa58 100644 --- a/server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala +++ b/server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala @@ -17,11 +17,12 @@ import org.scalatest.BeforeAndAfter class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with BeforeAndAfter { - lazy val dataHandler = new TransactionPostgresDataHandler(database, new TransactionPostgresDAO(new FieldOrderingSQLInterpreter)) + lazy val transactionPostgresDAO = new TransactionPostgresDAO(new FieldOrderingSQLInterpreter) + lazy val dataHandler = new TransactionPostgresDataHandler(database, transactionPostgresDAO) lazy val ledgerDataHandler = new LedgerPostgresDataHandler( database, new BlockPostgresDAO(new FieldOrderingSQLInterpreter), - new TransactionPostgresDAO(new FieldOrderingSQLInterpreter), + transactionPostgresDAO, new BalancePostgresDAO(new FieldOrderingSQLInterpreter), new AggregatedAmountPostgresDAO) @@ -113,7 +114,7 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be } private def upsertTransaction(transaction: Transaction) = { - val dao = new TransactionPostgresDAO(new FieldOrderingSQLInterpreter) + val dao = transactionPostgresDAO database.withConnection { implicit conn => val maybe = dao.upsert(1, transaction) Or.from(maybe, One(TransactionNotFoundError)) @@ -422,7 +423,89 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be testOrdering("desc", OrderingCondition.DescendingOrder) testOrdering("asc", OrderingCondition.AscendingOrder) + } + + "spending an output" should { + "use the right values" in { + val address = createAddress("XxQ7j37LfuXgsLD5DZAwFKhT3s2ZMkW86F") + val blockhash = createBlockhash("0000000000bdbb23e28f79a49d29b41429737c6c7e15df40d1b1f1b35907ae34") + val inputs = List( + Transaction.Input(dummyTransaction.id, 0, 1, 100, address), + Transaction.Input(dummyTransaction.id, 1, 2, 200, address) + ) + + val outputs = List( + Transaction.Output(createTransactionId("ad1320dcea2fdaa357aac6eab00695cf07b487e34113598909f625c24629c981"), 0, BigDecimal(50), createAddress("Xbh5pJdBNm8J9PxnEmwVcuQKRmZZ7DkpcF"), HexString.from("00").get, None, None), + Transaction.Output( + createTransactionId("ad9330dcea2fdaa357aac6eab00695cf07b487e34113598909f625c24629c981"), + 1, + BigDecimal(250), + createAddress("Xbh5pJdBNm8J9PxnEmwVcuQKRmZZ7DkpcF"), + HexString.from("00").get, + None, None) + ) + + val transaction = Transaction( + createTransactionId("00051e4fe89466faa734d6207a7ef6115fa1dd33f7156b006fafc6bb85a79eb8"), + blockhash, + 321, + Size(1000), + inputs, + outputs) + + val newAddress = createAddress("Xbh5pJdBNm8J9PxnEmwVcuQKRmZZ7Dkpcx") + val transactions = List( + transaction, + transaction.copy( + id = createTransactionId("00041e4fe89466faa734d6207a7ef6115fa1dd33f7156b006fafc6bb85a79eb8"), + inputs = List( + Transaction.Input(fromTxid = transaction.id, fromOutputIndex = 0, index = 0, value = transaction.outputs(0).value, address = newAddress), + Transaction.Input(fromTxid = transaction.id, fromOutputIndex = 1, index = 1, value = transaction.outputs(1).value, address = newAddress) + ) + )) + + val block = this.block.copy( + hash = blockhash, + height = Height(10), + transactions = transactions.map(_.id)) + + createBlock(block, transactions) + val newTx = transactions(1) + // check that the outputs are properly spent + database.withConnection { implicit conn => + import _root_.anorm._ + + val spentOn = SQL( + s""" + |SELECT spent_on + |FROM transaction_outputs + |WHERE txid = '${transaction.id.string}' + """.stripMargin + ).as(SqlParser.str("spent_on").*) + + spentOn.foreach(_ mustEqual newTx.id.string) + } + + // check that the inputs are linked to the correct output + database.withConnection { implicit conn => + import _root_.anorm._ + + val query = SQL( + s""" + |SELECT from_txid, from_output_index + |FROM transaction_inputs + |WHERE txid = '${newTx.id.string}' + """.stripMargin + ) + + val fromTxid = query.as(SqlParser.str("from_txid").*) + fromTxid.foreach(_ mustEqual transaction.id.string) + + val fromOutputIndex = query.as(SqlParser.int("from_output_index").*) + fromOutputIndex.sorted mustEqual List(0, 1) + } + } } private def createBlock(block: Block) = {