From 3630e24483fe825b0611b0005e584279f4e5d1bb Mon Sep 17 00:00:00 2001 From: Alexis Hernandez Date: Mon, 21 Jan 2019 22:25:51 -0700 Subject: [PATCH] server: Log timings while synchronizing blocks --- .../anorm/LedgerPostgresDataHandler.scala | 29 +++++++++++---- .../dao/TransactionInputPostgresDAO.scala | 8 +++++ .../dao/TransactionOutputPostgresDAO.scala | 6 ++++ .../anorm/dao/TransactionPostgresDAO.scala | 36 ++++++++++++++----- .../services/LedgerSynchronizerService.scala | 24 +++++++++---- 5 files changed, 83 insertions(+), 20 deletions(-) diff --git a/server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala b/server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala index 3a17886..c397513 100644 --- a/server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala +++ b/server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala @@ -12,6 +12,7 @@ import com.xsn.explorer.models.{Address, Balance, Transaction} import com.xsn.explorer.util.Extensions.ListOptionExt import javax.inject.Inject import org.scalactic.Good +import org.slf4j.LoggerFactory import play.api.db.Database class LedgerPostgresDataHandler @Inject() ( @@ -23,6 +24,8 @@ class LedgerPostgresDataHandler @Inject() ( extends LedgerBlockingDataHandler with AnormPostgresDataHandler { + private val logger = LoggerFactory.getLogger(this.getClass) + /** * Push a block into the database chain, note that even if the block is supposed * to have a next block, we remove the link because that block is not stored yet. @@ -31,6 +34,7 @@ class LedgerPostgresDataHandler @Inject() ( block: Block, transactions: List[Transaction]): ApplicationResult[Unit] = { + val start = System.currentTimeMillis() val result = withTransaction { implicit conn => val result = for { _ <- upsertBlockCascade(block.copy(nextBlockhash = None), transactions) @@ -47,6 +51,8 @@ class LedgerPostgresDataHandler @Inject() ( case _ => e } + val took = System.currentTimeMillis() - start + logger.info(s"Pushing block = ${block.hash}, took $took ms") result.badMap { _.map(fromError) } } @@ -75,11 +81,7 @@ class LedgerPostgresDataHandler @Inject() ( // balances balanceList = balances(transactions) - - _ <- balanceList - .map { b => balancePostgresDAO.upsert(b) } - .toList - .everything + _ <- insertBalanceBatch(balanceList).toList.everything // compute aggregated amount delta = balanceList.map(_.available).sum @@ -116,6 +118,16 @@ class LedgerPostgresDataHandler @Inject() ( } yield () } + private def insertBalanceBatch(balanceList: Iterable[Balance])(implicit conn: Connection) = { + val start = System.currentTimeMillis() + val result = balanceList.map { b => balancePostgresDAO.upsert(b) } + + val took = System.currentTimeMillis() - start + logger.info(s"Inserting balance batch, size = ${balanceList.size}, took = $took ms") + + result + } + private def spendMap(transactions: List[Transaction]): Map[Address, BigDecimal] = { transactions .map(_.inputs) @@ -139,6 +151,7 @@ class LedgerPostgresDataHandler @Inject() ( } private def balances(transactions: List[Transaction]) = { + val start = System.currentTimeMillis() val spentList = spendMap(transactions).map { case (address, spent) => Balance(address, spent = spent) } @@ -147,10 +160,14 @@ class LedgerPostgresDataHandler @Inject() ( Balance(address, received = received) } - (spentList ++ receiveList) + val result = (spentList ++ receiveList) .groupBy(_.address) .mapValues { _.reduce(mergeBalances) } .values + + val took = System.currentTimeMillis() - start + logger.info(s"Computing balances for transaction batch, size = ${transactions.size}, took = $took ms") + result } private def mergeBalances(a: Balance, b: Balance): Balance = { diff --git a/server/app/com/xsn/explorer/data/anorm/dao/TransactionInputPostgresDAO.scala b/server/app/com/xsn/explorer/data/anorm/dao/TransactionInputPostgresDAO.scala index b13f64e..6aa28b6 100644 --- a/server/app/com/xsn/explorer/data/anorm/dao/TransactionInputPostgresDAO.scala +++ b/server/app/com/xsn/explorer/data/anorm/dao/TransactionInputPostgresDAO.scala @@ -5,9 +5,12 @@ import java.sql.Connection import anorm._ import com.xsn.explorer.data.anorm.parsers.TransactionParsers._ import com.xsn.explorer.models.{Address, Transaction, TransactionId} +import org.slf4j.LoggerFactory class TransactionInputPostgresDAO { + private val logger = LoggerFactory.getLogger(this.getClass) + def batchInsertInputs( inputs: List[(TransactionId, Transaction.Input)])( implicit conn: Connection): Option[List[(TransactionId, Transaction.Input)]] = { @@ -16,6 +19,8 @@ class TransactionInputPostgresDAO { case Nil => Some(inputs) case _ => + + val start = System.currentTimeMillis() val params = inputs.map { case (txid, input) => List( 'txid -> txid.string: NamedParameter, @@ -39,6 +44,9 @@ class TransactionInputPostgresDAO { val success = batch.execute().forall(_ == 1) + val took = System.currentTimeMillis() - start + logger.info(s"Inserting input batch, size = ${inputs.size}, took = $took ms") + if (success) { Some(inputs) } else { diff --git a/server/app/com/xsn/explorer/data/anorm/dao/TransactionOutputPostgresDAO.scala b/server/app/com/xsn/explorer/data/anorm/dao/TransactionOutputPostgresDAO.scala index 6a7adfa..d8a40f4 100644 --- a/server/app/com/xsn/explorer/data/anorm/dao/TransactionOutputPostgresDAO.scala +++ b/server/app/com/xsn/explorer/data/anorm/dao/TransactionOutputPostgresDAO.scala @@ -5,9 +5,12 @@ import java.sql.Connection import anorm._ import com.xsn.explorer.data.anorm.parsers.TransactionParsers._ import com.xsn.explorer.models.{Address, Transaction, TransactionId} +import org.slf4j.LoggerFactory class TransactionOutputPostgresDAO { + private val logger = LoggerFactory.getLogger(this.getClass) + def getUnspentOutputs(address: Address)(implicit conn: Connection): List[Transaction.Output] = { SQL( """ @@ -29,6 +32,7 @@ class TransactionOutputPostgresDAO { outputs match { case Nil => Some(outputs) case _ => + val start = System.currentTimeMillis() val params = outputs.map { output => List( 'txid -> output.txid.string: NamedParameter, @@ -53,6 +57,8 @@ class TransactionOutputPostgresDAO { val success = batch.execute().forall(_ == 1) + val took = System.currentTimeMillis() - start + logger.info(s"Inserting output batch, size = ${outputs.size}, took = $took ms") if (success) { Some(outputs) } else { diff --git a/server/app/com/xsn/explorer/data/anorm/dao/TransactionPostgresDAO.scala b/server/app/com/xsn/explorer/data/anorm/dao/TransactionPostgresDAO.scala index 8ab9d22..cf84336 100644 --- a/server/app/com/xsn/explorer/data/anorm/dao/TransactionPostgresDAO.scala +++ b/server/app/com/xsn/explorer/data/anorm/dao/TransactionPostgresDAO.scala @@ -10,6 +10,7 @@ import com.xsn.explorer.data.anorm.parsers.TransactionParsers._ import com.xsn.explorer.models._ import com.xsn.explorer.models.fields.TransactionField import javax.inject.Inject +import org.slf4j.LoggerFactory class TransactionPostgresDAO @Inject() ( transactionInputDAO: TransactionInputPostgresDAO, @@ -17,6 +18,8 @@ class TransactionPostgresDAO @Inject() ( addressTransactionDetailsDAO: AddressTransactionDetailsPostgresDAO, fieldOrderingSQLInterpreter: FieldOrderingSQLInterpreter) { + private val logger = LoggerFactory.getLogger(this.getClass) + /** * NOTE: Ensure the connection has an open transaction. */ @@ -40,22 +43,37 @@ class TransactionPostgresDAO @Inject() ( inputs = transactions.flatMap { tx => tx.inputs.map(tx.id -> _) } _ <- transactionInputDAO.batchInsertInputs(inputs) } yield { - val extra = for { - tx <- transactions - _ <- addressTransactionDetailsDAO.batchInsertDetails(tx) - _ <- transactionOutputDAO.batchSpend(tx.id, tx.inputs) - } yield tx - - assert(extra.size == transactions.size, "Not all transactions were inserted properly") - + insertDetails(transactions) + spend(transactions) r } } + private def insertDetails(transactions: List[Transaction])(implicit conn: Connection): Unit = { + val start = System.currentTimeMillis() + val detailsResult = transactions.map(addressTransactionDetailsDAO.batchInsertDetails) + val took = System.currentTimeMillis() - start + + logger.info(s"Inserting address details batch, size = ${transactions.size}, took = $took ms") + + assert(detailsResult.forall(_.isDefined), "Inserting address details batch failed") + } + + private def spend(transactions: List[Transaction])(implicit conn: Connection): Unit = { + val start = System.currentTimeMillis() + val spendResult = transactions.map { tx => transactionOutputDAO.batchSpend(tx.id, tx.inputs) } + val took = System.currentTimeMillis() - start + + logger.info(s"Spending transaction batch, size = ${transactions.size}, took = $took ms") + + assert(spendResult.forall(_.isDefined), "Spending inputs batch failed") + } + private def batchInsert(transactions: List[Transaction])(implicit conn: Connection): Option[List[Transaction]] = { transactions match { case Nil => Some(transactions) case _ => + val start = System.currentTimeMillis() val params = transactions.zipWithIndex.map { case (transaction, index) => List( 'txid -> transaction.id.string: NamedParameter, @@ -78,6 +96,8 @@ class TransactionPostgresDAO @Inject() ( val success = batch.execute().forall(_ == 1) + val took = System.currentTimeMillis() - start + logger.info(s"Inserting transaction batch, size = ${transactions.size}, took = $took ms") if (success) { Some(transactions) } else { diff --git a/server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala b/server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala index 50af8e1..c9b6cbc 100644 --- a/server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala +++ b/server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala @@ -31,8 +31,8 @@ class LedgerSynchronizerService @Inject() ( */ def synchronize(blockhash: Blockhash): FutureApplicationResult[Unit] = { val result = for { - block <- xsnService.getBlock(blockhash).toFutureOr - transactions <- transactionService.getTransactions(block.transactions).toFutureOr + x <- getRPCBlock(blockhash).toFutureOr + (block, transactions) = x _ <- synchronize(block, transactions).toFutureOr } yield () @@ -96,8 +96,8 @@ class LedgerSynchronizerService @Inject() ( logger.info(s"Reorganization to push block ${newBlock.height}, hash = ${newBlock.hash}") val result = for { blockhash <- newBlock.previousBlockhash.toFutureOr(BlockNotFoundError) - previousBlock <- xsnService.getBlock(blockhash).toFutureOr - previousTransactions <- transactionService.getTransactions(previousBlock.transactions).toFutureOr + x <- getRPCBlock(blockhash).toFutureOr + (previousBlock, previousTransactions) = x _ <- synchronize(previousBlock, previousTransactions).toFutureOr _ <- synchronize(newBlock, newTransactions).toFutureOr } yield () @@ -147,8 +147,8 @@ class LedgerSynchronizerService @Inject() ( val result = for { _ <- previous.toFutureOr blockhash <- xsnService.getBlockhash(Height(height)).toFutureOr - block <- xsnService.getBlock(blockhash).toFutureOr - transactions <- transactionService.getTransactions(block.transactions).toFutureOr + x <- getRPCBlock(blockhash).toFutureOr + (block, transactions) = x _ <- synchronize(block, transactions).toFutureOr } yield () @@ -156,6 +156,18 @@ class LedgerSynchronizerService @Inject() ( } } + private def getRPCBlock(blockhash: Blockhash): FutureApplicationResult[(Block, List[Transaction])] = { + val start = System.currentTimeMillis() + val result = for { + block <- xsnService.getBlock(blockhash).toFutureOr + transactions <- transactionService.getTransactions(block.transactions).toFutureOr + took = System.currentTimeMillis() - start + _ = logger.info(s"Retrieving block = $blockhash, took $took ms") + } yield (block, transactions) + + result.toFuture + } + /** * Trim the ledger until the given block height, if the height is 4, * the last stored block will be 3.