Browse Source

Remove explicit send calls, part deux (#4408)

* Rename synchronous_get to synchronous_send

This makes it more inline with the method 'send' of which
synchronous_send is the, well, synchronous version.

* Move protocol strings from scripts to network

This is again a small step in the right direction. The network module is
going to accumulate more and more of these simple methods. Once
everything is moved into that module, that module is going to be split.

Note that I've left the scripts which use scripts/util.py alone. I
suspect the same functionality can be reached when using just
lib/network.py and that scripts/util.py is obsolete.

* Remove protocol string from verifier and websocket

Websocket still has some references, that'll take more work to remove. Once the
network module has been split this should be easy.
I took the liberty to rename a variable to better show what it is.

* Remove protocol strings from remainder

The naming scheme I'm following for the newly introduced methods in the network
module is: 'blockchain.<subject>.<action>' -> def <action>_(for|to)_<subject>

* Move explicit protocol calls closer to each other

This makes it easier to keep track of the methods which are due to be
extracted.

* Remove `send` when using `get_transaction`

This is the final step to formalize (the informal) interface of the network
module.
A chance of note is changed interface for async/sync calls. It is no longer
required to use the `synchronous_send` call. Merely NOT passing a callback
makes the call synchronous. I feel this makes the API more intuitive to work
with and easier to replace with a different network module.

* Remove send from get_merkle_for_transaction

The pattern which emerged for calling the lambda yielded an slight refactor.
I'm not happy with the name for the `__invoke` method.

* Remove explict send from websockets

* Remove explicit send from scripts

* Remove explicit send from wallet

* Remove explicit sync_send from commands, scripts

* Remove optional timeout parameter

This parameter doesn't seem to be used a lot and removing it makes the
remaining calls easier. Potentionally a contentious choice!

* Rename `broadcast` to `broadcast_transaction`

Doing so makes the method name consistent with the other ElectrumX protocol
method names.

* Remove synchronous_send

Now every method is intuitive in what it does, no special handling required.
The `broadcast_transaction` method is weird. I've opted not to change the
return type b/c I found it hard to know what the exact consequences are. But
ideally this method should just works as all the other ElectrumX related
messages. On the other hand this shows nicely how you _can_ do something
differnt quite easy.

* Rename the awkwardly name `__invoke` method

The new name reflects what it does.

* Process the result of linter feedback

I've used flake8-diff (and ignored a couple of line length warnings).

* Rename tx_response to on_tx_response

This fell through the cracks when this branch was rebased.

* subscript_to_scripthash should be get_balance

An oversight while refactoring.

* Add missing return statement

Without this statement the transaction would have been broadcasted twice.

* Pass list of tuples to send not single tuple

* Add @staticmethod decorator

* Fix argument to be an array
3.2.x
Harm Aarts 7 years ago
committed by ghost43
parent
commit
e57e55aad8
  1. 2
      gui/kivy/main_window.py
  2. 4
      gui/qt/main_window.py
  3. 2
      gui/stdio.py
  4. 2
      gui/text.py
  5. 14
      lib/commands.py
  6. 154
      lib/network.py
  7. 7
      lib/synchronizer.py
  8. 2
      lib/tests/test_transaction.py
  9. 8
      lib/verifier.py
  10. 10
      lib/wallet.py
  11. 11
      lib/websockets.py
  12. 2
      scripts/block_headers
  13. 2
      scripts/get_history
  14. 2
      scripts/watch_address

2
gui/kivy/main_window.py

@ -809,7 +809,7 @@ class ElectrumWindow(App):
Clock.schedule_once(lambda dt: on_success(tx)) Clock.schedule_once(lambda dt: on_success(tx))
def _broadcast_thread(self, tx, on_complete): def _broadcast_thread(self, tx, on_complete):
ok, txid = self.network.broadcast(tx) ok, txid = self.network.broadcast_transaction(tx)
Clock.schedule_once(lambda dt: on_complete(ok, txid)) Clock.schedule_once(lambda dt: on_complete(ok, txid))
def broadcast(self, tx, pr=None): def broadcast(self, tx, pr=None):

4
gui/qt/main_window.py

@ -1600,7 +1600,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError):
if pr and pr.has_expired(): if pr and pr.has_expired():
self.payment_request = None self.payment_request = None
return False, _("Payment request has expired") return False, _("Payment request has expired")
status, msg = self.network.broadcast(tx) status, msg = self.network.broadcast_transaction(tx)
if pr and status is True: if pr and status is True:
self.invoices.set_paid(pr, tx.txid()) self.invoices.set_paid(pr, tx.txid())
self.invoices.save() self.invoices.save()
@ -2359,7 +2359,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError):
if ok and txid: if ok and txid:
txid = str(txid).strip() txid = str(txid).strip()
try: try:
r = self.network.synchronous_get(('blockchain.transaction.get',[txid])) r = self.network.get_transaction(txid)
except BaseException as e: except BaseException as e:
self.show_message(str(e)) self.show_message(str(e))
return return

