refactor: replace try-except blocks with contextlib.suppress for cleaner error handling across multiple files

This commit is contained in:
Jack Lu 2026-05-01 01:42:31 +08:00 committed by Xubin Ren
parent 1c24f10236
commit d9800ecdd2
27 changed files with 98 additions and 195 deletions

View File

@ -3,6 +3,7 @@
import base64
import mimetypes
import platform
from contextlib import suppress
from importlib.resources import files as pkg_files
from pathlib import Path
from typing import Any
@ -121,12 +122,10 @@ class ContextBuilder:
@staticmethod
def _is_template_content(content: str, template_path: str) -> bool:
"""Check if *content* is identical to the bundled template (user hasn't customized it)."""
try:
with suppress(Exception):
tpl = pkg_files("nanobot") / "templates" / template_path
if tpl.is_file():
return content.strip() == tpl.read_text(encoding="utf-8").strip()
except Exception:
pass
return False
def build_messages(

View File

@ -7,7 +7,7 @@ import dataclasses
import json
import os
import time
from contextlib import AsyncExitStack, nullcontext
from contextlib import AsyncExitStack, nullcontext, suppress
from pathlib import Path
from typing import TYPE_CHECKING, Any, Awaitable, Callable
@ -492,10 +492,8 @@ class AgentLoop:
tasks = self._active_tasks.pop(key, [])
cancelled = sum(1 for t in tasks if not t.done() and t.cancel())
for t in tasks:
try:
with suppress(asyncio.CancelledError, Exception):
await t
except (asyncio.CancelledError, Exception):
pass
sub_cancelled = await self.subagents.cancel_by_session(key)
return cancelled + sub_cancelled

View File

@ -7,6 +7,7 @@ import json
import os
import re
import weakref
from contextlib import suppress
import tiktoken
from datetime import datetime
from pathlib import Path
@ -296,10 +297,8 @@ class MemoryStore:
def _next_cursor(self) -> int:
"""Read the current cursor counter and return the next value."""
if self._cursor_file.exists():
try:
with suppress(ValueError, OSError):
return int(self._cursor_file.read_text(encoding="utf-8").strip()) + 1
except (ValueError, OSError):
pass
# Fast path: trust the tail when intact. Otherwise scan the whole
# file and take ``max`` — that stays correct even if the monotonic
# invariant was broken by external writes.
@ -328,7 +327,7 @@ class MemoryStore:
def _read_entries(self) -> list[dict[str, Any]]:
"""Read all entries from history.jsonl."""
entries: list[dict[str, Any]] = []
try:
with suppress(FileNotFoundError):
with open(self.history_file, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
@ -337,8 +336,7 @@ class MemoryStore:
entries.append(json.loads(line))
except json.JSONDecodeError:
continue
except FileNotFoundError:
pass
return entries
def _read_last_entry(self) -> dict[str, Any] | None:
@ -374,14 +372,12 @@ class MemoryStore:
# On Windows, opening a directory with O_RDONLY raises
# PermissionError — skip the dir sync there (NTFS
# journals metadata synchronously).
try:
with suppress(PermissionError):
fd = os.open(str(self.history_file.parent), os.O_RDONLY)
try:
os.fsync(fd)
finally:
os.close(fd)
except PermissionError:
pass # Windows — directory fsync not supported
except BaseException:
tmp_path.unlink(missing_ok=True)
raise
@ -390,10 +386,8 @@ class MemoryStore:
def get_last_dream_cursor(self) -> int:
if self._dream_cursor_file.exists():
try:
with suppress(ValueError, OSError):
return int(self._dream_cursor_file.read_text(encoding="utf-8").strip())
except (ValueError, OSError):
pass
return 0
def set_last_dream_cursor(self, cursor: int) -> None:

View File

@ -5,6 +5,7 @@ from __future__ import annotations
import asyncio
import inspect
import os
from contextlib import suppress
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
@ -752,12 +753,10 @@ class AgentRunner:
prepare_call = getattr(spec.tools, "prepare_call", None)
tool, params, prep_error = None, tool_call.arguments, None
if callable(prepare_call):
try:
with suppress(Exception):
prepared = prepare_call(tool_call.name, tool_call.arguments)
if isinstance(prepared, tuple) and len(prepared) == 3:
tool, params, prep_error = prepared
except Exception:
pass
if prep_error:
event = {
"name": tool_call.name,

View File

@ -4,7 +4,7 @@ import asyncio
import os
import re
import shutil
from contextlib import AsyncExitStack
from contextlib import AsyncExitStack, suppress
from typing import Any
import httpx
@ -609,10 +609,8 @@ async def connect_mcp_servers(
"only JSON-RPC to stdout and sends logs/debug output to stderr instead."
)
logger.error("MCP server '{}': failed to connect: {}{}", name, e, hint)
try:
with suppress(Exception):
await server_stack.aclose()
except Exception:
pass
return name, None
server_stacks: dict[str, AsyncExitStack] = {}

View File

@ -5,6 +5,7 @@ from __future__ import annotations
import fnmatch
import os
import re
from contextlib import suppress
from pathlib import Path, PurePosixPath
from typing import Any, Iterable, TypeVar
@ -92,10 +93,8 @@ class _SearchTool(_FsTool):
def _display_path(self, target: Path, root: Path) -> str:
if self._workspace:
try:
with suppress(ValueError):
return target.relative_to(self._workspace).as_posix()
except ValueError:
pass
return target.relative_to(root).as_posix()
def _iter_files(self, root: Path) -> Iterable[Path]:

View File

@ -5,6 +5,7 @@ import os
import re
import shutil
import sys
from contextlib import suppress
from pathlib import Path
from typing import Any
@ -212,9 +213,8 @@ class ExecTool(Tool):
"""Kill a subprocess and reap it to prevent zombies."""
process.kill()
try:
with suppress(asyncio.TimeoutError):
await asyncio.wait_for(process.wait(), timeout=5.0)
except asyncio.TimeoutError:
pass
finally:
if not _IS_WINDOWS:
try:

View File

@ -5,6 +5,7 @@ from __future__ import annotations
import asyncio
import importlib.util
import time
from contextlib import suppress
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal
@ -564,10 +565,8 @@ class DiscordChannel(BaseChannel):
# Delayed working indicator (cosmetic — not tied to subagent lifecycle)
async def _delayed_working_emoji() -> None:
await asyncio.sleep(self.config.working_emoji_delay)
try:
with suppress(Exception):
await message.add_reaction(self.config.working_emoji)
except Exception:
pass
self._working_emoji_tasks[channel_id] = asyncio.create_task(_delayed_working_emoji())
@ -771,10 +770,8 @@ class DiscordChannel(BaseChannel):
if task is None:
return
task.cancel()
try:
with suppress(asyncio.CancelledError):
await task
except asyncio.CancelledError:
pass
async def _clear_reactions(self, chat_id: str) -> None:
"""Remove all pending reactions after bot replies."""
@ -788,10 +785,8 @@ class DiscordChannel(BaseChannel):
return
bot_user = self._client.user if self._client else None
for emoji in (self.config.read_receipt_emoji, self.config.working_emoji):
try:
with suppress(Exception):
await msg_obj.remove_reaction(emoji, bot_user)
except Exception:
pass
async def _cancel_all_typing(self) -> None:
"""Stop all typing tasks."""

View File

@ -6,6 +6,7 @@ import imaplib
import re
import smtplib
import ssl
from contextlib import suppress
from datetime import date
from email import policy
from email.header import decode_header, make_header
@ -460,10 +461,8 @@ class EmailChannel(BaseChannel):
if mark_seen:
client.store(imap_id, "+FLAGS", "\\Seen")
finally:
try:
with suppress(Exception):
client.logout()
except Exception:
pass
def _collect_self_addresses(self) -> set[str]:
"""Return normalized email addresses owned by this channel instance."""

View File

@ -9,6 +9,7 @@ import threading
import time
import uuid
from collections import OrderedDict
from contextlib import suppress
from dataclasses import dataclass
from typing import Any, Literal
@ -612,12 +613,11 @@ class FeishuChannel(BaseChannel):
"""Callback: store reaction_id after background add-reaction completes."""
if task.cancelled():
return
try:
# Failures already logged by _on_background_task_done.
with suppress(Exception):
reaction_id = task.result()
if reaction_id:
self._reaction_ids[message_id] = reaction_id
except Exception:
pass # already logged by _on_background_task_done
# Trim cache to prevent unbounded growth
if len(self._reaction_ids) > 500:
self._reaction_ids.pop(next(iter(self._reaction_ids)))

View File

@ -3,6 +3,7 @@
from __future__ import annotations
import asyncio
from contextlib import suppress
from pathlib import Path
from typing import TYPE_CHECKING, Any
@ -220,10 +221,8 @@ class ChannelManager:
# Stop dispatcher
if self._dispatch_task:
self._dispatch_task.cancel()
try:
with suppress(asyncio.CancelledError):
await self._dispatch_task
except asyncio.CancelledError:
pass
# Stop all channels
for name, channel in self.channels.items():

View File

@ -5,6 +5,7 @@ import json
import logging
import mimetypes
import time
from contextlib import suppress
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal, TypeAlias
@ -341,10 +342,8 @@ class MatrixChannel(BaseChannel):
timeout=self.config.sync_stop_grace_seconds)
except (asyncio.TimeoutError, asyncio.CancelledError):
self._sync_task.cancel()
try:
with suppress(asyncio.CancelledError):
await self._sync_task
except asyncio.CancelledError:
pass
if self.client:
await self.client.close()
@ -609,13 +608,11 @@ class MatrixChannel(BaseChannel):
"""Best-effort typing indicator update."""
if not self.client:
return
try:
with suppress(Exception):
response = await self.client.room_typing(room_id=room_id, typing_state=typing,
timeout=TYPING_NOTICE_TIMEOUT_MS)
if isinstance(response, RoomTypingError):
logger.debug("Matrix typing failed for {}: {}", room_id, response)
except Exception:
pass
async def _start_typing_keepalive(self, room_id: str) -> None:
"""Start periodic typing refresh (spec-recommended keepalive)."""
@ -625,22 +622,18 @@ class MatrixChannel(BaseChannel):
return
async def loop() -> None:
try:
with suppress(asyncio.CancelledError):
while self._running:
await asyncio.sleep(TYPING_KEEPALIVE_INTERVAL_MS / 1000)
await self._set_typing(room_id, True)
except asyncio.CancelledError:
pass
self._typing_tasks[room_id] = asyncio.create_task(loop())
async def _stop_typing_keepalive(self, room_id: str, *, clear_typing: bool) -> None:
if task := self._typing_tasks.pop(room_id, None):
task.cancel()
try:
with suppress(asyncio.CancelledError):
await task
except asyncio.CancelledError:
pass
if clear_typing:
await self._set_typing(room_id, False)

View File

@ -5,6 +5,7 @@ from __future__ import annotations
import asyncio
import json
from collections import deque
from contextlib import suppress
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
@ -330,10 +331,8 @@ class MochatChannel(BaseChannel):
await self._cancel_delay_timers()
if self._socket:
try:
with suppress(Exception):
await self._socket.disconnect()
except Exception:
pass
self._socket = None
if self._cursor_save_task:
@ -460,10 +459,8 @@ class MochatChannel(BaseChannel):
return True
except Exception as e:
logger.error("Failed to connect Mochat websocket: {}", e)
try:
with suppress(Exception):
await client.disconnect()
except Exception:
pass
self._socket = None
return False

View File

@ -20,7 +20,7 @@ import re
import tempfile
import threading
import time
from contextlib import contextmanager
from contextlib import contextmanager, suppress
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import TYPE_CHECKING, Any
@ -712,10 +712,8 @@ class MSTeamsChannel(BaseChannel):
os.replace(tmp_path, path)
finally:
if tmp_path and os.path.exists(tmp_path):
try:
with suppress(OSError):
os.unlink(tmp_path)
except OSError:
pass
def _save_refs_locked(self, *, prune: bool = True) -> None:
"""Persist conversation references (caller must hold _refs_guard)."""

View File

@ -25,6 +25,7 @@ import os
import re
import time
from collections import deque
from contextlib import suppress
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal
from urllib.parse import unquote, urlparse
@ -221,17 +222,13 @@ class QQChannel(BaseChannel):
"""Stop bot and cleanup resources."""
self._running = False
if self._client:
try:
with suppress(Exception):
await self._client.close()
except Exception:
pass
self._client = None
if self._http:
try:
with suppress(Exception):
await self._http.close()
except Exception:
pass
self._http = None
logger.info("QQ bot stopped")
@ -683,7 +680,5 @@ class QQChannel(BaseChannel):
finally:
# Cleanup partial file
if tmp_path is not None:
try:
with suppress(Exception):
tmp_path.unlink(missing_ok=True)
except Exception:
pass

View File

@ -6,6 +6,7 @@ import asyncio
import re
import time
import unicodedata
from contextlib import suppress
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal
@ -462,10 +463,8 @@ class TelegramChannel(BaseChannel):
if not msg.metadata.get("_progress", False):
self._stop_typing(msg.chat_id)
if reply_to_message_id := msg.metadata.get("message_id"):
try:
with suppress(ValueError):
await self._remove_reaction(msg.chat_id, int(reply_to_message_id))
except ValueError:
pass
try:
chat_id = int(msg.chat_id)
@ -642,10 +641,8 @@ class TelegramChannel(BaseChannel):
return
self._stop_typing(chat_id)
if reply_to_message_id := meta.get("message_id"):
try:
with suppress(ValueError):
await self._remove_reaction(chat_id, int(reply_to_message_id))
except ValueError:
pass
thread_kwargs = {}
if message_thread_id := meta.get("message_thread_id"):
thread_kwargs["message_thread_id"] = message_thread_id
@ -1162,11 +1159,10 @@ class TelegramChannel(BaseChannel):
async def _typing_loop(self, chat_id: str) -> None:
"""Repeatedly send 'typing' action until cancelled."""
try:
with suppress(asyncio.CancelledError):
while self._app:
await self._app.bot.send_chat_action(chat_id=int(chat_id), action="typing")
await asyncio.sleep(4)
except asyncio.CancelledError:
pass
except Exception as e:
logger.debug("Typing indicator stopped for {}: {}", chat_id, e)
@ -1265,10 +1261,8 @@ class TelegramChannel(BaseChannel):
button_label = query.data or ""
await query.answer()
if query.message:
try:
with suppress(Exception):
await query.message.edit_reply_markup(reply_markup=None)
except Exception:
pass
logger.debug("Inline button tap from {}: {}", sender_id, button_label)
self._start_typing(str(chat_id))
await self._handle_message(

View File

@ -19,6 +19,7 @@ import re
import time
import uuid
from collections import OrderedDict
from contextlib import suppress
from pathlib import Path
from typing import Any
from urllib.parse import quote
@ -211,7 +212,7 @@ class WeixinChannel(BaseChannel):
def _save_state(self) -> None:
state_file = self._get_state_dir() / "account.json"
try:
with suppress(Exception):
data = {
"token": self._token,
"get_updates_buf": self._get_updates_buf,
@ -220,8 +221,6 @@ class WeixinChannel(BaseChannel):
"base_url": self.config.base_url,
}
state_file.write_text(json.dumps(data, ensure_ascii=False))
except Exception:
pass
# ------------------------------------------------------------------
# HTTP helpers (matches api.ts buildHeaders / apiFetch)
@ -576,10 +575,8 @@ class WeixinChannel(BaseChannel):
# Process messages (WeixinMessage[] from types.ts)
msgs: list[dict] = data.get("msgs", []) or []
for msg in msgs:
try:
with suppress(Exception):
await self._process_message(msg)
except Exception:
pass
# ------------------------------------------------------------------
# Inbound message processing (matches inbound.ts + process-message.ts)
@ -932,10 +929,8 @@ class WeixinChannel(BaseChannel):
await asyncio.sleep(TYPING_KEEPALIVE_INTERVAL_S)
if stop_event.is_set():
break
try:
with suppress(Exception):
await self._send_typing(user_id, typing_ticket, TYPING_STATUS_TYPING)
except Exception:
pass
finally:
pass
@ -962,16 +957,12 @@ class WeixinChannel(BaseChannel):
return
typing_ticket = ""
try:
with suppress(Exception):
typing_ticket = await self._get_typing_ticket(msg.chat_id, ctx_token)
except Exception:
typing_ticket = ""
if typing_ticket:
try:
with suppress(Exception):
await self._send_typing(msg.chat_id, typing_ticket, TYPING_STATUS_TYPING)
except Exception:
pass
typing_keepalive_stop = asyncio.Event()
typing_keepalive_task: asyncio.Task | None = None
@ -1043,16 +1034,12 @@ class WeixinChannel(BaseChannel):
if typing_keepalive_task:
typing_keepalive_stop.set()
typing_keepalive_task.cancel()
try:
with suppress(asyncio.CancelledError):
await typing_keepalive_task
except asyncio.CancelledError:
pass
if typing_ticket and not is_progress:
try:
with suppress(Exception):
await self._send_typing(msg.chat_id, typing_ticket, TYPING_STATUS_CANCEL)
except Exception:
pass
async def _start_typing(self, chat_id: str, context_token: str = "") -> None:
"""Start typing indicator immediately when a message is received."""
@ -1076,10 +1063,8 @@ class WeixinChannel(BaseChannel):
await asyncio.sleep(TYPING_KEEPALIVE_INTERVAL_S)
if stop_event.is_set():
break
try:
with suppress(Exception):
await self._send_typing(chat_id, ticket, TYPING_STATUS_TYPING)
except Exception:
pass
finally:
pass
@ -1095,10 +1080,8 @@ class WeixinChannel(BaseChannel):
if stop_event:
stop_event.set()
task.cancel()
try:
with suppress(asyncio.CancelledError):
await task
except asyncio.CancelledError:
pass
if not clear_remote:
return
entry = self._typing_tickets.get(chat_id)
@ -1339,13 +1322,11 @@ def _encrypt_aes_ecb(data: bytes, aes_key_b64: str) -> bytes:
pad_len = 16 - len(data) % 16
padded = data + bytes([pad_len] * pad_len)
try:
with suppress(ImportError):
from Crypto.Cipher import AES
cipher = AES.new(key, AES.MODE_ECB)
return cipher.encrypt(padded)
except ImportError:
pass
try:
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
@ -1371,13 +1352,11 @@ def _decrypt_aes_ecb(data: bytes, aes_key_b64: str) -> bytes:
decrypted: bytes | None = None
try:
with suppress(ImportError):
from Crypto.Cipher import AES
cipher = AES.new(key, AES.MODE_ECB)
decrypted = cipher.decrypt(data)
except ImportError:
pass
if decrypted is None:
try:

View File

@ -8,6 +8,7 @@ import os
import secrets
import shutil
import subprocess
from contextlib import suppress
from collections import OrderedDict
from pathlib import Path
from typing import Any, Literal
@ -47,10 +48,8 @@ def _load_or_create_bridge_token(path: Path) -> str:
path.parent.mkdir(parents=True, exist_ok=True)
token = secrets.token_urlsafe(32)
path.write_text(token, encoding="utf-8")
try:
with suppress(OSError):
path.chmod(0o600)
except OSError:
pass
return token

View File

@ -5,7 +5,7 @@ import os
import select
import signal
import sys
from contextlib import nullcontext
from contextlib import nullcontext, suppress
from pathlib import Path
from typing import Any
@ -14,11 +14,9 @@ if sys.platform == "win32":
if sys.stdout.encoding != "utf-8":
os.environ["PYTHONIOENCODING"] = "utf-8"
# Re-open stdout/stderr with UTF-8 encoding
try:
with suppress(Exception):
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
sys.stderr.reconfigure(encoding="utf-8", errors="replace")
except Exception:
pass
import typer
from loguru import logger
@ -83,35 +81,29 @@ def _flush_pending_tty_input() -> None:
except Exception:
return
try:
with suppress(Exception):
import termios
termios.tcflush(fd, termios.TCIFLUSH)
return
except Exception:
pass
try:
with suppress(Exception):
while True:
ready, _, _ = select.select([fd], [], [], 0)
if not ready:
break
if not os.read(fd, 4096):
break
except Exception:
return
def _restore_terminal() -> None:
"""Restore terminal to its original state (echo, line buffering, etc.)."""
if _SAVED_TERM_ATTRS is None:
return
try:
with suppress(Exception):
import termios
termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, _SAVED_TERM_ATTRS)
except Exception:
pass
def _init_prompt_session() -> None:
@ -119,12 +111,10 @@ def _init_prompt_session() -> None:
global _PROMPT_SESSION, _SAVED_TERM_ATTRS
# Save terminal state so we can restore it on exit
try:
with suppress(Exception):
import termios
_SAVED_TERM_ATTRS = termios.tcgetattr(sys.stdin.fileno())
except Exception:
pass
from nanobot.config.paths import get_cli_history_path
@ -936,10 +926,8 @@ def _run_gateway(
config.gateway.host or "127.0.0.1", port
)
writer.close()
try:
with suppress(Exception):
await writer.wait_closed()
except Exception:
pass
break
except OSError:
await asyncio.sleep(0.1)
@ -1520,10 +1508,8 @@ def _login_openai_codex() -> None:
from oauth_cli_kit import get_token, login_oauth_interactive
token = None
try:
with suppress(Exception):
token = get_token()
except Exception:
pass
if not (token and token.access):
console.print("[cyan]Starting interactive OAuth login...[/cyan]\n")
token = login_oauth_interactive(

View File

@ -5,6 +5,7 @@ from __future__ import annotations
import asyncio
import os
import sys
from contextlib import suppress
from nanobot import __version__
from nanobot.bus.events import OutboundMessage
@ -50,16 +51,15 @@ async def cmd_status(ctx: CommandContext) -> OutboundMessage:
loop = ctx.loop
session = ctx.session or loop.sessions.get_or_create(ctx.key)
ctx_est = 0
try:
with suppress(Exception):
ctx_est, _ = loop.consolidator.estimate_session_prompt_tokens(session)
except Exception:
pass
if ctx_est <= 0:
ctx_est = loop._last_usage.get("prompt_tokens", 0)
# Fetch web search provider usage (best-effort, never blocks the response)
search_usage_text: str | None = None
try:
# Never let usage fetch break /status
with suppress(Exception):
from nanobot.utils.searchusage import fetch_search_usage
web_cfg = getattr(loop, "web_config", None)
search_cfg = getattr(web_cfg, "search", None) if web_cfg else None
@ -68,14 +68,10 @@ async def cmd_status(ctx: CommandContext) -> OutboundMessage:
api_key = getattr(search_cfg, "api_key", "") or None
usage = await fetch_search_usage(provider=provider, api_key=api_key)
search_usage_text = usage.format()
except Exception:
pass # Never let usage fetch break /status
active_tasks = loop._active_tasks.get(ctx.key, [])
task_count = sum(1 for t in active_tasks if not t.done())
try:
with suppress(Exception):
task_count += loop.subagents.get_running_count_by_session(ctx.key)
except Exception:
pass
return OutboundMessage(
channel=ctx.msg.channel,
chat_id=ctx.msg.chat_id,

View File

@ -4,6 +4,7 @@ import asyncio
import json
import re
from abc import ABC, abstractmethod
from contextlib import suppress
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from datetime import datetime, timezone
@ -643,14 +644,12 @@ class LLMProvider(ABC):
return value
return None
try:
with suppress(TypeError, ValueError):
retry_ms = _header_value("retry-after-ms")
if retry_ms is not None:
value = float(retry_ms) / 1000.0
if value > 0:
return value
except (TypeError, ValueError):
pass
retry_after = _header_value("retry-after")
if retry_after is None:

View File

@ -5,6 +5,7 @@ from __future__ import annotations
import time
import webbrowser
from collections.abc import Callable
from contextlib import suppress
import httpx
from oauth_cli_kit.models import OAuthToken
@ -86,10 +87,8 @@ def login_github_copilot(
printer(f"Open: {verify_url}")
printer(f"Code: {user_code}")
if verify_complete:
try:
with suppress(Exception):
webbrowser.open(verify_complete)
except Exception:
pass
deadline = time.time() + expires_in
current_interval = interval

View File

@ -5,6 +5,7 @@ from __future__ import annotations
import ipaddress
import re
import socket
from contextlib import suppress
from urllib.parse import urlparse
_BLOCKED_NETWORKS = [
@ -30,10 +31,8 @@ def configure_ssrf_whitelist(cidrs: list[str]) -> None:
global _allowed_networks
nets = []
for cidr in cidrs:
try:
with suppress(ValueError):
nets.append(ipaddress.ip_network(cidr, strict=False))
except ValueError:
pass
_allowed_networks = nets

View File

@ -3,6 +3,7 @@
import json
import os
import shutil
from contextlib import suppress
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
@ -362,15 +363,11 @@ class SessionManager:
if data.get("_type") == "metadata":
metadata = data.get("metadata", {})
if data.get("created_at"):
try:
with suppress(ValueError, TypeError):
created_at = datetime.fromisoformat(data["created_at"])
except (ValueError, TypeError):
pass
if data.get("updated_at"):
try:
with suppress(ValueError, TypeError):
updated_at = datetime.fromisoformat(data["updated_at"])
except (ValueError, TypeError):
pass
last_consolidated = data.get("last_consolidated", 0)
else:
messages.append(data)
@ -440,14 +437,12 @@ class SessionManager:
# On Windows, opening a directory with O_RDONLY raises
# PermissionError — skip the dir sync there (NTFS
# journals metadata synchronously).
try:
with suppress(PermissionError):
fd = os.open(str(path.parent), os.O_RDONLY)
try:
os.fsync(fd)
finally:
os.close(fd)
except PermissionError:
pass # Windows — directory fsync not supported
except BaseException:
tmp_path.unlink(missing_ok=True)
raise

View File

@ -12,25 +12,23 @@ Example:
import sys
import zipfile
from contextlib import suppress
from pathlib import Path
from quick_validate import validate_skill
def _is_within(path: Path, root: Path) -> bool:
try:
with suppress(ValueError):
path.relative_to(root)
return True
except ValueError:
return False
def _cleanup_partial_archive(skill_filename: Path) -> None:
try:
if skill_filename.exists():
with suppress(OSError):
skill_filename.unlink()
except OSError:
pass
def package_skill(skill_path, output_dir=None):

View File

@ -6,6 +6,7 @@ import re
import shutil
import time
import uuid
from contextlib import suppress
from datetime import datetime
from pathlib import Path
from typing import Any
@ -416,14 +417,10 @@ def estimate_prompt_tokens_chain(
"""Estimate prompt tokens via provider counter first, then tiktoken fallback."""
provider_counter = getattr(provider, "estimate_prompt_tokens", None)
if callable(provider_counter):
try:
with suppress(Exception):
tokens, source = provider_counter(messages, tools, model)
if isinstance(tokens, (int, float)) and tokens > 0:
return int(tokens), str(source or "provider_counter")
except Exception:
pass
estimated = estimate_prompt_tokens(messages, tools)
if estimated > 0:
return int(estimated), "tiktoken"
return 0, "none"

View File

@ -5,6 +5,7 @@ from __future__ import annotations
import json
import os
import time
from contextlib import suppress
from dataclasses import dataclass, field
from typing import Any
@ -26,11 +27,9 @@ def format_restart_completed_message(started_at_raw: str) -> str:
"""Build restart completion text and include elapsed time when available."""
elapsed_suffix = ""
if started_at_raw:
try:
with suppress(ValueError):
elapsed_s = max(0.0, time.time() - float(started_at_raw))
elapsed_suffix = f" in {elapsed_s:.1f}s"
except ValueError:
pass
return f"Restart completed{elapsed_suffix}."