Browse Source

server: Update the BlockEventsProcessor

Now it stores and rolls back the transactions and
balances related to the block event.
scalafmt-draft
Alexis Hernandez 7 years ago
parent
commit
439f9bccc8
  1. 139
      server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala
  2. 22
      server/app/com/xsn/explorer/models/Transaction.scala
  3. 5
      server/app/com/xsn/explorer/models/TransactionDetails.scala
  4. 90
      server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala
  5. 10
      server/app/com/xsn/explorer/util/Extensions.scala
  6. 13
      server/test/com/xsn/explorer/helpers/FileBasedXSNService.scala
  7. 11
      server/test/com/xsn/explorer/helpers/TransactionLoader.scala
  8. 67
      server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala
  9. 19
      server/test/resources/blocks/00000766115b26ecbc09cd3a3db6870fdaf2f049d65a910eb2f2b48b566ca7bd
  10. 34
      server/test/resources/transactions/1e591eae200f719344fc5df0c4286e3fb191fb8a645bdf054f9b36a856fce41e

139
server/app/com/xsn/explorer/data/anorm/DatabasePostgresSeeder.scala

@ -1,21 +1,27 @@
package com.xsn.explorer.data.anorm
import java.sql.Connection
import javax.inject.Inject
import com.alexitc.playsonify.core.ApplicationResult
import com.xsn.explorer.data.anorm.dao.BlockPostgresDAO
import com.xsn.explorer.models.Blockhash
import com.xsn.explorer.data.anorm.dao.{BalancePostgresDAO, BlockPostgresDAO, TransactionPostgresDAO}
import com.xsn.explorer.models.rpc.Block
import com.xsn.explorer.models.{Address, Balance, Transaction}
import com.xsn.explorer.util.Extensions.ListOptionExt
import org.scalactic.Good
import play.api.db.Database
class DatabasePostgresSeeder @Inject() (
override val database: Database,
blockPostgresDAO: BlockPostgresDAO)
blockPostgresDAO: BlockPostgresDAO,
transactionPostgresDAO: TransactionPostgresDAO,
addressPostgresDAO: BalancePostgresDAO)
extends AnormPostgresDataHandler {
def firstBlock(block: Block): ApplicationResult[Unit] = database.withConnection { implicit conn =>
val result = blockPostgresDAO.upsert(block)
import DatabasePostgresSeeder._
def firstBlock(command: CreateBlockCommand): ApplicationResult[Unit] = database.withTransaction { implicit conn =>
val result = upsertBlockCascade(command)
result
.map(_ => Good(()))
@ -25,38 +31,125 @@ class DatabasePostgresSeeder @Inject() (
/**
* Creates the new latest block assuming there is a previous block.
*
* @param newBlock
* @param command
* @return
*/
def newLatestBlock(newBlock: Block): ApplicationResult[Unit] = withTransaction { implicit conn =>
val insertedBlock = for {
_ <- blockPostgresDAO.upsert(newBlock)
def newLatestBlock(command: CreateBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn =>
val result = for {
// link previous block
previousBlockhash <- command.block.previousBlockhash
previous <- blockPostgresDAO.getBy(previousBlockhash)
newPrevious = previous.copy(nextBlockhash = Some(command.block.hash))
_ <- blockPostgresDAO.upsert(newPrevious)
_ <- upsertBlockCascade(command)
} yield ()
val result = insertedBlock
.flatMap(_ => newBlock.previousBlockhash)
.flatMap { previousBlockhash =>
for {
previous <- blockPostgresDAO.getBy(previousBlockhash)
newPrevious = previous.copy(nextBlockhash = Some(newBlock.hash))
_ <- blockPostgresDAO.upsert(newPrevious)
} yield ()
}
result
.map(Good(_))
.getOrElse(throw new RuntimeException("Unable to add the new latest block"))
}
def replaceLatestBlock(newBlock: Block, orphan: Blockhash): ApplicationResult[Unit] = withTransaction { implicit conn =>
def replaceLatestBlock(command: ReplaceBlockCommand): ApplicationResult[Unit] = withTransaction { implicit conn =>
val deleteCommand = DeleteBlockCommand(command.orphanBlock, command.orphanTransactions)
val createCommand = CreateBlockCommand(command.newBlock, command.newTransactions)
val result = for {
_ <- blockPostgresDAO.upsert(newBlock)
_ <- blockPostgresDAO.delete(orphan)
_ <- deleteBlockCascade(deleteCommand)
_ <- upsertBlockCascade(createCommand)
} yield ()
result
.map(Good(_))
.getOrElse(throw new RuntimeException("Unable to replace latest block"))
}
private def upsertBlockCascade(command: CreateBlockCommand)(implicit conn: Connection): Option[Unit] = {
for {
// block
_ <- blockPostgresDAO.upsert(command.block)
// transactions
_ <- command.transactions.map(tx => transactionPostgresDAO.upsert(tx)).everything
// balances
_ <- spendMap(command.transactions)
.map { case (address, value) =>
val balance = Balance(address, spent = value)
addressPostgresDAO.upsert(balance)
}
.toList
.everything
_ <- receiveMap(command.transactions)
.map { case (address, value) =>
val balance = Balance(address, received = value)
addressPostgresDAO.upsert(balance)
}
.toList
.everything
} yield ()
}
private def deleteBlockCascade(command: DeleteBlockCommand)(implicit conn: Connection): Option[Unit] = {
for {
// block
_ <- blockPostgresDAO.delete(command.block.hash)
// transactions
_ = command.transactions.foreach(tx => transactionPostgresDAO.delete(tx.id))
// balances
_ <- spendMap(command.transactions)
.map { case (address, value) =>
val balance = Balance(address, spent = -value)
addressPostgresDAO.upsert(balance)
}
.toList
.everything
_ <- receiveMap(command.transactions)
.map { case (address, value) =>
val balance = Balance(address, received = -value)
addressPostgresDAO.upsert(balance)
}
.toList
.everything
} yield ()
}
private def spendMap(transactions: List[Transaction]): Map[Address, BigDecimal] = {
transactions
.map(_.inputs)
.flatMap { inputs =>
inputs.flatMap { input =>
for {
address <- input.address
value <- input.value
} yield address -> value
}
}
.groupBy(_._1)
.mapValues { list => list.map(_._2).sum }
}
private def receiveMap(transactions: List[Transaction]): Map[Address, BigDecimal] = {
transactions
.map(_.outputs)
.flatMap { outputs =>
outputs.map { output =>
output.address -> output.value
}
}
.groupBy(_._1)
.mapValues { list => list.map(_._2).sum }
}
}
object DatabasePostgresSeeder {
case class CreateBlockCommand(block: Block, transactions: List[Transaction])
case class DeleteBlockCommand(block: Block, transactions: List[Transaction])
case class ReplaceBlockCommand(
orphanBlock: Block, orphanTransactions: List[Transaction],
newBlock: Block, newTransactions: List[Transaction])
}

22
server/app/com/xsn/explorer/models/Transaction.scala

@ -21,4 +21,26 @@ object Transaction {
address: Address,
tposOwnerAddress: Option[Address],
tposMerchantAddress: Option[Address])
def fromRPC(tx: rpc.Transaction): Transaction = {
val inputs = tx.vin.zipWithIndex.map { case (vin, index) =>
Transaction.Input(index, vin.value, vin.address)
}
val outputs = tx.vout.flatMap { vout =>
val tposAddresses = vout.scriptPubKey.flatMap(_.getTPoSAddresses)
for {
address <- vout.address
} yield Transaction.Output(vout.n, vout.value, address, tposAddresses.map(_._1), tposAddresses.map(_._2))
}
Transaction(
id = tx.id,
blockhash = tx.blockhash,
time = tx.time,
size = tx.size,
inputs = inputs,
outputs = outputs
)
}
}

5
server/app/com/xsn/explorer/models/TransactionDetails.scala

@ -1,6 +1,5 @@
package com.xsn.explorer.models
import com.xsn.explorer.models.rpc.Transaction
import play.api.libs.json.{Json, Writes}
case class TransactionDetails(
@ -22,13 +21,13 @@ case class TransactionDetails(
object TransactionDetails {
def from(tx: Transaction, input: List[TransactionValue]): TransactionDetails = {
def from(tx: rpc.Transaction, input: List[TransactionValue]): TransactionDetails = {
TransactionDetails
.from(tx)
.copy(input = input)
}
def from(tx: Transaction): TransactionDetails = {
def from(tx: rpc.Transaction): TransactionDetails = {
val output = tx.vout.flatMap(TransactionValue.from)
TransactionDetails(tx.id, tx.size, tx.blockhash, tx.time, tx.blocktime, tx.confirmations, List.empty, output)

90
server/app/com/xsn/explorer/processors/BlockEventsProcessor.scala

@ -8,11 +8,14 @@ import com.xsn.explorer.data.BlockBlockingDataHandler
import com.xsn.explorer.data.anorm.DatabasePostgresSeeder
import com.xsn.explorer.models.Blockhash
import com.xsn.explorer.models.rpc.Block
import com.xsn.explorer.models.Transaction
import com.xsn.explorer.services.XSNService
import com.xsn.explorer.util.Extensions.FutureApplicationResultExt
import org.scalactic.Good
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
/**
* Process events related to blocks coming from the RPC server.
@ -39,38 +42,79 @@ class BlockEventsProcessor @Inject() (
* @param blockhash the new latest block
*/
def newLatestBlock(blockhash: Blockhash): FutureApplicationResult[Unit] = {
xsnService
.getBlock(blockhash)
.toFutureOr
.mapWithError { block =>
scala.concurrent.blocking {
newLatestBlock(block)
}
}
.toFuture
val result = for {
block <- xsnService.getBlock(blockhash).toFutureOr
rpcTransactions <- block.transactions.map(xsnService.getTransaction).toFutureOr
transactions = rpcTransactions.map(Transaction.fromRPC)
r <- newLatestBlock(block, transactions).toFutureOr
} yield r
result.toFuture
}
private def newLatestBlock(newBlock: Block): ApplicationResult[Unit] = {
val latestBlockResult = blockBlockingDataHandler.getLatestBlock()
private def newLatestBlock(newBlock: Block, newTransactions: List[Transaction]): FutureApplicationResult[Unit] = {
def onRechain(orphanBlock: Block): FutureApplicationResult[Unit] = {
val result = for {
orphanRPCTransactions <- orphanBlock.transactions.map(xsnService.getTransaction).toFutureOr
orphanTransactions = orphanRPCTransactions.map(Transaction.fromRPC)
} yield {
val command = DatabasePostgresSeeder.ReplaceBlockCommand(
orphanBlock = orphanBlock,
orphanTransactions = orphanTransactions,
newBlock = newBlock,
newTransactions = newTransactions)
scala.concurrent.blocking {
databasePostgresSeeder.replaceLatestBlock(command)
}
}
result
.mapWithError(identity)
.toFuture
}
def onFirstBlock: FutureApplicationResult[Unit] = {
logger.info(s"first block = ${newBlock.hash.string}")
val command = DatabasePostgresSeeder.CreateBlockCommand(newBlock, newTransactions)
def unsafe: ApplicationResult[Unit] = scala.concurrent.blocking {
databasePostgresSeeder.firstBlock(command)
}
Future(unsafe)
}
def onNewBlock(latestBlock: Block): FutureApplicationResult[Unit] = {
logger.info(s"existing latest block = ${latestBlock.hash.string} -> new latest block = ${newBlock.hash.string}")
val command = DatabasePostgresSeeder.CreateBlockCommand(newBlock, newTransactions)
def unsafe = scala.concurrent.blocking {
databasePostgresSeeder.newLatestBlock(command)
}
Future(unsafe)
}
val latestBlockResult = scala.concurrent.blocking {
blockBlockingDataHandler.getLatestBlock()
}
latestBlockResult
.map { latestBlock =>
if (newBlock.previousBlockhash.contains(latestBlock.hash)) {
// latest block -> new block
logger.info(s"existing latest block = ${latestBlock.hash.string} -> new latest block = ${newBlock.hash.string}")
databasePostgresSeeder.newLatestBlock(newBlock)
} else if (newBlock.hash == latestBlock.hash) {
if (newBlock.hash == latestBlock.hash) {
// duplicated msg
logger.info(s"ignoring duplicated latest block = ${newBlock.hash.string}")
Good(())
Future.successful(Good(()))
} else if (newBlock.previousBlockhash.contains(latestBlock.hash)) {
// latest block -> new block
onNewBlock(latestBlock)
} else {
logger.info(s"orphan block = ${latestBlock.hash.string}, new latest block = ${newBlock.hash.string}")
databasePostgresSeeder.replaceLatestBlock(newBlock, latestBlock.hash)
logger.info(s"rechain, orphan block = ${latestBlock.hash.string}, new latest block = ${newBlock.hash.string}")
onRechain(latestBlock)
}
}
.getOrElse {
logger.info(s"first block = ${newBlock.hash.string}")
databasePostgresSeeder.firstBlock(newBlock)
}
.getOrElse(onFirstBlock)
}
}

10
server/app/com/xsn/explorer/util/Extensions.scala

@ -36,4 +36,14 @@ object Extensions {
new FutureOr(future)
}
}
implicit class ListOptionExt[+A](val inner: List[Option[A]]) extends AnyVal {
def everything: Option[List[A]] = {
if (inner.forall(_.isDefined)) {
Some(inner.flatten)
} else {
None
}
}
}
}

13
server/test/com/xsn/explorer/helpers/FileBasedXSNService.scala

@ -1,9 +1,9 @@
package com.xsn.explorer.helpers
import com.alexitc.playsonify.core.FutureApplicationResult
import com.xsn.explorer.errors.BlockNotFoundError
import com.xsn.explorer.models.Blockhash
import com.xsn.explorer.models.rpc.Block
import com.xsn.explorer.errors.{BlockNotFoundError, TransactionNotFoundError}
import com.xsn.explorer.models.{Blockhash, TransactionId}
import com.xsn.explorer.models.rpc.{Block, Transaction}
import org.scalactic.{Good, One, Or}
import scala.concurrent.Future
@ -11,6 +11,7 @@ import scala.concurrent.Future
class FileBasedXSNService extends DummyXSNService {
private lazy val blockMap = BlockLoader.all().map { block => block.hash -> block }.toMap
private lazy val transactionMap = TransactionLoader.all().map { tx => tx.id -> tx }.toMap
override def getBlock(blockhash: Blockhash): FutureApplicationResult[Block] = {
val maybe = blockMap.get(blockhash)
@ -22,4 +23,10 @@ class FileBasedXSNService extends DummyXSNService {
val block = blockMap.values.maxBy(_.height.int)
Future.successful(Good(block))
}
override def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction] = {
val maybe = transactionMap.get(txid)
val result = Or.from(maybe, One(TransactionNotFoundError))
Future.successful(result)
}
}

11
server/test/com/xsn/explorer/helpers/TransactionLoader.scala

@ -1,5 +1,7 @@
package com.xsn.explorer.helpers
import java.io.File
import com.xsn.explorer.models.rpc.Transaction
import play.api.libs.json.{JsValue, Json}
@ -20,4 +22,13 @@ object TransactionLoader {
case _ => throw new RuntimeException(s"Transaction $txid not found")
}
}
def all(): List[Transaction] = {
val uri = getClass.getResource(s"/$BasePath")
new File(uri.getPath)
.listFiles()
.toList
.map(_.getName)
.map(get)
}
}

67
server/test/com/xsn/explorer/processors/BlockEventsProcessorSpec.scala

@ -1,81 +1,71 @@
package com.xsn.explorer.processors
import com.alexitc.playsonify.core.FutureApplicationResult
import com.xsn.explorer.data.anorm.dao.BlockPostgresDAO
import com.xsn.explorer.data.anorm.dao.{BalancePostgresDAO, BlockPostgresDAO, TransactionPostgresDAO}
import com.xsn.explorer.data.anorm.{BlockPostgresDataHandler, DatabasePostgresSeeder}
import com.xsn.explorer.data.common.PostgresDataHandlerSpec
import com.xsn.explorer.errors.BlockNotFoundError
import com.xsn.explorer.helpers.{BlockLoader, FileBasedXSNService}
import com.xsn.explorer.models.rpc.Block
import org.scalactic.{Bad, Good}
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.Future
class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures with BeforeAndAfter {
lazy val dataHandler = new BlockPostgresDataHandler(database, new BlockPostgresDAO)
lazy val dataSeeder = new DatabasePostgresSeeder(database, new BlockPostgresDAO)
lazy val dataSeeder = new DatabasePostgresSeeder(
database,
new BlockPostgresDAO,
new TransactionPostgresDAO,
new BalancePostgresDAO)
lazy val xsnService = new FileBasedXSNService
lazy val processor = new BlockEventsProcessor(xsnService, dataSeeder, dataHandler)
before {
clearDatabase()
}
"newLatestBlock" should {
"process first block" in {
"fail on genesis block due to the missing transaction" in {
// see https://github.com/X9Developers/XSN/issues/32
val block0 = BlockLoader.get("00000c822abdbb23e28f79a49d29b41429737c6c7e15df40d1b1f1b35907ae34")
val xsnService = new FileBasedXSNService {
override def getLatestBlock(): FutureApplicationResult[Block] = {
Future.successful(Bad(BlockNotFoundError).accumulating)
}
whenReady(processor.newLatestBlock(block0.hash)) { result =>
result.isBad mustEqual true
}
}
val processor = new BlockEventsProcessor(xsnService, dataSeeder, dataHandler)
whenReady(processor.newLatestBlock(block0.hash)) { result =>
"process first block" in {
val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7")
whenReady(processor.newLatestBlock(block1.hash)) { result =>
result.isGood mustEqual true
}
}
"process a new block" in {
val block0 = BlockLoader.get("00000c822abdbb23e28f79a49d29b41429737c6c7e15df40d1b1f1b35907ae34")
val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7")
val block2 = BlockLoader.get("000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8")
val block3 = BlockLoader.get("00000766115b26ecbc09cd3a3db6870fdaf2f049d65a910eb2f2b48b566ca7bd")
List(block0, block1).foreach(dataHandler.upsert)
val xsnService = new FileBasedXSNService {
override def getLatestBlock(): FutureApplicationResult[Block] = {
val latest = block1.copy(nextBlockhash = None)
Future.successful(Good(latest).accumulating)
}
}
List(block1, block2).map(dataHandler.upsert).foreach(_.isGood mustEqual true)
val processor = new BlockEventsProcessor(xsnService, dataSeeder, dataHandler)
whenReady(processor.newLatestBlock(block2.hash)) { result =>
whenReady(processor.newLatestBlock(block3.hash)) { result =>
result.isGood mustEqual true
val blocks = List(block0, block1, block2)
val blocks = List(block1, block2, block3)
verifyBlockchain(blocks)
}
}
"process a rechain" in {
val block0 = BlockLoader.get("00000c822abdbb23e28f79a49d29b41429737c6c7e15df40d1b1f1b35907ae34")
val block1 = BlockLoader.get("000003fb382f6892ae96594b81aa916a8923c70701de4e7054aac556c7271ef7")
val block2 = BlockLoader.get("000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8")
val block3 = BlockLoader.get("00000766115b26ecbc09cd3a3db6870fdaf2f049d65a910eb2f2b48b566ca7bd")
List(block0, block1, block2).foreach(dataHandler.upsert)
val xsnService = new FileBasedXSNService {
override def getLatestBlock(): FutureApplicationResult[Block] = {
val latest = block2.copy(nextBlockhash = None)
Future.successful(Good(latest).accumulating)
}
}
List(block1, block2, block3).map(dataHandler.upsert).foreach(_.isGood mustEqual true)
val processor = new BlockEventsProcessor(xsnService, dataSeeder, dataHandler)
whenReady(processor.newLatestBlock(block1.hash)) { result =>
whenReady(processor.newLatestBlock(block2.hash)) { result =>
result.isGood mustEqual true
val blocks = List(block0, block1)
val blocks = List(block1, block2)
verifyBlockchain(blocks)
}
}
@ -89,9 +79,14 @@ class BlockEventsProcessorSpec extends PostgresDataHandlerSpec with ScalaFutures
dbBlock.nextBlockhash mustEqual block.nextBlockhash
}
}
private def clearDatabase() = {
database.withConnection { implicit conn =>
_root_.anorm.SQL("""DELETE FROM transaction_outputs""").execute()
_root_.anorm.SQL("""DELETE FROM transaction_inputs""").execute()
_root_.anorm.SQL("""DELETE FROM transactions""").execute()
_root_.anorm.SQL("""DELETE FROM blocks""").execute()
_root_.anorm.SQL("""DELETE FROM balances""").execute()
}
}

19
server/test/resources/blocks/00000766115b26ecbc09cd3a3db6870fdaf2f049d65a910eb2f2b48b566ca7bd

@ -0,0 +1,19 @@
{
"hash": "00000766115b26ecbc09cd3a3db6870fdaf2f049d65a910eb2f2b48b566ca7bd",
"confirmations": 46851,
"size": 179,
"height": 3,
"version": 536870912,
"merkleroot": "1e591eae200f719344fc5df0c4286e3fb191fb8a645bdf054f9b36a856fce41e",
"tx": [
"1e591eae200f719344fc5df0c4286e3fb191fb8a645bdf054f9b36a856fce41e"
],
"time": 1520276303,
"mediantime": 1520276299,
"nonce": 139029,
"bits": "1e0ffff0",
"difficulty": 0.000244140625,
"chainwork": "0000000000000000000000000000000000000000000000000000000000400040",
"previousblockhash": "000004645e2717b556682e3c642a4c6e473bf25c653ff8e8c114a3006040ffb8",
"nextblockhash": "00000b59875e80b0afc6c657bc5318d39e03532b7d97fb78a4c7bd55c4840c32"
}

34
server/test/resources/transactions/1e591eae200f719344fc5df0c4286e3fb191fb8a645bdf054f9b36a856fce41e

@ -0,0 +1,34 @@
{
"hex": "01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff03530101ffffffff010000000000000000232103e8c52f2c5155771492907095753a43ce776e1fa7c5e769a67a9f3db4467ec029ac00000000",
"txid": "1e591eae200f719344fc5df0c4286e3fb191fb8a645bdf054f9b36a856fce41e",
"size": 98,
"version": 1,
"locktime": 0,
"vin": [
{
"coinbase": "530101",
"sequence": 4294967295
}
],
"vout": [
{
"value": 0.00000000,
"valueSat": 0,
"n": 0,
"scriptPubKey": {
"asm": "03e8c52f2c5155771492907095753a43ce776e1fa7c5e769a67a9f3db4467ec029 OP_CHECKSIG",
"hex": "2103e8c52f2c5155771492907095753a43ce776e1fa7c5e769a67a9f3db4467ec029ac",
"reqSigs": 1,
"type": "pubkey",
"addresses": [
"XdJnCKYNwzCz8ATv8Eu75gonaHyfr9qXg9"
]
}
}
],
"blockhash": "00000766115b26ecbc09cd3a3db6870fdaf2f049d65a910eb2f2b48b566ca7bd",
"height": 3,
"confirmations": 46851,
"time": 1520276303,
"blocktime": 1520276303
}
Loading…
Cancel
Save