Browse Source

server: Minor updates to the seeding process

scalafmt-draft
Alexis Hernandez 7 years ago
parent
commit
5ff45a0921
  1. 2
      server/app/com/xsn/explorer/data/anorm/BalancePostgresDataHandler.scala
  2. 60
      server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala
  3. 25
      server/app/com/xsn/explorer/data/anorm/dao/BalancePostgresDAO.scala

2
server/app/com/xsn/explorer/data/anorm/BalancePostgresDataHandler.scala

@ -18,7 +18,7 @@ class BalancePostgresDataHandler @Inject() (
extends BalanceBlockingDataHandler
with AnormPostgresDataHandler {
override def upsert(balance: Balance): ApplicationResult[Balance] = withConnection { implicit conn =>
override def upsert(balance: Balance): ApplicationResult[Balance] = withTransaction { implicit conn =>
val maybe = balancePostgresDAO.upsert(balance)
Or.from(maybe, One(BalanceUnknownError))

60
server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala

@ -18,13 +18,13 @@ class DatabasePostgresSeeder @Inject() (
override val database: Database,
blockPostgresDAO: BlockPostgresDAO,
transactionPostgresDAO: TransactionPostgresDAO,
addressPostgresDAO: BalancePostgresDAO)
balancePostgresDAO: BalancePostgresDAO)
extends DatabaseBlockingSeeder
with AnormPostgresDataHandler {
private val logger = LoggerFactory.getLogger(this.getClass)
override def firstBlock(command: CreateBlockCommand): ApplicationResult[Unit] = database.withTransaction { implicit conn =>
override def firstBlock(command: CreateBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn =>
val result = upsertBlockCascade(command)
result
@ -78,28 +78,17 @@ class DatabasePostgresSeeder @Inject() (
private def upsertBlockCascade(command: CreateBlockCommand)(implicit conn: Connection): Option[Unit] = {
for {
// block
_ <- blockPostgresDAO
.delete(command.block.hash)
.orElse { Some(command.block) }
_ <- deleteBlockCascade(command.block)
.orElse(Some(()))
_ <- blockPostgresDAO.insert(command.block)
// transactions
_ <- command.transactions.map(tx => transactionPostgresDAO.upsert(tx)).everything
// balances
_ <- spendMap(command.transactions)
.map { case (address, value) =>
val balance = Balance(address, spent = value)
addressPostgresDAO.upsert(balance)
}
.toList
.everything
_ <- receiveMap(command.transactions)
.map { case (address, value) =>
val balance = Balance(address, received = value)
addressPostgresDAO.upsert(balance)
}
_ <- balances(command.transactions)
.map { b => balancePostgresDAO.upsert(b) }
.toList
.everything
} yield ()
@ -114,19 +103,9 @@ class DatabasePostgresSeeder @Inject() (
deletedTransactions = transactionPostgresDAO.deleteBy(block.hash)
// balances
_ <- spendMap(deletedTransactions)
.map { case (address, value) =>
val balance = Balance(address, spent = -value)
addressPostgresDAO.upsert(balance)
}
.toList
.everything
_ <- receiveMap(deletedTransactions)
.map { case (address, value) =>
val balance = Balance(address, received = -value)
addressPostgresDAO.upsert(balance)
}
_ <- balances(deletedTransactions)
.map { b => b.copy(spent = -b.spent, received = -b.received) }
.map { b => balancePostgresDAO.upsert(b) }
.toList
.everything
} yield ()
@ -158,4 +137,23 @@ class DatabasePostgresSeeder @Inject() (
.groupBy(_._1)
.mapValues { list => list.map(_._2).sum }
}
private def balances(transactions: List[Transaction]) = {
val spentList = spendMap(transactions).map { case (address, spent) =>
Balance(address, spent = spent)
}
val receiveList = receiveMap(transactions).map { case (address, received) =>
Balance(address, received = received)
}
(spentList ++ receiveList)
.groupBy(_.address)
.mapValues { _.reduce(mergeBalances) }
.values
}
def mergeBalances(a: Balance, b: Balance): Balance = {
Balance(a.address, spent = a.spent + b.spent, received = a.received + b.received)
}
}

25
server/app/com/xsn/explorer/data/anorm/dao/BalancePostgresDAO.scala

@ -7,16 +7,23 @@ import anorm._
import com.alexitc.playsonify.models.{Count, FieldOrdering, PaginatedQuery}
import com.xsn.explorer.data.anorm.interpreters.FieldOrderingSQLInterpreter
import com.xsn.explorer.data.anorm.parsers.BalanceParsers._
import com.xsn.explorer.models.{Address, Balance}
import com.xsn.explorer.models.fields.BalanceField
import com.xsn.explorer.models.{Address, Balance}
import org.slf4j.LoggerFactory
class BalancePostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderingSQLInterpreter) {
private val logger = LoggerFactory.getLogger(this.getClass)
def upsert(balance: Balance)(implicit conn: Connection): Option[Balance] = {
val createdBalance = SQL(
/**
* create or update the balance for an address
*
* NOTE: ensure the connection has an open transaction, this is required
* until the debug log is removed.
*/
def upsert(partial: Balance)(implicit conn: Connection): Option[Balance] = {
val computedMaybe = computeBalance(partial.address)
val updatedBalance = SQL(
"""
|INSERT INTO balances
| (address, received, spent)
@ -28,19 +35,19 @@ class BalancePostgresDAO @Inject() (fieldOrderingSQLInterpreter: FieldOrderingSQ
|RETURNING address, received, spent
""".stripMargin
).on(
'address -> balance.address.string,
'received -> balance.received,
'spent -> balance.spent,
'address -> partial.address.string,
'received -> partial.received,
'spent -> partial.spent,
).as(parseBalance.singleOpt).flatten
for {
balance <- createdBalance
computed <- computeBalance(balance.address) if computed != balance
balance <- updatedBalance
computed <- computedMaybe if computed != balance
} {
logger.warn(s"CORRUPTED_BALANCE, expected spent = ${computed.spent}, actual = ${balance.spent}, expected received = ${computed.received}, actual = ${balance.received}")
}
createdBalance
updatedBalance
}
private def computeBalance(address: Address)(implicit conn: Connection) = {

Loading…
Cancel
Save