2
gui/stdio.py

@ -198,7 +198,7 @@ class ElectrumGui:
self.wallet.labels[tx.txid()] = self.str_description self.wallet.labels[tx.txid()] = self.str_description
print(_("Please wait...")) print(_("Please wait..."))
status, msg = self.network.broadcast(tx) status, msg = self.network.broadcast_transaction(tx)
if status: if status:
print(_('Payment sent.')) print(_('Payment sent.'))

2
gui/text.py

@ -349,7 +349,7 @@ class ElectrumGui:
self.wallet.labels[tx.txid()] = self.str_description self.wallet.labels[tx.txid()] = self.str_description
self.show_message(_("Please wait..."), getchar=False) self.show_message(_("Please wait..."), getchar=False)
status, msg = self.network.broadcast(tx) status, msg = self.network.broadcast_transaction(tx)
if status: if status:
self.show_message(_('Payment sent.')) self.show_message(_('Payment sent.'))

14
lib/commands.py

@ -181,7 +181,7 @@ class Commands:
walletless server query, results are not checked by SPV. walletless server query, results are not checked by SPV.
""" """
sh = bitcoin.address_to_scripthash(address) sh = bitcoin.address_to_scripthash(address)
return self.network.synchronous_get(('blockchain.scripthash.get_history', [sh])) return self.network.get_history_for_scripthash(sh)
@command('w') @command('w')
def listunspent(self): def listunspent(self):
@ -199,7 +199,7 @@ class Commands:
is a walletless server query, results are not checked by SPV. is a walletless server query, results are not checked by SPV.
""" """
sh = bitcoin.address_to_scripthash(address) sh = bitcoin.address_to_scripthash(address)
return self.network.synchronous_get(('blockchain.scripthash.listunspent', [sh])) return self.network.listunspent_for_scripthash(sh)
@command('') @command('')
def serialize(self, jsontx): def serialize(self, jsontx):
@ -252,10 +252,10 @@ class Commands:
return tx.deserialize() return tx.deserialize()
@command('n') @command('n')
def broadcast(self, tx, timeout=30): def broadcast(self, tx):
"""Broadcast a transaction to the network. """ """Broadcast a transaction to the network. """
tx = Transaction(tx) tx = Transaction(tx)
return self.network.broadcast(tx, timeout) return self.network.broadcast_transaction(tx)
@command('') @command('')
def createmultisig(self, num, pubkeys): def createmultisig(self, num, pubkeys):
@ -322,7 +322,7 @@ class Commands:
server query, results are not checked by SPV. server query, results are not checked by SPV.
""" """
sh = bitcoin.address_to_scripthash(address) sh = bitcoin.address_to_scripthash(address)
out = self.network.synchronous_get(('blockchain.scripthash.get_balance', [sh])) out = self.network.get_balance_for_scripthash(sh)
out["confirmed"] = str(Decimal(out["confirmed"])/COIN) out["confirmed"] = str(Decimal(out["confirmed"])/COIN)
out["unconfirmed"] = str(Decimal(out["unconfirmed"])/COIN) out["unconfirmed"] = str(Decimal(out["unconfirmed"])/COIN)
return out return out
@ -331,7 +331,7 @@ class Commands:
def getmerkle(self, txid, height): def getmerkle(self, txid, height):
"""Get Merkle branch of a transaction included in a block. Electrum """Get Merkle branch of a transaction included in a block. Electrum
uses this to verify transactions (Simple Payment Verification).""" uses this to verify transactions (Simple Payment Verification)."""
return self.network.synchronous_get(('blockchain.transaction.get_merkle', [txid, int(height)])) return self.network.get_merkle_for_transaction(txid, int(height))
@command('n') @command('n')
def getservers(self): def getservers(self):
@ -517,7 +517,7 @@ class Commands:
if self.wallet and txid in self.wallet.transactions: if self.wallet and txid in self.wallet.transactions:
tx = self.wallet.transactions[txid] tx = self.wallet.transactions[txid]
else: else:
raw = self.network.synchronous_get(('blockchain.transaction.get', [txid])) raw = self.network.get_transaction(txid)
if raw: if raw:
tx = Transaction(raw) tx = Transaction(raw)
else: else:

154
lib/network.py

