Browse Source

server: Synchronize the TPoS contracts

- A TPoS contract is created when the transaction is synchronized.
- A TPoS contract is deleted when the transaction is rolled back.
- A TPoS contract is closed when the collateral output is spent.
- A TPoS contract is enabled when the collateral output gets unspent.
master
Alexis Hernandez 6 years ago
parent
commit
887af0ff56
  1. 3
      server/app/com/xsn/explorer/data/LedgerDataHandler.scala
  2. 11
      server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala
  3. 27
      server/app/com/xsn/explorer/data/anorm/dao/TransactionPostgresDAO.scala
  4. 5
      server/app/com/xsn/explorer/data/async/LedgerFutureDataHandler.scala
  5. 86
      server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala
  6. 17
      server/app/com/xsn/explorer/services/TransactionRPCService.scala
  7. 14
      server/test/com/xsn/explorer/data/LedgerPostgresDataHandlerSpec.scala
  8. 4
      server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala
  9. 26
      server/test/com/xsn/explorer/helpers/BlockLoader.scala
  10. 2
      server/test/com/xsn/explorer/helpers/DataHandlerObjects.scala

3
server/app/com/xsn/explorer/data/LedgerDataHandler.scala

@ -1,6 +1,7 @@
package com.xsn.explorer.data
import com.alexitc.playsonify.core.ApplicationResult
import com.xsn.explorer.models.TPoSContract
import com.xsn.explorer.models.persisted.Block
import scala.language.higherKinds
@ -16,7 +17,7 @@ trait LedgerDataHandler[F[_]] {
* - The ledger is empty and the block is the genesis one.
* - The ledger has some blocks and the block goes just after the latest one.
*/
def push(block: Block.HasTransactions): F[Unit]
def push(block: Block.HasTransactions, tposContracts: List[TPoSContract]): F[Unit]
/**
* Remove the latest block from the ledger, it will succeed only if the ledger is not empty.

11
server/app/com/xsn/explorer/data/anorm/LedgerPostgresDataHandler.scala

@ -8,6 +8,7 @@ import com.xsn.explorer.data.LedgerBlockingDataHandler
import com.xsn.explorer.data.anorm.dao._
import com.xsn.explorer.errors.{PostgresForeignKeyViolationError, PreviousBlockMissingError, RepeatedBlockHeightError}
import com.xsn.explorer.gcs.{GolombCodedSet, GolombEncoding}
import com.xsn.explorer.models.TPoSContract
import com.xsn.explorer.models.persisted.{Balance, Block, Transaction}
import com.xsn.explorer.models.values.Address
import com.xsn.explorer.util.Extensions.ListOptionExt
@ -33,13 +34,14 @@ class LedgerPostgresDataHandler @Inject() (
* to have a next block, we remove the link because that block is not stored yet.
*/
override def push(
block: Block.HasTransactions): ApplicationResult[Unit] = {
block: Block.HasTransactions,
tposContracts: List[TPoSContract]): ApplicationResult[Unit] = {
// the filter is computed outside the transaction to avoid unnecessary locking
val filter = GolombEncoding.encode(block)
val result = withTransaction { implicit conn =>
val result = for {
_ <- upsertBlockCascade(block.asTip, filter)
_ <- upsertBlockCascade(block.asTip, filter, tposContracts)
} yield ()
result
@ -72,7 +74,8 @@ class LedgerPostgresDataHandler @Inject() (
private def upsertBlockCascade(
block: Block.HasTransactions,
filter: Option[GolombCodedSet])(
filter: Option[GolombCodedSet],
tposContracts: List[TPoSContract])(
implicit conn: Connection): Option[Unit] = {
val result = for {
@ -82,7 +85,7 @@ class LedgerPostgresDataHandler @Inject() (
_ = filter.foreach { f => blockFilterPostgresDAO.insert(block.hash, f) }
// batch insert
_ <- transactionPostgresDAO.insert(block.transactions)
_ <- transactionPostgresDAO.insert(block.transactions, tposContracts)
// balances
balanceList = balances(block.transactions)

27
server/app/com/xsn/explorer/data/anorm/dao/TransactionPostgresDAO.scala

@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory
class TransactionPostgresDAO @Inject() (
transactionInputDAO: TransactionInputPostgresDAO,
transactionOutputDAO: TransactionOutputPostgresDAO,
tposContractDAO: TPoSContractDAO,
addressTransactionDetailsDAO: AddressTransactionDetailsPostgresDAO,
fieldOrderingSQLInterpreter: FieldOrderingSQLInterpreter) {
@ -31,11 +32,13 @@ class TransactionPostgresDAO @Inject() (
_ <- transactionOutputDAO.batchInsertOutputs(transaction.outputs)
_ <- transactionInputDAO.batchInsertInputs(transaction.inputs.map(transaction.id -> _))
_ <- transactionOutputDAO.batchSpend(transaction.id, transaction.inputs)
_ = closeContracts(List(transaction))
_ = transactionOutputDAO.batchSpend(transaction.id, transaction.inputs)
_ <- addressTransactionDetailsDAO.batchInsertDetails(transaction)
} yield Transaction.HasIO(partialTx, inputs = transaction.inputs, outputs = transaction.outputs)
}
def insert(transactions: List[Transaction.HasIO])(implicit conn: Connection): Option[List[Transaction]] = {
def insert(transactions: List[Transaction.HasIO], tposContracts: List[TPoSContract])(implicit conn: Connection): Option[List[Transaction]] = {
for {
r <- batchInsert(transactions.map(_.transaction))
@ -47,6 +50,8 @@ class TransactionPostgresDAO @Inject() (
} yield {
insertDetails(transactions)
spend(transactions)
closeContracts(transactions)
tposContracts.foreach { contract => tposContractDAO.create(contract) }
r
}
}
@ -63,6 +68,17 @@ class TransactionPostgresDAO @Inject() (
assert(spendResult.forall(_.isDefined), "Spending inputs batch failed")
}
private def closeContracts(transactions: List[Transaction.HasIO])(implicit conn: Connection): Unit = {
for {
tx <- transactions
// a contract requires 1 XSN
input <- tx.inputs if input.value == 1
} {
val id = TPoSContract.Id(input.fromTxid, input.fromOutputIndex)
tposContractDAO.close(id, tx.id)
}
}
private def batchInsert(transactions: List[Transaction])(implicit conn: Connection): Option[List[Transaction]] = {
transactions match {
case Nil => Some(transactions)
@ -112,9 +128,16 @@ class TransactionPostgresDAO @Inject() (
).as(parseTransaction.*)
val result = expectedTransactions.map { tx =>
val _ = (
tposContractDAO.deleteBy(tx.id),
addressTransactionDetailsDAO.deleteDetails(tx.id)
)
val inputs = transactionInputDAO.deleteInputs(tx.id)
val outputs = transactionOutputDAO.deleteOutputs(tx.id)
val _ = addressTransactionDetailsDAO.deleteDetails(tx.id)
inputs
.map { input => TPoSContract.Id(input.fromTxid, input.fromOutputIndex) }
.foreach(tposContractDAO.open(_))
Transaction.HasIO(tx, inputs = inputs, outputs = outputs)
}

5
server/app/com/xsn/explorer/data/async/LedgerFutureDataHandler.scala

@ -3,6 +3,7 @@ package com.xsn.explorer.data.async
import com.alexitc.playsonify.core.FutureApplicationResult
import com.xsn.explorer.data.{LedgerBlockingDataHandler, LedgerDataHandler}
import com.xsn.explorer.executors.DatabaseExecutionContext
import com.xsn.explorer.models.TPoSContract
import com.xsn.explorer.models.persisted.Block
import javax.inject.Inject
@ -13,8 +14,8 @@ class LedgerFutureDataHandler @Inject() (
implicit ec: DatabaseExecutionContext)
extends LedgerDataHandler[FutureApplicationResult] {
override def push(block: Block.HasTransactions): FutureApplicationResult[Unit] = Future {
blockingDataHandler.push(block)
override def push(block: Block.HasTransactions, tposContracts: List[TPoSContract]): FutureApplicationResult[Unit] = Future {
blockingDataHandler.push(block, tposContracts)
}
override def pop(): FutureApplicationResult[Block] = Future {

86
server/app/com/xsn/explorer/services/LedgerSynchronizerService.scala

@ -1,15 +1,16 @@
package com.xsn.explorer.services
import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureOps, OptionOps}
import com.alexitc.playsonify.core.{ApplicationResult, FutureApplicationResult}
import com.xsn.explorer.data.async.{BlockFutureDataHandler, LedgerFutureDataHandler}
import com.xsn.explorer.errors.BlockNotFoundError
import com.xsn.explorer.models.TPoSContract
import com.xsn.explorer.models.persisted.Block
import com.xsn.explorer.models.transformers._
import com.xsn.explorer.models.values.{Blockhash, Height}
import com.xsn.explorer.util.Extensions.FutureOrExt
import javax.inject.Inject
import org.scalactic.Good
import org.scalactic.{Bad, Good}
import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, Future}
@ -23,6 +24,8 @@ class LedgerSynchronizerService @Inject() (
blockDataHandler: BlockFutureDataHandler)(
implicit ec: ExecutionContext) {
import LedgerSynchronizerService._
private val logger = LoggerFactory.getLogger(this.getClass)
/**
@ -34,14 +37,15 @@ class LedgerSynchronizerService @Inject() (
*/
def synchronize(blockhash: Blockhash): FutureApplicationResult[Unit] = {
val result = for {
block <- getRPCBlock(blockhash).toFutureOr
_ <- synchronize(block).toFutureOr
data <- getRPCBlock(blockhash).toFutureOr
_ <- synchronize(data).toFutureOr
} yield ()
result.toFuture
}
private def synchronize(block: Block.HasTransactions): FutureApplicationResult[Unit] = {
private def synchronize(data: BlockData): FutureApplicationResult[Unit] = {
val block = data._1
logger.info(s"Synchronize block ${block.height}, hash = ${block.hash}")
val result = for {
@ -52,8 +56,8 @@ class LedgerSynchronizerService @Inject() (
.recoverFrom(BlockNotFoundError)(None)
_ <- latestBlockMaybe
.map { latestBlock => onLatestBlock(latestBlock, block) }
.getOrElse { onEmptyLedger(block) }
.map { latestBlock => onLatestBlock(latestBlock, data) }
.getOrElse { onEmptyLedger(data) }
.toFutureOr
} yield ()
@ -65,15 +69,16 @@ class LedgerSynchronizerService @Inject() (
* 1.1. the given block is the genensis block, it is added.
* 1.2. the given block is not the genesis block, sync everything until the given block.
*/
private def onEmptyLedger(block: Block.HasTransactions): FutureApplicationResult[Unit] = {
private def onEmptyLedger(data: BlockData): FutureApplicationResult[Unit] = {
val block = data._1
if (block.height.int == 0) {
logger.info(s"Synchronize genesis block on empty ledger, hash = ${block.hash}")
ledgerDataHandler.push(block)
ledgerDataHandler.push(block, data._2)
} else {
logger.info(s"Synchronize block ${block.height} on empty ledger, hash = ${block.hash}")
val result = for {
_ <- sync(0 until block.height.int).toFutureOr
_ <- synchronize(block).toFutureOr
_ <- synchronize(data).toFutureOr
} yield ()
result.toFuture
@ -88,19 +93,20 @@ class LedgerSynchronizerService @Inject() (
* 2.4. if H <= N, if the hash already exists, it is ignored.
* 2.5. if H <= N, if the hash doesn't exists, remove blocks from N to H (included), then, add the new H.
*/
private def onLatestBlock(ledgerBlock: Block, newBlock: Block.HasTransactions): FutureApplicationResult[Unit] = {
private def onLatestBlock(ledgerBlock: Block, newData: BlockData): FutureApplicationResult[Unit] = {
val newBlock = newData._1
if (ledgerBlock.height.int + 1 == newBlock.height.int &&
newBlock.previousBlockhash.contains(ledgerBlock.hash)) {
logger.info(s"Appending block ${newBlock.height}, hash = ${newBlock.hash}")
ledgerDataHandler.push(newBlock)
ledgerDataHandler.push(newBlock, newData._2)
} else if (ledgerBlock.height.int + 1 == newBlock.height.int) {
logger.info(s"Reorganization to push block ${newBlock.height}, hash = ${newBlock.hash}")
val result = for {
blockhash <- newBlock.previousBlockhash.toFutureOr(BlockNotFoundError)
previousBlock <- getRPCBlock(blockhash).toFutureOr
_ <- synchronize(previousBlock).toFutureOr
_ <- synchronize(newBlock).toFutureOr
_ <- synchronize(newData).toFutureOr
} yield ()
result.toFuture
@ -108,7 +114,7 @@ class LedgerSynchronizerService @Inject() (
logger.info(s"Filling holes to push block ${newBlock.height}, hash = ${newBlock.hash}")
val result = for {
_ <- sync(ledgerBlock.height.int + 1 until newBlock.height.int).toFutureOr
_ <- synchronize(newBlock).toFutureOr
_ <- synchronize(newData).toFutureOr
} yield ()
result.toFuture
@ -126,7 +132,7 @@ class LedgerSynchronizerService @Inject() (
.getOrElse {
val x = for {
_ <- trimTo(newBlock.height).toFutureOr
_ <- synchronize(newBlock).toFutureOr
_ <- synchronize(newData).toFutureOr
} yield ()
x.toFuture
}
@ -148,24 +154,59 @@ class LedgerSynchronizerService @Inject() (
val result = for {
_ <- previous.toFutureOr
blockhash <- xsnService.getBlockhash(Height(height)).toFutureOr
block <- getRPCBlock(blockhash).toFutureOr
_ <- synchronize(block).toFutureOr
data <- getRPCBlock(blockhash).toFutureOr
_ <- synchronize(data).toFutureOr
} yield ()
result.toFuture
}
}
private def getRPCBlock(blockhash: Blockhash): FutureApplicationResult[Block.HasTransactions] = {
private def getRPCBlock(blockhash: Blockhash): FutureApplicationResult[BlockData] = {
val result = for {
rpcBlock <- xsnService.getBlock(blockhash).toFutureOr
extractionMethod <- blockService.extractionMethod(rpcBlock).toFutureOr
transactions <- transactionRPCService.getTransactions(rpcBlock.transactions).toFutureOr
} yield toPersistedBlock(rpcBlock, extractionMethod).withTransactions(transactions)
data <- transactionRPCService.getTransactions(rpcBlock.transactions).toFutureOr
(transactions, contracts) = data
validContracts <- getValidContracts(contracts).toFutureOr
} yield {
val block = toPersistedBlock(rpcBlock, extractionMethod).withTransactions(transactions)
(block, validContracts)
}
result.toFuture
}
private def getValidContracts(contracts: List[TPoSContract]): FutureApplicationResult[List[TPoSContract]] = {
val listF = contracts
.map { contract =>
xsnService
.isTPoSContract(contract.txid)
.toFutureOr
.map { valid =>
if (valid) Some(contract)
else None
}
.toFuture
}
val futureList = Future.sequence(listF)
futureList.map { list =>
val x = list.flatMap {
case Good(a) => a.map(Good(_))
case Bad(e) => Some(Bad(e))
}
val initial: ApplicationResult[List[TPoSContract]] = Good(List.empty)
x.foldLeft(initial) { case (acc, cur) =>
cur match {
case Good(contract) => acc.map(contract :: _)
case Bad(e) => acc.badMap(prev => prev ++ e)
}
}
}
}
/**
* Trim the ledger until the given block height, if the height is 4,
* the last stored block will be 3.
@ -188,3 +229,8 @@ class LedgerSynchronizerService @Inject() (
result.toFuture
}
}
object LedgerSynchronizerService {
type BlockData = (Block.HasTransactions, List[TPoSContract])
}

17
server/app/com/xsn/explorer/services/TransactionRPCService.scala

@ -6,7 +6,7 @@ import com.xsn.explorer.errors.{InvalidRawTransactionError, TransactionFormatErr
import com.xsn.explorer.models.persisted.Transaction
import com.xsn.explorer.models.rpc.TransactionVIN
import com.xsn.explorer.models.values._
import com.xsn.explorer.models.{TransactionDetails, TransactionValue}
import com.xsn.explorer.models.{TPoSContract, TransactionDetails, TransactionValue}
import com.xsn.explorer.util.Extensions.FutureOrExt
import javax.inject.Inject
import org.scalactic.{Bad, Good, One, Or}
@ -49,12 +49,12 @@ class TransactionRPCService @Inject() (
result.toFuture
}
def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction.HasIO] = {
def getTransaction(txid: TransactionId): FutureApplicationResult[(Transaction.HasIO, Option[TPoSContract])] = {
val result = for {
tx <- xsnService.getTransaction(txid).toFutureOr
transactionVIN <- getTransactionVIN(tx.vin).toFutureOr
rpcTransaction = tx.copy(vin = transactionVIN)
} yield Transaction.fromRPC(rpcTransaction)._1
} yield Transaction.fromRPC(rpcTransaction)
result.toFuture
}
@ -92,8 +92,8 @@ class TransactionRPCService @Inject() (
}
}
def getTransactions(ids: List[TransactionId]): FutureApplicationResult[List[Transaction.HasIO]] = {
def loadTransactionsSlowly(pending: List[TransactionId]): FutureOr[List[Transaction.HasIO]] = pending match {
def getTransactions(ids: List[TransactionId]): FutureApplicationResult[(List[Transaction.HasIO], List[TPoSContract])] = {
def loadTransactionsSlowly(pending: List[TransactionId]): FutureOr[List[(Transaction.HasIO, Option[TPoSContract])]] = pending match {
case x :: xs =>
for {
tx <- getTransaction(x).toFutureOr
@ -116,6 +116,13 @@ class TransactionRPCService @Inject() (
logger.warn(s"Unable to load transactions due to server error, loading them sequentially, error = ${ex.getMessage}")
loadTransactionsSlowly(ids).toFuture
}
.toFutureOr
.map { result =>
val contracts = result.flatMap(_._2)
val txs = result.map(_._1)
(txs, contracts)
}
.toFuture
}
def sendRawTransaction(hexString: String): FutureApplicationResult[JsValue] = {

14
server/test/com/xsn/explorer/data/LedgerPostgresDataHandlerSpec.scala

@ -21,7 +21,7 @@ class LedgerPostgresDataHandlerSpec extends PostgresDataHandlerSpec with BeforeA
blockList.foreach { block =>
val transactions = getTransactions(block)
dataHandler.push(block.withTransactions(transactions)) mustEqual Good(())
dataHandler.push(block.withTransactions(transactions), List.empty) mustEqual Good(())
}
}
@ -29,22 +29,22 @@ class LedgerPostgresDataHandlerSpec extends PostgresDataHandlerSpec with BeforeA
blockList.drop(1).foreach { block =>
val transactions = getTransactions(block)
dataHandler.push(block.withTransactions(transactions)) mustEqual Bad(PreviousBlockMissingError).accumulating
dataHandler.push(block.withTransactions(transactions), List.empty) mustEqual Bad(PreviousBlockMissingError).accumulating
}
}
"succeed storing a repeated block by hash" in {
val genesis = blockList(0)
dataHandler.push(genesis.withTransactions(getTransactions(genesis))) mustEqual Good(())
dataHandler.push(genesis.withTransactions(getTransactions(genesis))) mustEqual Good(())
dataHandler.push(genesis.withTransactions(getTransactions(genesis)), List.empty) mustEqual Good(())
dataHandler.push(genesis.withTransactions(getTransactions(genesis)), List.empty) mustEqual Good(())
}
"fail to store a repeated block by height" in {
val genesis = blockList(0)
dataHandler.push(genesis.withTransactions(getTransactions(genesis))) mustEqual Good(())
dataHandler.push(genesis.withTransactions(getTransactions(genesis)), List.empty) mustEqual Good(())
val block = blockList(1).copy(previousBlockhash = None, height = genesis.height)
dataHandler.push(block.withTransactions(getTransactions(block))) mustEqual Bad(RepeatedBlockHeightError).accumulating
dataHandler.push(block.withTransactions(getTransactions(block)), List.empty) mustEqual Bad(RepeatedBlockHeightError).accumulating
}
}
@ -62,7 +62,7 @@ class LedgerPostgresDataHandlerSpec extends PostgresDataHandlerSpec with BeforeA
blockList.foreach { block =>
val transactions = getTransactions(block)
dataHandler.push(block.withTransactions(transactions)) mustEqual Good(())
dataHandler.push(block.withTransactions(transactions), List.empty) mustEqual Good(())
}
blockList.reverse.foreach { block =>

4
server/test/com/xsn/explorer/data/TransactionPostgresDataHandlerSpec.scala

@ -388,13 +388,13 @@ class TransactionPostgresDataHandlerSpec extends PostgresDataHandlerSpec with Be
.map(Transaction.fromRPC)
.map(_._1)
val result = ledgerDataHandler.push(block.withTransactions(transactions))
val result = ledgerDataHandler.push(block.withTransactions(transactions), List.empty)
result.isGood mustEqual true
}
private def createBlock(block: Block, transactions: List[Transaction.HasIO]) = {
val result = ledgerDataHandler.push(block.withTransactions(transactions))
val result = ledgerDataHandler.push(block.withTransactions(transactions), List.empty)
result.isGood mustEqual true
}

26
server/test/com/xsn/explorer/helpers/BlockLoader.scala

@ -15,6 +15,32 @@ object BlockLoader {
Converters.toPersistedBlock(rpcBlock)
}
def getWithTransactions(blockhash: String): persisted.Block.HasTransactions = {
val rpcBlock = getRPC(blockhash)
val block = Converters.toPersistedBlock(rpcBlock)
val transactions = rpcBlock
.transactions
.map(_.string)
.map(TransactionLoader.getWithValues)
.map(persisted.Transaction.fromRPC)
.map(_._1)
persisted.Block.HasTransactions(block, transactions)
}
def getWithTransactions(rpcBlock: rpc.Block): persisted.Block.HasTransactions = {
val block = Converters.toPersistedBlock(rpcBlock)
val transactions = rpcBlock
.transactions
.map(_.string)
.map(TransactionLoader.getWithValues)
.map(_.copy(blockhash = rpcBlock.hash))
.map(persisted.Transaction.fromRPC)
.map(_._1)
persisted.Block.HasTransactions(block, transactions)
}
def getRPC(blockhash: String): rpc.Block = {
val partial = json(blockhash).as[rpc.Block]
cleanGenesisBlock(partial)

2
server/test/com/xsn/explorer/helpers/DataHandlerObjects.scala

@ -11,9 +11,11 @@ trait DataHandlerObjects {
lazy val transactionInputDAO = new TransactionInputPostgresDAO
lazy val transactionOutputDAO = new TransactionOutputPostgresDAO
lazy val addressTransactionDetailsDAO = new AddressTransactionDetailsPostgresDAO
lazy val tposContractDAO = new TPoSContractDAO
lazy val transactionPostgresDAO = new TransactionPostgresDAO(
transactionInputDAO,
transactionOutputDAO,
tposContractDAO,
addressTransactionDetailsDAO,
fieldOrderingSQLInterpreter)
lazy val blockFilterPostgresDAO = new BlockFilterPostgresDAO

Loading…
Cancel
Save