|
|
@ -1265,6 +1265,37 @@ class OldTaskGroup(aiorpcx.TaskGroup): |
|
|
|
if self.completed: |
|
|
|
self.completed.result() |
|
|
|
|
|
|
|
# We monkey-patch aiorpcx TimeoutAfter (used by timeout_after and ignore_after API), |
|
|
|
# to fix a timing issue present in asyncio as a whole re timing out tasks. |
|
|
|
# To see the issue we are trying to fix, consider example: |
|
|
|
# async def outer_task(): |
|
|
|
# async with timeout_after(0.1): |
|
|
|
# await inner_task() |
|
|
|
# When the 0.1 sec timeout expires, inner_task will get cancelled by timeout_after (=internal cancellation). |
|
|
|
# If around the same time (in terms of event loop iterations) another coroutine |
|
|
|
# cancels outer_task (=external cancellation), there will be a race. |
|
|
|
# Both cancellations work by propagating a CancelledError out to timeout_after, which then |
|
|
|
# needs to decide (in TimeoutAfter.__aexit__) whether it's due to an internal or external cancellation. |
|
|
|
# AFAICT asyncio provides no reliable way of distinguishing between the two. |
|
|
|
# This patch tries to always give priority to external cancellations. |
|
|
|
# see https://github.com/kyuupichan/aiorpcX/issues/44 |
|
|
|
# see https://github.com/aio-libs/async-timeout/issues/229 |
|
|
|
# see https://bugs.python.org/issue42130 and https://bugs.python.org/issue45098 |
|
|
|
def _aiorpcx_monkeypatched_set_new_deadline(task, deadline): |
|
|
|
def timeout_task(): |
|
|
|
task._orig_cancel() |
|
|
|
task._timed_out = None if getattr(task, "_externally_cancelled", False) else deadline |
|
|
|
def mycancel(*args, **kwargs): |
|
|
|
task._orig_cancel(*args, **kwargs) |
|
|
|
task._externally_cancelled = True |
|
|
|
task._timed_out = None |
|
|
|
if not hasattr(task, "_orig_cancel"): |
|
|
|
task._orig_cancel = task.cancel |
|
|
|
task.cancel = mycancel |
|
|
|
task._deadline_handle = task._loop.call_at(deadline, timeout_task) |
|
|
|
|
|
|
|
aiorpcx.curio._set_new_deadline = _aiorpcx_monkeypatched_set_new_deadline |
|
|
|
|
|
|
|
|
|
|
|
class NetworkJobOnDefaultServer(Logger, ABC): |
|
|
|
"""An abstract base class for a job that runs on the main network |
|
|
|