@ -620,26 +620,6 @@ class Network(util.DaemonThread):
# Response is now in canonical form # Response is now in canonical form
self.process_response(interface, response, callbacks) self.process_response(interface, response, callbacks)
def map_scripthash_to_address(self, callback):
def cb2(x):
x2 = x.copy()
p = x2.pop('params')
addr = self.h2addr[p[0]]
x2['params'] = [addr]
callback(x2)
return cb2
def subscribe_to_addresses(self, addresses, callback):
hash2address = {bitcoin.address_to_scripthash(address): address for address in addresses}
self.h2addr.update(hash2address)
msgs = [('blockchain.scripthash.subscribe', [x]) for x in hash2address.keys()]
self.send(msgs, self.map_scripthash_to_address(callback))
def request_address_history(self, address, callback):
h = bitcoin.address_to_scripthash(address)
self.h2addr.update({h: address})
self.send([('blockchain.scripthash.get_history', [h])], self.map_scripthash_to_address(callback))
def send(self, messages, callback): def send(self, messages, callback):
'''Messages is a list of (method, params) tuples''' '''Messages is a list of (method, params) tuples'''
messages = list(messages) messages = list(messages)
@ -668,6 +648,7 @@ class Network(util.DaemonThread):
self.subscriptions[k] = l self.subscriptions[k] = l
# check cached response for subscriptions # check cached response for subscriptions
r = self.sub_cache.get(k) r = self.sub_cache.get(k)
if r is not None: if r is not None:
self.print_error("cache hit", k) self.print_error("cache hit", k)
callback(r) callback(r)
@ -802,12 +783,6 @@ class Network(util.DaemonThread):
blockchain.catch_up = None blockchain.catch_up = None
self.notify('updated') self.notify('updated')
def request_header(self, interface, height):
#interface.print_error("requesting header %d" % height)
self.queue_request('blockchain.block.get_header', [height], interface)
interface.request = height
interface.req_time = time.time()
def on_get_header(self, interface, response): def on_get_header(self, interface, response):
'''Handle receiving a single block header''' '''Handle receiving a single block header'''
header = response.get('result') header = response.get('result')
@ -1062,27 +1037,134 @@ class Network(util.DaemonThread):
def get_local_height(self): def get_local_height(self):
return self.blockchain().height() return self.blockchain().height()
def synchronous_get(self, request, timeout=30): @staticmethod
def __wait_for(it):
"""Wait for the result of calling lambda `it`."""
q = queue.Queue() q = queue.Queue()
self.send([request], q.put) it(q.put)
try: try:
r = q.get(True, timeout) result = q.get(block=True, timeout=30)
except queue.Empty: except queue.Empty:
raise util.TimeoutException(_('Server did not answer')) raise util.TimeoutException(_('Server did not answer'))
if r.get('error'):
raise Exception(r.get('error'))
return r.get('result')
def broadcast(self, tx, timeout=30): if result.get('error'):
tx_hash = tx.txid() raise Exception(result.get('error'))
return result.get('result')
@staticmethod
def __with_default_synchronous_callback(invocation, callback):
""" Use this method if you want to make the network request
synchronous. """
if not callback:
return Network.__wait_for(invocation)
invocation(callback)
def request_header(self, interface, height):
self.queue_request('blockchain.block.get_header', [height], interface)
interface.request = height
interface.req_time = time.time()
def map_scripthash_to_address(self, callback):
def cb2(x):
x2 = x.copy()
p = x2.pop('params')
addr = self.h2addr[p[0]]
x2['params'] = [addr]
callback(x2)
return cb2
def subscribe_to_addresses(self, addresses, callback):
hash2address = {
bitcoin.address_to_scripthash(address): address
for address in addresses}
self.h2addr.update(hash2address)
msgs = [
('blockchain.scripthash.subscribe', [x])
for x in hash2address.keys()]
self.send(msgs, self.map_scripthash_to_address(callback))
def request_address_history(self, address, callback):
h = bitcoin.address_to_scripthash(address)
self.h2addr.update({h: address})
self.send([('blockchain.scripthash.get_history', [h])], self.map_scripthash_to_address(callback))
# NOTE this method handles exceptions and a special edge case, counter to
# what the other ElectrumX methods do. This is unexpected.
def broadcast_transaction(self, transaction, callback=None):
command = 'blockchain.transaction.broadcast'
invocation = lambda c: self.send([(command, [str(transaction)])], c)
if callback:
invocation(callback)
return
try: try:
out = self.synchronous_get(('blockchain.transaction.broadcast', [str(tx)]), timeout) out = Network.__wait_for(invocation)
except BaseException as e: except BaseException as e:
return False, "error: " + str(e) return False, "error: " + str(e)
if out != tx_hash:
if out != transaction.txid():
return False, "error: " + out return False, "error: " + out
return True, out return True, out
def get_history_for_scripthash(self, hash, callback=None):
command = 'blockchain.scripthash.get_history'
invocation = lambda c: self.send([(command, [hash])], c)
return Network.__with_default_synchronous_callback(invocation, callback)
def subscribe_to_headers(self, callback=None):
command = 'blockchain.headers.subscribe'
invocation = lambda c: self.send([(command, [True])], c)
return Network.__with_default_synchronous_callback(invocation, callback)
def subscribe_to_address(self, address, callback=None):
command = 'blockchain.address.subscribe'
invocation = lambda c: self.send([(command, [address])], c)
return Network.__with_default_synchronous_callback(invocation, callback)
def get_merkle_for_transaction(self, tx_hash, tx_height, callback=None):
command = 'blockchain.transaction.get_merkle'
invocation = lambda c: self.send([(command, [tx_hash, tx_height])], c)
return Network.__with_default_synchronous_callback(invocation, callback)
def subscribe_to_scripthash(self, scripthash, callback=None):
command = 'blockchain.scripthash.subscribe'
invocation = lambda c: self.send([(command, [scripthash])], c)
return Network.__with_default_synchronous_callback(invocation, callback)
def get_transaction(self, transaction_hash, callback=None):
command = 'blockchain.transaction.get'
invocation = lambda c: self.send([(command, [transaction_hash])], c)
return Network.__with_default_synchronous_callback(invocation, callback)
def get_transactions(self, transaction_hashes, callback=None):
command = 'blockchain.transaction.get'
messages = [(command, [tx_hash]) for tx_hash in transaction_hashes]
invocation = lambda c: self.send(messages, c)
return Network.__with_default_synchronous_callback(invocation, callback)
def listunspent_for_scripthash(self, scripthash, callback=None):
command = 'blockchain.scripthash.listunspent'
invocation = lambda c: self.send([(command, [scripthash])], c)
return Network.__with_default_synchronous_callback(invocation, callback)
def get_balance_for_scripthash(self, scripthash, callback=None):
command = 'blockchain.scripthash.get_balance'
invocation = lambda c: self.send([(command, [scripthash])], c)
return Network.__with_default_synchronous_callback(invocation, callback)
def export_checkpoints(self, path): def export_checkpoints(self, path):
# run manually from the console to generate checkpoints # run manually from the console to generate checkpoints
cp = self.blockchain().get_checkpoints() cp = self.blockchain().get_checkpoints()

