From e57e55aad8da8dba6a8b7a83ef203e5ab770423c Mon Sep 17 00:00:00 2001 From: Harm Aarts Date: Wed, 6 Jun 2018 15:06:04 +0200 Subject: [PATCH] Remove explicit send calls, part deux (#4408) * Rename synchronous_get to synchronous_send This makes it more inline with the method 'send' of which synchronous_send is the, well, synchronous version. * Move protocol strings from scripts to network This is again a small step in the right direction. The network module is going to accumulate more and more of these simple methods. Once everything is moved into that module, that module is going to be split. Note that I've left the scripts which use scripts/util.py alone. I suspect the same functionality can be reached when using just lib/network.py and that scripts/util.py is obsolete. * Remove protocol string from verifier and websocket Websocket still has some references, that'll take more work to remove. Once the network module has been split this should be easy. I took the liberty to rename a variable to better show what it is. * Remove protocol strings from remainder The naming scheme I'm following for the newly introduced methods in the network module is: 'blockchain..' -> def _(for|to)_ * Move explicit protocol calls closer to each other This makes it easier to keep track of the methods which are due to be extracted. * Remove `send` when using `get_transaction` This is the final step to formalize (the informal) interface of the network module. A chance of note is changed interface for async/sync calls. It is no longer required to use the `synchronous_send` call. Merely NOT passing a callback makes the call synchronous. I feel this makes the API more intuitive to work with and easier to replace with a different network module. * Remove send from get_merkle_for_transaction The pattern which emerged for calling the lambda yielded an slight refactor. I'm not happy with the name for the `__invoke` method. * Remove explict send from websockets * Remove explicit send from scripts * Remove explicit send from wallet * Remove explicit sync_send from commands, scripts * Remove optional timeout parameter This parameter doesn't seem to be used a lot and removing it makes the remaining calls easier. Potentionally a contentious choice! * Rename `broadcast` to `broadcast_transaction` Doing so makes the method name consistent with the other ElectrumX protocol method names. * Remove synchronous_send Now every method is intuitive in what it does, no special handling required. The `broadcast_transaction` method is weird. I've opted not to change the return type b/c I found it hard to know what the exact consequences are. But ideally this method should just works as all the other ElectrumX related messages. On the other hand this shows nicely how you _can_ do something differnt quite easy. * Rename the awkwardly name `__invoke` method The new name reflects what it does. * Process the result of linter feedback I've used flake8-diff (and ignored a couple of line length warnings). * Rename tx_response to on_tx_response This fell through the cracks when this branch was rebased. * subscript_to_scripthash should be get_balance An oversight while refactoring. * Add missing return statement Without this statement the transaction would have been broadcasted twice. * Pass list of tuples to send not single tuple * Add @staticmethod decorator * Fix argument to be an array --- gui/kivy/main_window.py | 2 +- gui/qt/main_window.py | 4 +- gui/stdio.py | 2 +- gui/text.py | 2 +- lib/commands.py | 14 ++-- lib/network.py | 154 ++++++++++++++++++++++++++-------- lib/synchronizer.py | 7 +- lib/tests/test_transaction.py | 2 +- lib/verifier.py | 8 +- lib/wallet.py | 10 +-- lib/websockets.py | 11 +-- scripts/block_headers | 2 +- scripts/get_history | 2 +- scripts/watch_address | 2 +- 14 files changed, 154 insertions(+), 68 deletions(-) diff --git a/gui/kivy/main_window.py b/gui/kivy/main_window.py index 2dc623022..919203c06 100644 --- a/gui/kivy/main_window.py +++ b/gui/kivy/main_window.py @@ -809,7 +809,7 @@ class ElectrumWindow(App): Clock.schedule_once(lambda dt: on_success(tx)) def _broadcast_thread(self, tx, on_complete): - ok, txid = self.network.broadcast(tx) + ok, txid = self.network.broadcast_transaction(tx) Clock.schedule_once(lambda dt: on_complete(ok, txid)) def broadcast(self, tx, pr=None): diff --git a/gui/qt/main_window.py b/gui/qt/main_window.py index 1397f0908..6d5786d8e 100644 --- a/gui/qt/main_window.py +++ b/gui/qt/main_window.py @@ -1600,7 +1600,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): if pr and pr.has_expired(): self.payment_request = None return False, _("Payment request has expired") - status, msg = self.network.broadcast(tx) + status, msg = self.network.broadcast_transaction(tx) if pr and status is True: self.invoices.set_paid(pr, tx.txid()) self.invoices.save() @@ -2359,7 +2359,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): if ok and txid: txid = str(txid).strip() try: - r = self.network.synchronous_get(('blockchain.transaction.get',[txid])) + r = self.network.get_transaction(txid) except BaseException as e: self.show_message(str(e)) return diff --git a/gui/stdio.py b/gui/stdio.py index 4b0daa156..a4756eae6 100644 --- a/gui/stdio.py +++ b/gui/stdio.py @@ -198,7 +198,7 @@ class ElectrumGui: self.wallet.labels[tx.txid()] = self.str_description print(_("Please wait...")) - status, msg = self.network.broadcast(tx) + status, msg = self.network.broadcast_transaction(tx) if status: print(_('Payment sent.')) diff --git a/gui/text.py b/gui/text.py index 9ecf91aff..b616f8734 100644 --- a/gui/text.py +++ b/gui/text.py @@ -349,7 +349,7 @@ class ElectrumGui: self.wallet.labels[tx.txid()] = self.str_description self.show_message(_("Please wait..."), getchar=False) - status, msg = self.network.broadcast(tx) + status, msg = self.network.broadcast_transaction(tx) if status: self.show_message(_('Payment sent.')) diff --git a/lib/commands.py b/lib/commands.py index af2dd5742..d43292a15 100644 --- a/lib/commands.py +++ b/lib/commands.py @@ -181,7 +181,7 @@ class Commands: walletless server query, results are not checked by SPV. """ sh = bitcoin.address_to_scripthash(address) - return self.network.synchronous_get(('blockchain.scripthash.get_history', [sh])) + return self.network.get_history_for_scripthash(sh) @command('w') def listunspent(self): @@ -199,7 +199,7 @@ class Commands: is a walletless server query, results are not checked by SPV. """ sh = bitcoin.address_to_scripthash(address) - return self.network.synchronous_get(('blockchain.scripthash.listunspent', [sh])) + return self.network.listunspent_for_scripthash(sh) @command('') def serialize(self, jsontx): @@ -252,10 +252,10 @@ class Commands: return tx.deserialize() @command('n') - def broadcast(self, tx, timeout=30): + def broadcast(self, tx): """Broadcast a transaction to the network. """ tx = Transaction(tx) - return self.network.broadcast(tx, timeout) + return self.network.broadcast_transaction(tx) @command('') def createmultisig(self, num, pubkeys): @@ -322,7 +322,7 @@ class Commands: server query, results are not checked by SPV. """ sh = bitcoin.address_to_scripthash(address) - out = self.network.synchronous_get(('blockchain.scripthash.get_balance', [sh])) + out = self.network.get_balance_for_scripthash(sh) out["confirmed"] = str(Decimal(out["confirmed"])/COIN) out["unconfirmed"] = str(Decimal(out["unconfirmed"])/COIN) return out @@ -331,7 +331,7 @@ class Commands: def getmerkle(self, txid, height): """Get Merkle branch of a transaction included in a block. Electrum uses this to verify transactions (Simple Payment Verification).""" - return self.network.synchronous_get(('blockchain.transaction.get_merkle', [txid, int(height)])) + return self.network.get_merkle_for_transaction(txid, int(height)) @command('n') def getservers(self): @@ -517,7 +517,7 @@ class Commands: if self.wallet and txid in self.wallet.transactions: tx = self.wallet.transactions[txid] else: - raw = self.network.synchronous_get(('blockchain.transaction.get', [txid])) + raw = self.network.get_transaction(txid) if raw: tx = Transaction(raw) else: diff --git a/lib/network.py b/lib/network.py index 8b64bd7a4..d6349711f 100644 --- a/lib/network.py +++ b/lib/network.py @@ -620,26 +620,6 @@ class Network(util.DaemonThread): # Response is now in canonical form self.process_response(interface, response, callbacks) - def map_scripthash_to_address(self, callback): - def cb2(x): - x2 = x.copy() - p = x2.pop('params') - addr = self.h2addr[p[0]] - x2['params'] = [addr] - callback(x2) - return cb2 - - def subscribe_to_addresses(self, addresses, callback): - hash2address = {bitcoin.address_to_scripthash(address): address for address in addresses} - self.h2addr.update(hash2address) - msgs = [('blockchain.scripthash.subscribe', [x]) for x in hash2address.keys()] - self.send(msgs, self.map_scripthash_to_address(callback)) - - def request_address_history(self, address, callback): - h = bitcoin.address_to_scripthash(address) - self.h2addr.update({h: address}) - self.send([('blockchain.scripthash.get_history', [h])], self.map_scripthash_to_address(callback)) - def send(self, messages, callback): '''Messages is a list of (method, params) tuples''' messages = list(messages) @@ -668,6 +648,7 @@ class Network(util.DaemonThread): self.subscriptions[k] = l # check cached response for subscriptions r = self.sub_cache.get(k) + if r is not None: self.print_error("cache hit", k) callback(r) @@ -802,12 +783,6 @@ class Network(util.DaemonThread): blockchain.catch_up = None self.notify('updated') - def request_header(self, interface, height): - #interface.print_error("requesting header %d" % height) - self.queue_request('blockchain.block.get_header', [height], interface) - interface.request = height - interface.req_time = time.time() - def on_get_header(self, interface, response): '''Handle receiving a single block header''' header = response.get('result') @@ -1062,27 +1037,134 @@ class Network(util.DaemonThread): def get_local_height(self): return self.blockchain().height() - def synchronous_get(self, request, timeout=30): + @staticmethod + def __wait_for(it): + """Wait for the result of calling lambda `it`.""" q = queue.Queue() - self.send([request], q.put) + it(q.put) try: - r = q.get(True, timeout) + result = q.get(block=True, timeout=30) except queue.Empty: raise util.TimeoutException(_('Server did not answer')) - if r.get('error'): - raise Exception(r.get('error')) - return r.get('result') - def broadcast(self, tx, timeout=30): - tx_hash = tx.txid() + if result.get('error'): + raise Exception(result.get('error')) + + return result.get('result') + + @staticmethod + def __with_default_synchronous_callback(invocation, callback): + """ Use this method if you want to make the network request + synchronous. """ + if not callback: + return Network.__wait_for(invocation) + + invocation(callback) + + def request_header(self, interface, height): + self.queue_request('blockchain.block.get_header', [height], interface) + interface.request = height + interface.req_time = time.time() + + def map_scripthash_to_address(self, callback): + def cb2(x): + x2 = x.copy() + p = x2.pop('params') + addr = self.h2addr[p[0]] + x2['params'] = [addr] + callback(x2) + return cb2 + + def subscribe_to_addresses(self, addresses, callback): + hash2address = { + bitcoin.address_to_scripthash(address): address + for address in addresses} + self.h2addr.update(hash2address) + msgs = [ + ('blockchain.scripthash.subscribe', [x]) + for x in hash2address.keys()] + self.send(msgs, self.map_scripthash_to_address(callback)) + + def request_address_history(self, address, callback): + h = bitcoin.address_to_scripthash(address) + self.h2addr.update({h: address}) + self.send([('blockchain.scripthash.get_history', [h])], self.map_scripthash_to_address(callback)) + + # NOTE this method handles exceptions and a special edge case, counter to + # what the other ElectrumX methods do. This is unexpected. + def broadcast_transaction(self, transaction, callback=None): + command = 'blockchain.transaction.broadcast' + invocation = lambda c: self.send([(command, [str(transaction)])], c) + + if callback: + invocation(callback) + return + try: - out = self.synchronous_get(('blockchain.transaction.broadcast', [str(tx)]), timeout) + out = Network.__wait_for(invocation) except BaseException as e: return False, "error: " + str(e) - if out != tx_hash: + + if out != transaction.txid(): return False, "error: " + out + return True, out + def get_history_for_scripthash(self, hash, callback=None): + command = 'blockchain.scripthash.get_history' + invocation = lambda c: self.send([(command, [hash])], c) + + return Network.__with_default_synchronous_callback(invocation, callback) + + def subscribe_to_headers(self, callback=None): + command = 'blockchain.headers.subscribe' + invocation = lambda c: self.send([(command, [True])], c) + + return Network.__with_default_synchronous_callback(invocation, callback) + + def subscribe_to_address(self, address, callback=None): + command = 'blockchain.address.subscribe' + invocation = lambda c: self.send([(command, [address])], c) + + return Network.__with_default_synchronous_callback(invocation, callback) + + def get_merkle_for_transaction(self, tx_hash, tx_height, callback=None): + command = 'blockchain.transaction.get_merkle' + invocation = lambda c: self.send([(command, [tx_hash, tx_height])], c) + + return Network.__with_default_synchronous_callback(invocation, callback) + + def subscribe_to_scripthash(self, scripthash, callback=None): + command = 'blockchain.scripthash.subscribe' + invocation = lambda c: self.send([(command, [scripthash])], c) + + return Network.__with_default_synchronous_callback(invocation, callback) + + def get_transaction(self, transaction_hash, callback=None): + command = 'blockchain.transaction.get' + invocation = lambda c: self.send([(command, [transaction_hash])], c) + + return Network.__with_default_synchronous_callback(invocation, callback) + + def get_transactions(self, transaction_hashes, callback=None): + command = 'blockchain.transaction.get' + messages = [(command, [tx_hash]) for tx_hash in transaction_hashes] + invocation = lambda c: self.send(messages, c) + + return Network.__with_default_synchronous_callback(invocation, callback) + + def listunspent_for_scripthash(self, scripthash, callback=None): + command = 'blockchain.scripthash.listunspent' + invocation = lambda c: self.send([(command, [scripthash])], c) + + return Network.__with_default_synchronous_callback(invocation, callback) + + def get_balance_for_scripthash(self, scripthash, callback=None): + command = 'blockchain.scripthash.get_balance' + invocation = lambda c: self.send([(command, [scripthash])], c) + + return Network.__with_default_synchronous_callback(invocation, callback) + def export_checkpoints(self, path): # run manually from the console to generate checkpoints cp = self.blockchain().get_checkpoints() diff --git a/lib/synchronizer.py b/lib/synchronizer.py index 341aa0444..c865d272f 100644 --- a/lib/synchronizer.py +++ b/lib/synchronizer.py @@ -160,15 +160,16 @@ class Synchronizer(ThreadJob): def request_missing_txs(self, hist): # "hist" is a list of [tx_hash, tx_height] lists - requests = [] + transaction_hashes = [] for tx_hash, tx_height in hist: if tx_hash in self.requested_tx: continue if tx_hash in self.wallet.transactions: continue - requests.append(('blockchain.transaction.get', [tx_hash])) + transaction_hashes.append(tx_hash) self.requested_tx[tx_hash] = tx_height - self.network.send(requests, self.on_tx_response) + + self.network.get_transactions(transaction_hashes, self.on_tx_response) def initialize(self): '''Check the initial state of the wallet. Subscribe to all its diff --git a/lib/tests/test_transaction.py b/lib/tests/test_transaction.py index 6662c31f9..3f38b5629 100644 --- a/lib/tests/test_transaction.py +++ b/lib/tests/test_transaction.py @@ -773,5 +773,5 @@ class NetworkMock(object): def __init__(self, unspent): self.unspent = unspent - def synchronous_get(self, arg): + def synchronous_send(self, arg): return self.unspent diff --git a/lib/verifier.py b/lib/verifier.py index 06c6b0118..376e8aad6 100644 --- a/lib/verifier.py +++ b/lib/verifier.py @@ -54,9 +54,11 @@ class SPV(ThreadJob): else: if (tx_hash not in self.requested_merkle and tx_hash not in self.merkle_roots): - request = ('blockchain.transaction.get_merkle', - [tx_hash, tx_height]) - self.network.send([request], self.verify_merkle) + + self.network.get_merkle_for_transaction( + tx_hash, + tx_height, + self.verify_merkle) self.print_error('requested merkle', tx_hash) self.requested_merkle.add(tx_hash) diff --git a/lib/wallet.py b/lib/wallet.py index 37718768e..db2d21a0b 100644 --- a/lib/wallet.py +++ b/lib/wallet.py @@ -93,12 +93,13 @@ def dust_threshold(network): def append_utxos_to_inputs(inputs, network, pubkey, txin_type, imax): if txin_type != 'p2pk': address = bitcoin.pubkey_to_address(txin_type, pubkey) - sh = bitcoin.address_to_scripthash(address) + scripthash = bitcoin.address_to_scripthash(address) else: script = bitcoin.public_key_to_p2pk_script(pubkey) - sh = bitcoin.script_to_scripthash(script) + scripthash = bitcoin.script_to_scripthash(script) address = '(pubkey)' - u = network.synchronous_get(('blockchain.scripthash.listunspent', [sh])) + + u = network.listunspent_for_scripthash(scripthash) for item in u: if len(inputs) >= imax: break @@ -1471,9 +1472,8 @@ class Abstract_Wallet(PrintError): # all the input txs, in which case we ask the network. tx = self.transactions.get(tx_hash, None) if not tx and self.network: - request = ('blockchain.transaction.get', [tx_hash]) try: - tx = Transaction(self.network.synchronous_get(request)) + tx = Transaction(self.network.get_transaction(tx_hash)) except TimeoutException as e: self.print_error('getting input txn from network timed out for {}'.format(tx_hash)) if not ignore_timeout: diff --git a/lib/websockets.py b/lib/websockets.py index 5462dd161..ea68dfa5f 100644 --- a/lib/websockets.py +++ b/lib/websockets.py @@ -95,17 +95,18 @@ class WsClientThread(util.DaemonThread): continue util.print_error('response', r) method = r.get('method') - params = r.get('params') + scripthash = r.get('params')[0] result = r.get('result') if result is None: continue if method == 'blockchain.scripthash.subscribe': - self.network.send([('blockchain.scripthash.get_balance', params)], self.response_queue.put) + self.network.get_balance_for_scripthash( + scripthash, self.response_queue.put) elif method == 'blockchain.scripthash.get_balance': - h = params[0] - addr = self.network.h2addr.get(h, None) + addr = self.network.h2addr.get(scripthash, None) if addr is None: - util.print_error("can't find address for scripthash: %s" % h) + util.print_error( + "can't find address for scripthash: %s" % scripthash) l = self.subscriptions.get(addr, []) for ws, amount in l: if not ws.closed: diff --git a/scripts/block_headers b/scripts/block_headers index 14ae918a9..cda9e14b9 100755 --- a/scripts/block_headers +++ b/scripts/block_headers @@ -22,7 +22,7 @@ if not network.is_connected(): # 2. send the subscription callback = lambda response: print_msg(json_encode(response.get('result'))) network.send([('server.version',["block_headers script", "1.2"])], callback) -network.send([('blockchain.headers.subscribe',[True])], callback) +network.subscribe_to_headers(callback) # 3. wait for results while network.is_connected(): diff --git a/scripts/get_history b/scripts/get_history index b78fcc58a..3e25166da 100755 --- a/scripts/get_history +++ b/scripts/get_history @@ -14,5 +14,5 @@ except Exception: n = Network() n.start() _hash = bitcoin.address_to_scripthash(addr) -h = n.synchronous_get(('blockchain.scripthash.get_history',[_hash])) +h = n.get_history_for_scripthash(_hash) print_msg(json_encode(h)) diff --git a/scripts/watch_address b/scripts/watch_address index a8446fc81..9c60afba4 100755 --- a/scripts/watch_address +++ b/scripts/watch_address @@ -29,7 +29,7 @@ if not network.is_connected(): # 2. send the subscription callback = lambda response: print_msg(json_encode(response.get('result'))) -network.send([('blockchain.scripthash.subscribe',[sh])], callback) +network.subscribe_to_address(addr, callback) # 3. wait for results while network.is_connected():