From 5ff45a0921eaf9201da37592522c0b5f07646052 Mon Sep 17 00:00:00 2001 From: Alexis Hernandez Date: Sun, 13 May 2018 11:24:16 -0500 Subject: [PATCH] server: Minor updates to the seeding process --- .../anorm/BalancePostgresDataHandler.scala | 2 +- .../data/anorm/DatabasePostgresSeeder.scala | 60 +++++++++---------- .../data/anorm/dao/BalancePostgresDAO.scala | 25 +++++--- 3 files changed, 46 insertions(+), 41 deletions(-) diff --git a/server/app/com/xsn/explorer/data/anorm/BalancePostgresDataHandler.scala b/server/app/com/xsn/explorer/data/anorm/BalancePostgresDataHandler.scala index 8f35edf..5e272ee 100644 --- a/server/app/com/xsn/explorer/data/anorm/BalancePostgresDataHandler.scala +++ b/server/app/com/xsn/explorer/data/anorm/BalancePostgresDataHandler.scala @@ -18,7 +18,7 @@ class BalancePostgresDataHandler @Inject() ( extends BalanceBlockingDataHandler with AnormPostgresDataHandler { - override def upsert(balance: Balance): ApplicationResult[Balance] = withConnection { implicit conn => + override def upsert(balance: Balance): ApplicationResult[Balance] = withTransaction { implicit conn => val maybe = balancePostgresDAO.upsert(balance) Or.from(maybe, One(BalanceUnknownError)) diff --git a/server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala b/server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala index 328d196..67774e5 100644 --- a/server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala +++ b/server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala @@ -18,13 +18,13 @@ class DatabasePostgresSeeder @Inject() ( override val database: Database, blockPostgresDAO: BlockPostgresDAO, transactionPostgresDAO: TransactionPostgresDAO, - addressPostgresDAO: BalancePostgresDAO) + balancePostgresDAO: BalancePostgresDAO) extends DatabaseBlockingSeeder with AnormPostgresDataHandler { private val logger = LoggerFactory.getLogger(this.getClass) - override def firstBlock(command: CreateBlockCommand): ApplicationResult[Unit] = database.withTransaction { implicit conn => + override def firstBlock(command: CreateBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn => val result = upsertBlockCascade(command) result @@ -78,28 +78,17 @@ class DatabasePostgresSeeder @Inject() ( private def upsertBlockCascade(command: CreateBlockCommand)(implicit conn: Connection): Option[Unit] = { for { // block - _ <- blockPostgresDAO - .delete(command.block.hash) - .orElse { Some(command.block) } + _ <- deleteBlockCascade(command.block) + .orElse(Some(())) + _ <- blockPostgresDAO.insert(command.block) // transactions _ <- command.transactions.map(tx => transactionPostgresDAO.upsert(tx)).everything // balances - _ <- spendMap(command.transactions) - .map { case (address, value) => - val balance = Balance(address, spent = value) - addressPostgresDAO.upsert(balance) - } - .toList - .everything - - _ <- receiveMap(command.transactions) - .map { case (address, value) => - val balance = Balance(address, received = value) - addressPostgresDAO.upsert(balance) - } + _ <- balances(command.transactions) + .map { b => balancePostgresDAO.upsert(b) } .toList .everything } yield () @@ -114,19 +103,9 @@ class DatabasePostgresSeeder @Inject() ( deletedTransactions = transactionPostgresDAO.deleteBy(block.hash) // balances - _ <- spendMap(deletedTransactions) - .map { case (address, value) => - val balance = Balance(address, spent = -value) - addressPostgresDAO.upsert(balance) - } - .toList - .everything - - _ <- receiveMap(deletedTransactions) - .map { case (address, value) => - val balance = Balance(address, received = -value) - addressPostgresDAO.upsert(balance) - } + _ <- balances(deletedTransactions) + .map { b => b.copy(spent = -b.spent, received = -b.received) } + .map { b => balancePostgresDAO.upsert(b) } .toList .everything } yield () @@ -158,4 +137,23 @@ class DatabasePostgresSeeder @Inject() ( .groupBy(_._1) .mapValues { list => list.map(_._2).sum } } + + private def balances(transactions: List[Transaction]) = { + val spentList = spendMap(transactions).map { case (address, spent) => + Balance(address, spent = spent) + } + + val receiveList = receiveMap(transactions).map { case (address, received) => + Balance(address, received = received) + } + + (spentList ++ receiveList) + .groupBy(_.address) + .mapValues { _.reduce(mergeBalances) } + .values + } + + def mergeBalances(a: Balance, b: Balance): Balance = { + Balance(a.address, spent = a.spent + b.spent, received = a.received + b.received) + } } diff --git a/server/app/com/xsn/explorer/data/anorm/dao/BalancePostgresDAO.scala b/server/app/com/xsn/explorer/data/anorm/dao/BalancePostgresDAO.scala index c06c888..9d1ce51 100644 --- a/server/app/com/xsn/explorer/data/anorm/dao/BalancePostgresDAO.scala +++ b/server/app/com/xsn/explorer/data/anorm/dao/BalancePostgresDAO.scala @@ -7,16 +7,23 @@ import anorm._ import com.alexitc.playsonify.models.{Count, FieldOrdering, PaginatedQuery} import com.xsn.explorer.data.anorm.interpreters.FieldOrderingSQLInterpreter import com.xsn.explorer.data.anorm.parsers.BalanceParsers._ -import com.xsn.explorer.models.{Address, Balance} import com.xsn.explorer.models.fields.BalanceField +import com.xsn.explorer.models.{Address, Balance} import org.slf4j.LoggerFactory class BalancePostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderingSQLInterpreter) { private val logger = LoggerFactory.getLogger(this.getClass) - def upsert(balance: Balance)(implicit conn: Connection): Option[Balance] = { - val createdBalance = SQL( + /** + * create or update the balance for an address + * + * NOTE: ensure the connection has an open transaction, this is required + * until the debug log is removed. + */ + def upsert(partial: Balance)(implicit conn: Connection): Option[Balance] = { + val computedMaybe = computeBalance(partial.address) + val updatedBalance = SQL( """ |INSERT INTO balances | (address, received, spent) @@ -28,19 +35,19 @@ class BalancePostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderingSQ |RETURNING address, received, spent """.stripMargin ).on( - 'address -> balance.address.string, - 'received -> balance.received, - 'spent -> balance.spent, + 'address -> partial.address.string, + 'received -> partial.received, + 'spent -> partial.spent, ).as(parseBalance.singleOpt).flatten for { - balance <- createdBalance - computed <- computeBalance(balance.address) if computed != balance + balance <- updatedBalance + computed <- computedMaybe if computed != balance } { logger.warn(s"CORRUPTED_BALANCE, expected spent = ${computed.spent}, actual = ${balance.spent}, expected received = ${computed.received}, actual = ${balance.received}") } - createdBalance + updatedBalance } private def computeBalance(address: Address)(implicit conn: Connection) = {