7
lib/synchronizer.py

@ -160,15 +160,16 @@ class Synchronizer(ThreadJob):
def request_missing_txs(self, hist): def request_missing_txs(self, hist):
# "hist" is a list of [tx_hash, tx_height] lists # "hist" is a list of [tx_hash, tx_height] lists
requests = [] transaction_hashes = []
for tx_hash, tx_height in hist: for tx_hash, tx_height in hist:
if tx_hash in self.requested_tx: if tx_hash in self.requested_tx:
continue continue
if tx_hash in self.wallet.transactions: if tx_hash in self.wallet.transactions:
continue continue
requests.append(('blockchain.transaction.get', [tx_hash])) transaction_hashes.append(tx_hash)
self.requested_tx[tx_hash] = tx_height self.requested_tx[tx_hash] = tx_height
self.network.send(requests, self.on_tx_response)
self.network.get_transactions(transaction_hashes, self.on_tx_response)
def initialize(self): def initialize(self):
'''Check the initial state of the wallet. Subscribe to all its '''Check the initial state of the wallet. Subscribe to all its

2
lib/tests/test_transaction.py

@ -773,5 +773,5 @@ class NetworkMock(object):
def __init__(self, unspent): def __init__(self, unspent):
self.unspent = unspent self.unspent = unspent
def synchronous_get(self, arg): def synchronous_send(self, arg):
return self.unspent return self.unspent

8
lib/verifier.py

@ -54,9 +54,11 @@ class SPV(ThreadJob):
else: else:
if (tx_hash not in self.requested_merkle if (tx_hash not in self.requested_merkle
and tx_hash not in self.merkle_roots): and tx_hash not in self.merkle_roots):
request = ('blockchain.transaction.get_merkle',
[tx_hash, tx_height]) self.network.get_merkle_for_transaction(
self.network.send([request], self.verify_merkle) tx_hash,
tx_height,
self.verify_merkle)
self.print_error('requested merkle', tx_hash) self.print_error('requested merkle', tx_hash)
self.requested_merkle.add(tx_hash) self.requested_merkle.add(tx_hash)

