Improve Internal Logic for Network Retries (#4880)

This commit is contained in:
Bibo-Joshi
2025-08-02 02:11:57 +02:00
committed by GitHub
parent 197f29b6eb
commit a76fa2c79e
4 changed files with 29 additions and 15 deletions
@@ -0,0 +1,6 @@
internal = "Improve Internal Logic for Network Retries"
[[pull_requests]]
uid = "4880"
author_uid = "Bibo-Joshi"
closes_threads = ["4871"]
+5 -7
View File
@@ -335,7 +335,7 @@ class Updater(contextlib.AbstractAsyncContextManager["Updater"]):
_LOGGER.debug("Bootstrap done")
async def polling_action_cb() -> bool:
async def polling_action_cb() -> None:
try:
updates = await self.bot.get_updates(
offset=self._last_update_id,
@@ -352,7 +352,7 @@ class Updater(contextlib.AbstractAsyncContextManager["Updater"]):
"Received data was *not* processed!",
exc_info=exc,
)
return True
return
if updates:
if not self.running:
@@ -365,7 +365,7 @@ class Updater(contextlib.AbstractAsyncContextManager["Updater"]):
await self.update_queue.put(update)
self._last_update_id = updates[-1].update_id + 1 # Add one to 'confirm' it
return True # Keep fetching updates & don't quit. Polls with poll_interval.
return
def default_error_callback(exc: TelegramError) -> None:
_LOGGER.exception("Exception happened while polling for updates.", exc_info=exc)
@@ -678,14 +678,13 @@ class Updater(contextlib.AbstractAsyncContextManager["Updater"]):
:paramref:`max_retries`.
"""
async def bootstrap_del_webhook() -> bool:
async def bootstrap_del_webhook() -> None:
_LOGGER.debug("Deleting webhook")
if drop_pending_updates:
_LOGGER.debug("Dropping pending updates from Telegram server")
await self.bot.delete_webhook(drop_pending_updates=drop_pending_updates)
return False
async def bootstrap_set_webhook() -> bool:
async def bootstrap_set_webhook() -> None:
_LOGGER.debug("Setting webhook")
if drop_pending_updates:
_LOGGER.debug("Dropping pending updates from Telegram server")
@@ -698,7 +697,6 @@ class Updater(contextlib.AbstractAsyncContextManager["Updater"]):
max_connections=max_connections,
secret_token=secret_token,
)
return False
# Dropping pending updates from TG can be efficiently done with the drop_pending_updates
# parameter of delete/start_webhook, even in the case of polling. Also, we want to make
+16 -8
View File
@@ -54,10 +54,12 @@ async def network_retry_loop(
) -> None:
"""Perform a loop calling `action_cb`, retrying after network errors.
Stop condition for loop:
* `is_running()` evaluates :obj:`False` or
* return value of `action_cb` evaluates :obj:`False`
Stop condition for loop in case of ``max_retries < 0``:
* `is_running()` evaluates :obj:`False`
* or `stop_event` is set.
Additional stop condition for loop in case of `max_retries >= 0``:
* a call to `action_cb` succeeds
* or `max_retries` is reached.
Args:
@@ -86,12 +88,14 @@ async def network_retry_loop(
* > 0: Number of retries.
"""
infinite_loop = max_retries < 0
log_prefix = f"Network Retry Loop ({description}):"
effective_is_running = is_running or (lambda: True)
async def do_action() -> bool:
async def do_action() -> None:
if not stop_event:
return await action_cb()
await action_cb()
return
action_cb_task = asyncio.create_task(action_cb())
stop_task = asyncio.create_task(stop_event.wait())
@@ -104,16 +108,20 @@ async def network_retry_loop(
if stop_task in done:
_LOGGER.debug("%s Cancelled", log_prefix)
return False
return
return action_cb_task.result()
# Calling `result()` on `action_cb_task` will raise an exception if the task failed.
# this is important to propagate the error to the caller.
action_cb_task.result()
_LOGGER.debug("%s Starting", log_prefix)
cur_interval = interval
retries = 0
while effective_is_running():
try:
if not await do_action():
await do_action()
if not infinite_loop:
_LOGGER.debug("%s Action succeeded. Stopping loop.", log_prefix)
break
except RetryAfter as exc:
slack_time = 0.5
+2
View File
@@ -1716,6 +1716,7 @@ class TestApplication:
expected = {
name: name for name in updater_signature.parameters if name != "error_callback"
}
expected["bootstrap_retries"] = 42
thread = Thread(target=thread_target)
thread.start()
app.run_polling(close_loop=False, **expected)
@@ -2022,6 +2023,7 @@ class TestApplication:
assert self.received[name] == param.default
expected = {name: name for name in updater_signature.parameters if name != "self"}
expected["bootstrap_retries"] = 42
thread = Thread(target=thread_target)
thread.start()
app.run_webhook(close_loop=False, **expected)