10
lib/wallet.py

@ -93,12 +93,13 @@ def dust_threshold(network):
def append_utxos_to_inputs(inputs, network, pubkey, txin_type, imax): def append_utxos_to_inputs(inputs, network, pubkey, txin_type, imax):
if txin_type != 'p2pk': if txin_type != 'p2pk':
address = bitcoin.pubkey_to_address(txin_type, pubkey) address = bitcoin.pubkey_to_address(txin_type, pubkey)
sh = bitcoin.address_to_scripthash(address) scripthash = bitcoin.address_to_scripthash(address)
else: else:
script = bitcoin.public_key_to_p2pk_script(pubkey) script = bitcoin.public_key_to_p2pk_script(pubkey)
sh = bitcoin.script_to_scripthash(script) scripthash = bitcoin.script_to_scripthash(script)
address = '(pubkey)' address = '(pubkey)'
u = network.synchronous_get(('blockchain.scripthash.listunspent', [sh]))
u = network.listunspent_for_scripthash(scripthash)
for item in u: for item in u:
if len(inputs) >= imax: if len(inputs) >= imax:
break break
@ -1471,9 +1472,8 @@ class Abstract_Wallet(PrintError):
# all the input txs, in which case we ask the network. # all the input txs, in which case we ask the network.
tx = self.transactions.get(tx_hash, None) tx = self.transactions.get(tx_hash, None)
if not tx and self.network: if not tx and self.network:
request = ('blockchain.transaction.get', [tx_hash])
try: try:
tx = Transaction(self.network.synchronous_get(request)) tx = Transaction(self.network.get_transaction(tx_hash))
except TimeoutException as e: except TimeoutException as e:
self.print_error('getting input txn from network timed out for {}'.format(tx_hash)) self.print_error('getting input txn from network timed out for {}'.format(tx_hash))
if not ignore_timeout: if not ignore_timeout:

11
lib/websockets.py

@ -95,17 +95,18 @@ class WsClientThread(util.DaemonThread):
continue continue
util.print_error('response', r) util.print_error('response', r)
method = r.get('method') method = r.get('method')
params = r.get('params') scripthash = r.get('params')[0]
result = r.get('result') result = r.get('result')
if result is None: if result is None:
continue continue
if method == 'blockchain.scripthash.subscribe': if method == 'blockchain.scripthash.subscribe':
self.network.send([('blockchain.scripthash.get_balance', params)], self.response_queue.put) self.network.get_balance_for_scripthash(
scripthash, self.response_queue.put)
elif method == 'blockchain.scripthash.get_balance': elif method == 'blockchain.scripthash.get_balance':
h = params[0] addr = self.network.h2addr.get(scripthash, None)
addr = self.network.h2addr.get(h, None)
if addr is None: if addr is None:
util.print_error("can't find address for scripthash: %s" % h) util.print_error(
"can't find address for scripthash: %s" % scripthash)
l = self.subscriptions.get(addr, []) l = self.subscriptions.get(addr, [])
for ws, amount in l: for ws, amount in l:
if not ws.closed: if not ws.closed:

2
scripts/block_headers

@ -22,7 +22,7 @@ if not network.is_connected():
# 2. send the subscription # 2. send the subscription
callback = lambda response: print_msg(json_encode(response.get('result'))) callback = lambda response: print_msg(json_encode(response.get('result')))
network.send([('server.version',["block_headers script", "1.2"])], callback) network.send([('server.version',["block_headers script", "1.2"])], callback)
network.send([('blockchain.headers.subscribe',[True])], callback) network.subscribe_to_headers(callback)
# 3. wait for results # 3. wait for results
while network.is_connected(): while network.is_connected():

2
scripts/get_history

@ -14,5 +14,5 @@ except Exception:
n = Network() n = Network()
n.start() n.start()
_hash = bitcoin.address_to_scripthash(addr) _hash = bitcoin.address_to_scripthash(addr)
h = n.synchronous_get(('blockchain.scripthash.get_history',[_hash])) h = n.get_history_for_scripthash(_hash)
print_msg(json_encode(h)) print_msg(json_encode(h))

2
scripts/watch_address

@ -29,7 +29,7 @@ if not network.is_connected():
# 2. send the subscription # 2. send the subscription
callback = lambda response: print_msg(json_encode(response.get('result'))) callback = lambda response: print_msg(json_encode(response.get('result')))
network.send([('blockchain.scripthash.subscribe',[sh])], callback) network.subscribe_to_address(addr, callback)
# 3. wait for results # 3. wait for results
while network.is_connected(): while network.is_connected():

Loading…
Cancel
Save