From ff132ffec2aa21863d3401a48da7a67f23ca8223 Mon Sep 17 00:00:00 2001 From: Stevengre Date: Mon, 8 Jun 2026 16:55:43 +0800 Subject: [PATCH 1/5] Add step-timeout/shrink policy to Prover.advance_proof Implements runtimeverification/k#4924: factor kontrol's `--per-depth-timeout` mechanism out of tool-specific code and into a generic policy in `Prover.advance_proof`. `advance_proof` now runs each step under the prover's `step_timeout` (a per-step wall-clock budget). A step that exceeds its budget is interrupted, the prover is asked to `shrink_step` (do less work per step), and the step is retried (the timed-out node was never committed, so it reappears from `get_steps`). If the prover cannot shrink further, advancement stops. Provers without a `step_timeout` run synchronously as before, so `advance_proof` stays generic and gains no new parameter. - Prover: `step_timeout` attribute (whole seconds, None disables) plus no-op `shrink_step()` / `interrupt()` hooks - APRProver: `step_timeout` ctor arg (floored at 1s); `shrink_step` halves `execute_depth`; `interrupt` aborts the in-flight Kore request - KCFGExplore/CTermSymbolic/KoreClient/JsonRpc*/Transport: `interrupt()` that force-unblocks an in-flight single-socket request and reconnects - Unit tests: shrink-until-progress, stop-at-floor, fast-path, disabled --- pyk/src/pyk/cterm/symbolic.py | 4 + pyk/src/pyk/kcfg/explore.py | 4 + pyk/src/pyk/kore/rpc.py | 46 +++++++ pyk/src/pyk/proof/proof.py | 106 ++++++++++++--- pyk/src/pyk/proof/reachability.py | 15 +++ pyk/src/tests/unit/test_advance_proof.py | 160 +++++++++++++++++++++++ 6 files changed, 314 insertions(+), 21 deletions(-) create mode 100644 pyk/src/tests/unit/test_advance_proof.py diff --git a/pyk/src/pyk/cterm/symbolic.py b/pyk/src/pyk/cterm/symbolic.py index af8f9a4e367..66d178dcb37 100644 --- a/pyk/src/pyk/cterm/symbolic.py +++ b/pyk/src/pyk/cterm/symbolic.py @@ -127,6 +127,10 @@ def kast_to_kore(self, kinner: KInner) -> Pattern: def kore_to_kast(self, pattern: Pattern) -> KInner: return kore_to_kast(self._definition, pattern) + def interrupt(self) -> None: + """Abort a backend request currently in flight on another thread; see `KoreClient.interrupt`.""" + self._kore_client.interrupt() + def _haskell_logging_request(self, haskell_logging: bool | None) -> tuple[str, ...] | None: """Resolve the per-call on/off flag to the list of log entries to request. diff --git a/pyk/src/pyk/kcfg/explore.py b/pyk/src/pyk/kcfg/explore.py index 1eef51588a7..81716fdd4b0 100644 --- a/pyk/src/pyk/kcfg/explore.py +++ b/pyk/src/pyk/kcfg/explore.py @@ -59,6 +59,10 @@ def _pretty_printer(self) -> PrettyPrinter: def pretty_print(self, kinner: KInner) -> str: return self._pretty_printer.print(kinner) + def interrupt(self) -> None: + """Abort a backend request currently in flight on another thread; see `KoreClient.interrupt`.""" + self.cterm_symbolic.interrupt() + def _extract_rule_labels(self, _logs: tuple[LogEntry, ...]) -> list[str]: _rule_lines = [] for node_log in _logs: diff --git a/pyk/src/pyk/kore/rpc.py b/pyk/src/pyk/kore/rpc.py index e4e03558838..94de6401446 100644 --- a/pyk/src/pyk/kore/rpc.py +++ b/pyk/src/pyk/kore/rpc.py @@ -85,6 +85,15 @@ def __exit__(self, *args: Any) -> None: @abstractmethod def close(self) -> None: ... + def interrupt(self) -> None: + """Abort a request that is currently in flight on another thread. + + After `interrupt()` returns, a thread blocked in `request()` must raise promptly and + the transport must remain usable for subsequent requests. The default implementation + does nothing; transports backed by an interruptible connection should override it. + """ + ... + @abstractmethod def _request(self, req: str) -> str: ... @@ -101,6 +110,7 @@ class TransportType(Enum): class SingleSocketTransport(Transport): _host: str _port: int + _timeout: int | None _sock: socket.socket _file: IO[str] @@ -113,6 +123,7 @@ def __init__( ): self._host = host self._port = port + self._timeout = timeout self._sock = self._create_connection(host, port, timeout) self._file = self._sock.makefile('r') @@ -141,6 +152,23 @@ def close(self) -> None: self._file.close() self._sock.close() + def interrupt(self) -> None: + # Shutting down the socket unblocks a thread currently blocked in `readline`, causing + # its read to raise. We then reconnect so the transport stays usable for later requests. + # The old socket is closed; the old file object is left to be reclaimed by the garbage + # collector to avoid racing a `close()` against the unwinding reader thread. + old_sock = self._sock + try: + old_sock.shutdown(socket.SHUT_RDWR) + except OSError: + pass + self._sock = self._create_connection(self._host, self._port, self._timeout) + self._file = self._sock.makefile('r') + try: + old_sock.close() + except OSError: + pass + def _request(self, req: str) -> str: self._sock.sendall(req.encode()) server_addr = self._description() @@ -252,6 +280,12 @@ def last_request_id(self) -> str | None: """The JSON-RPC id of the most recent request issued through this facade (``None`` if none yet).""" return self._last_request_id + def interrupt(self) -> None: + self._default_client.interrupt() + for clients in self._clients.values(): + for client in clients: + client.interrupt() + def request(self, method: str, **params: Any) -> dict[str, Any]: if method in self._clients: for client in self._clients[method]: @@ -316,6 +350,9 @@ def __exit__(self, *args: Any) -> None: def close(self) -> None: self._transport.close() + def interrupt(self) -> None: + self._transport.interrupt() + def request(self, method: str, **params: Any) -> dict[str, Any]: label = client_label.get() prefix = label if label is not None else str(id(self)) @@ -1014,6 +1051,15 @@ def last_request_id(self) -> str | None: """ return self._client.last_request_id + def interrupt(self) -> None: + """Abort an `execute`/`simplify`/… request currently in flight on another thread. + + After this returns the interrupted call raises and the client stays usable. Only + effective for the single-socket transport; a no-op for transports that cannot abort + an in-flight request. + """ + self._client.interrupt() + def _request(self, method: str, **params: Any) -> dict[str, Any]: try: return self._client.request(method, **params) diff --git a/pyk/src/pyk/proof/proof.py b/pyk/src/pyk/proof/proof.py index 0d1eef04d2e..3edc8d45d33 100644 --- a/pyk/src/pyk/proof/proof.py +++ b/pyk/src/pyk/proof/proof.py @@ -3,7 +3,9 @@ import json import logging from abc import ABC, abstractmethod -from concurrent.futures import ThreadPoolExecutor, wait +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import TimeoutError as FuturesTimeoutError +from concurrent.futures import wait from dataclasses import dataclass from enum import Enum from itertools import chain @@ -499,6 +501,33 @@ def init_proof(self, proof: P) -> None: """ ... + #: Per-step wall-clock budget in whole seconds (minimum 1). When set, `advance_proof` runs each + #: step under this budget and, on timeout, interrupts it, calls `shrink_step`, and retries. + #: `None` (the default) disables the policy, so steps run synchronously with no time limit. A + #: prover that can do less work per step (e.g. `APRProver`, by lowering its execution depth) + #: should pair this with `shrink_step`. + step_timeout: int | None = None + + def shrink_step(self) -> bool: + """Reduce the amount of work a single `step_proof` does, after a step timed out. + + Return `True` if the step was made smaller (so it is worth retrying), or `False` if the + step is already at its minimum size (so `advance_proof` should stop). The default is a + no-op that returns `False`; see `step_timeout`. + """ + return False + + def interrupt(self) -> None: + """Abort a `step_proof` call currently running on another thread, as quickly as possible. + + Used by the timeout-and-shrink policy in `advance_proof` to abandon a step that has + exhausted its `step_timeout` budget. After this returns, a thread blocked in `step_proof` + must raise promptly and the prover must remain usable for subsequent steps. The default + implementation does nothing; provers backed by an interruptible resource (e.g. a Kore RPC + connection) should override it. + """ + ... + def advance_proof( self, proof: P, @@ -509,7 +538,8 @@ def advance_proof( ) -> None: """Advance a proof. - Performs loop `Proof.get_steps()` -> `Prover.step_proof()` -> `Proof.commit()`. + Performs loop `Proof.get_steps()` -> `Prover.step_proof()` (within `step_timeout`, else + `interrupt` + `shrink_step` and retry, or stop if it cannot shrink) -> `Proof.commit()`. Args: proof: proof to advance. @@ -522,25 +552,59 @@ def advance_proof( iterations = 0 _LOGGER.info(f'Initializing proof: {proof.id}') self.init_proof(proof) - while True: - steps = list(proof.get_steps()) - _LOGGER.info(f'Found {len(steps)} next steps for proof: {proof.id}') - if len(steps) == 0: - break - for step in steps: - if fail_fast and proof.failed: - _LOGGER.warning(f'Terminating proof early because fail_fast is set: {proof.id}') - proof.failure_info = self.failure_info(proof) - return - if max_iterations is not None and max_iterations <= iterations: - return - iterations += 1 - results = self.step_proof(step) - for result in results: - proof.commit(result) - if iterations % maintenance_rate == 0: - proof.write_proof_data() - callback(proof) + + timed = self.step_timeout is not None + executor = ThreadPoolExecutor(max_workers=1) if timed else None + try: + while True: + steps = list(proof.get_steps()) + _LOGGER.info(f'Found {len(steps)} next steps for proof: {proof.id}') + if len(steps) == 0: + break + shrank_step = False + for step in steps: + if fail_fast and proof.failed: + _LOGGER.warning(f'Terminating proof early because fail_fast is set: {proof.id}') + proof.failure_info = self.failure_info(proof) + return + if max_iterations is not None and max_iterations <= iterations: + return + if timed: + assert executor is not None + budget = self.step_timeout + assert budget is not None + future = executor.submit(self.step_proof, step) + try: + results = future.result(timeout=budget) + except FuturesTimeoutError: + # The step exhausted its budget: interrupt it, ask the prover to do less + # work per step, and re-fetch steps so the same node is retried smaller. + self.interrupt() + wait([future]) + if not self.shrink_step(): + _LOGGER.warning( + f'Proof {proof.id}: step exhausted {budget}s budget and cannot be ' + f'shrunk further; stopping.' + ) + return + _LOGGER.warning( + f'Proof {proof.id}: step exhausted {budget}s budget; shrinking and retrying.' + ) + shrank_step = True + break + else: + results = self.step_proof(step) + iterations += 1 + for result in results: + proof.commit(result) + if iterations % maintenance_rate == 0: + proof.write_proof_data() + callback(proof) + if shrank_step: + continue + finally: + if executor is not None: + executor.shutdown(wait=False) if proof.failed: proof.failure_info = self.failure_info(proof) diff --git a/pyk/src/pyk/proof/reachability.py b/pyk/src/pyk/proof/reachability.py index 0b4a66d2b35..b7c01d913da 100644 --- a/pyk/src/pyk/proof/reachability.py +++ b/pyk/src/pyk/proof/reachability.py @@ -725,6 +725,7 @@ class APRProver(Prover[APRProof, APRProofStep, APRProofResult]): kcfg_explore: KCFGExplore extra_module: KFlatModule | None optimize_kcfg: bool + step_timeout: int | None def __init__( self, @@ -738,6 +739,7 @@ def __init__( assume_defined: bool = False, extra_module: KFlatModule | None = None, optimize_kcfg: bool = False, + step_timeout: int | None = None, ) -> None: self.kcfg_explore = kcfg_explore @@ -751,10 +753,23 @@ def __init__( self.assume_defined = assume_defined self.extra_module = extra_module self.optimize_kcfg = optimize_kcfg + # Whole seconds, floored at 1; None disables the per-step timeout/shrink policy entirely. + self.step_timeout = max(1, step_timeout) if step_timeout is not None else None def close(self) -> None: self.kcfg_explore.cterm_symbolic._kore_client.close() + def shrink_step(self) -> bool: + # On step timeout, halve the execution depth (floor 1) so the next attempt does less work + # per `extend_cterm`. Returns False once `execute_depth` is unset or already at the minimum. + if self.execute_depth is None or self.execute_depth <= 1: + return False + self.execute_depth = max(1, self.execute_depth // 2) + return True + + def interrupt(self) -> None: + self.kcfg_explore.interrupt() + def init_proof(self, proof: APRProof) -> None: # Stamp proof.id on every subsequent kore-RPC request from this thread so # booster's `{request: ...}` log lines self-identify the originating diff --git a/pyk/src/tests/unit/test_advance_proof.py b/pyk/src/tests/unit/test_advance_proof.py new file mode 100644 index 00000000000..fe955f78e93 --- /dev/null +++ b/pyk/src/tests/unit/test_advance_proof.py @@ -0,0 +1,160 @@ +from __future__ import annotations + +from threading import Event +from typing import TYPE_CHECKING + +from pyk.proof.proof import Proof, ProofStatus, Prover + +if TYPE_CHECKING: + from collections.abc import Mapping + from pathlib import Path + from typing import Any + + +class _StepInterrupted(Exception): + """Raised inside `step_proof` when the prover is interrupted, mimicking a backend abort.""" + + +class CountingProof(Proof[int, int]): + """Minimal proof that needs `target` committed steps to pass.""" + + target: int + committed: int + + def __init__(self, id: str, target: int) -> None: + super().__init__(id) + self.target = target + self.committed = 0 + + def commit(self, result: int) -> None: + self.committed += result + + @property + def own_status(self) -> ProofStatus: + return ProofStatus.PASSED if self.committed >= self.target else ProofStatus.PENDING + + @property + def can_progress(self) -> bool: + return self.committed < self.target + + @classmethod + def from_dict(cls: type[CountingProof], dct: Mapping[str, Any], proof_dir: Path | None = None) -> CountingProof: + raise NotImplementedError + + def write_proof_data(self) -> None: ... + + def get_steps(self) -> list[int]: + return [self.committed] if self.can_progress else [] + + +class CountingProver(Prover[CountingProof, int, int]): + """Prover whose `step_proof` is "slow" (blocks for up to `slow_step_secs`) while `depth` exceeds `quick_at_depth`. + + Mirrors `APRProver`: a fixed `step_timeout` budgets each step, and `shrink_step` halves the depth. + A slow step that is interrupted before its budget elapses raises (mimicking a backend abort); a slow + step that is never interrupted finishes its work on its own. Tracks the number of interruptions so + tests can assert how many times a step was shrunk. + """ + + depth: int + quick_at_depth: int + step_timeout: int | None + slow_step_secs: float + interrupt_count: int + _interrupt_event: Event + + def __init__( + self, depth: int, quick_at_depth: int, step_timeout: int | None = 1, slow_step_secs: float = 10.0 + ) -> None: + self.depth = depth + self.quick_at_depth = quick_at_depth + self.step_timeout = step_timeout + self.slow_step_secs = slow_step_secs + self.interrupt_count = 0 + self._interrupt_event = Event() + + def close(self) -> None: ... + + def failure_info(self, proof: CountingProof) -> Any: + return None + + def init_proof(self, proof: CountingProof) -> None: ... + + def shrink_step(self) -> bool: + if self.depth <= 1: + return False + self.depth = max(1, self.depth // 2) + return True + + def interrupt(self) -> None: + self.interrupt_count += 1 + self._interrupt_event.set() + + def step_proof(self, step: int) -> list[int]: + self._interrupt_event.clear() + if self.depth > self.quick_at_depth: + # A "slow" step: block for up to `slow_step_secs`. If `advance_proof` interrupts us first + # (because the step budget elapsed) abort like a real backend; otherwise the step finishes + # its work on its own and commits normally. + if self._interrupt_event.wait(timeout=self.slow_step_secs): + raise _StepInterrupted() + return [1] + + +def test_advance_proof_shrinks_until_progress() -> None: + # Given: depth 4 stalls, but a step completes once depth drops to <= 2. + proof = CountingProof('counting', target=1) + prover = CountingProver(depth=4, quick_at_depth=2) + + # When + prover.advance_proof(proof) + + # Then: one timeout shrinks 4 -> 2, then a step commits and the proof passes. + assert proof.status == ProofStatus.PASSED + assert prover.depth == 2 + assert prover.interrupt_count == 1 + + +def test_advance_proof_stops_when_cannot_shrink_further() -> None: + # Given: every step stalls regardless of depth. + proof = CountingProof('counting', target=1) + prover = CountingProver(depth=2, quick_at_depth=0) + + # When + prover.advance_proof(proof) + + # Then: depth shrinks 2 -> 1, then stops at the floor; the proof stays pending. + assert proof.status == ProofStatus.PENDING + assert proof.committed == 0 + assert prover.depth == 1 + assert prover.interrupt_count == 2 + + +def test_advance_proof_no_shrink_when_steps_are_fast() -> None: + # Given: step_timeout set but steps always complete in time. + proof = CountingProof('counting', target=3) + prover = CountingProver(depth=2, quick_at_depth=2) + + # When + prover.advance_proof(proof) + + # Then: no interruptions, depth untouched, proof passes. + assert proof.status == ProofStatus.PASSED + assert prover.depth == 2 + assert prover.interrupt_count == 0 + + +def test_advance_proof_without_step_timeout_is_unaffected() -> None: + # Given: step_timeout is None -> classic in-loop behavior, no watchdog thread. Each step is "slow" + # (depth 8 > quick_at_depth 4), but without a budget nothing interrupts it, so the step runs to + # completion synchronously instead of being aborted and shrunk. + proof = CountingProof('counting', target=2) + prover = CountingProver(depth=8, quick_at_depth=4, step_timeout=None, slow_step_secs=0.05) + + # When + prover.advance_proof(proof) + + # Then: both slow steps complete on their own; nothing is interrupted or shrunk. + assert proof.status == ProofStatus.PASSED + assert prover.depth == 8 + assert prover.interrupt_count == 0 From bf771141419753f856c900c12510fb5ccc95e8e6 Mon Sep 17 00:00:00 2001 From: Stevengre Date: Mon, 8 Jun 2026 17:49:47 +0800 Subject: [PATCH 2/5] Expose --step-timeout as a prove CLI option The step-timeout/shrink policy added to Prover.advance_proof was reachable only programmatically. Wire it through the pyk prove command: add a step_timeout field to ProveOptions, a --step-timeout argument (whole seconds, floored at 1 by APRProver), and thread the value from ProveRpc into APRProver. Omitting the flag leaves step_timeout=None, preserving the prior synchronous behavior. --- pyk/src/pyk/cli/pyk.py | 13 +++++++++++++ pyk/src/pyk/proof/prove_rpc.py | 9 ++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/pyk/src/pyk/cli/pyk.py b/pyk/src/pyk/cli/pyk.py index 8a23469ed8c..75a9ecc1ed6 100644 --- a/pyk/src/pyk/cli/pyk.py +++ b/pyk/src/pyk/cli/pyk.py @@ -316,6 +316,7 @@ class ProveOptions(LoggingOptions, SpecOptions, SaveDirOptions): kore_rpc_command: str | Iterable[str] | None max_depth: int | None max_iterations: int | None + step_timeout: int | None assume_defined: bool show_kcfg: bool haskell_logging: bool @@ -329,6 +330,7 @@ def default() -> dict[str, Any]: 'kore_rpc_command': None, 'max_depth': None, 'max_iterations': None, + 'step_timeout': None, 'assume_defined': False, 'show_kcfg': False, 'haskell_logging': False, @@ -515,6 +517,17 @@ def create_argument_parser() -> ArgumentParser: type=int, help='Maximum number of KCFG explorations to take in attempting to discharge proof.', ) + prove_args.add_argument( + '--step-timeout', + dest='step_timeout', + type=int, + default=None, + help=( + 'Per-step wall-clock budget in whole seconds (floored at 1). When a symbolic-execution step' + ' exceeds it, the step is interrupted, its execution depth is halved, and it is retried;' + ' proving stops once the depth cannot be reduced further. Omit to disable the timeout.' + ), + ) prove_args.add_argument( '--kore-rpc-command', dest='kore_rpc_command', diff --git a/pyk/src/pyk/proof/prove_rpc.py b/pyk/src/pyk/proof/prove_rpc.py index df01ad30b48..9b852daaa6e 100644 --- a/pyk/src/pyk/proof/prove_rpc.py +++ b/pyk/src/pyk/proof/prove_rpc.py @@ -52,6 +52,7 @@ def prove_rpc(self, options: ProveOptions) -> list[Proof]: max_depth=options.max_depth, save_directory=options.save_directory, max_iterations=options.max_iterations, + step_timeout=options.step_timeout, ) for claim in all_claims ] @@ -63,6 +64,7 @@ def _prove_claim_rpc( max_depth: int | None = None, save_directory: Path | None = None, max_iterations: int | None = None, + step_timeout: int | None = None, ) -> Proof: definition = self._kprove.definition @@ -90,7 +92,12 @@ def _prove_claim_rpc( prover = ImpliesProver(proof, kcfg_explore, assume_defined=assume_defined) else: assert type(proof) is APRProof - prover = APRProver(kcfg_explore, execute_depth=max_depth, assume_defined=assume_defined) + prover = APRProver( + kcfg_explore, + execute_depth=max_depth, + assume_defined=assume_defined, + step_timeout=step_timeout, + ) prover.advance_proof(proof, max_iterations=max_iterations) # type: ignore [arg-type] if proof.passed: From ff13d21fffd5f50ef9ad4409e5c751b3f07816f8 Mon Sep 17 00:00:00 2001 From: Stevengre Date: Tue, 9 Jun 2026 15:41:47 +0800 Subject: [PATCH 3/5] Interrupt in-flight Kore requests via cancel instead of socket shutdown The prior interrupt() shut down the client socket and reconnected. Empirically (legacy/haskell kore-rpc 0.1.145), that does NOT stop the backend: the request runs in a detached worker thread that the server only aborts on an explicit `cancel` JSON-RPC method, never on a dropped connection. So an interrupted step kept a core pinned at ~100% until it finished on its own, while we reconnected and retried a smaller step. Instead, inject a `cancel` request on the live connection. The server aborts the in-flight request, the awaiting thread receives a "Request cancelled" error, and the connection stays open and reusable (no reconnect). Measured: server CPU drops from ~100% to ~2% immediately after interrupt(). - Transport: replace no-arg interrupt() with send_interrupt(data) (raw out-of-band send on the live connection; default no-op; HTTP stays no-op, one conn per request) - SingleSocketTransport: sendall the cancel payload, no shutdown/reconnect - JsonRpcClient: build the `cancel` request and hand it to the transport --- pyk/src/pyk/kore/rpc.py | 57 ++++++++++++++++++++++------------------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/pyk/src/pyk/kore/rpc.py b/pyk/src/pyk/kore/rpc.py index 94de6401446..2d7057d1363 100644 --- a/pyk/src/pyk/kore/rpc.py +++ b/pyk/src/pyk/kore/rpc.py @@ -85,12 +85,14 @@ def __exit__(self, *args: Any) -> None: @abstractmethod def close(self) -> None: ... - def interrupt(self) -> None: - """Abort a request that is currently in flight on another thread. - - After `interrupt()` returns, a thread blocked in `request()` must raise promptly and - the transport must remain usable for subsequent requests. The default implementation - does nothing; transports backed by an interruptible connection should override it. + def send_interrupt(self, data: str) -> None: + """Inject `data` onto the live connection to abort an in-flight request. + + Used to deliver an out-of-band `cancel` message on a connection whose response is + currently being awaited by another thread, so the server aborts the in-flight request + and the awaiting thread receives a "cancelled" error shortly after. The connection is + left open and usable for subsequent requests. The default implementation does nothing; + transports backed by a persistent, multiplexable connection should override it. """ ... @@ -152,22 +154,13 @@ def close(self) -> None: self._file.close() self._sock.close() - def interrupt(self) -> None: - # Shutting down the socket unblocks a thread currently blocked in `readline`, causing - # its read to raise. We then reconnect so the transport stays usable for later requests. - # The old socket is closed; the old file object is left to be reclaimed by the garbage - # collector to avoid racing a `close()` against the unwinding reader thread. - old_sock = self._sock - try: - old_sock.shutdown(socket.SHUT_RDWR) - except OSError: - pass - self._sock = self._create_connection(self._host, self._port, self._timeout) - self._file = self._sock.makefile('r') - try: - old_sock.close() - except OSError: - pass + def send_interrupt(self, data: str) -> None: + # Inject `data` (a `cancel` request) onto the current socket without reading a response: + # the server's read loop picks it up concurrently and aborts the in-flight request, so the + # thread blocked in `_request`'s `readline` receives the server's "cancelled" error for that + # request. The connection is left open and reusable -- no reconnect. The leading newline just + # guarantees the injected value is separated from any preceding request on the byte stream. + self._sock.sendall(b'\n' + data.encode()) def _request(self, req: str) -> str: self._sock.sendall(req.encode()) @@ -351,7 +344,18 @@ def close(self) -> None: self._transport.close() def interrupt(self) -> None: - self._transport.interrupt() + # Send a `cancel` request on the live connection so the server aborts the in-flight request + # (the request currently being awaited by another thread). The cancel itself gets no response; + # the awaiting thread receives the server's "cancelled" error for the original request. We do + # not touch `_req_id` (it is owned by the requesting thread); the cancel id is purely for traceability. + cancel_id = f'{self._last_request_id}-cancel' if self._last_request_id is not None else 'cancel' + payload = { + 'jsonrpc': self._JSON_RPC_VERSION, + 'id': cancel_id, + 'method': 'cancel', + 'params': {}, + } + self._transport.send_interrupt(json.dumps(payload)) def request(self, method: str, **params: Any) -> dict[str, Any]: label = client_label.get() @@ -1054,9 +1058,10 @@ def last_request_id(self) -> str | None: def interrupt(self) -> None: """Abort an `execute`/`simplify`/… request currently in flight on another thread. - After this returns the interrupted call raises and the client stays usable. Only - effective for the single-socket transport; a no-op for transports that cannot abort - an in-flight request. + Sends a `cancel` request on the live connection so the server stops computing the + in-flight request; the interrupted call then raises a "cancelled" error and the + connection stays open and usable. Only effective for the single-socket transport; + a no-op for transports that cannot inject onto an in-flight request (e.g. HTTP). """ self._client.interrupt() From 5315f98e21420de0737bb900855db25fecba5cfc Mon Sep 17 00:00:00 2001 From: Stevengre Date: Tue, 9 Jun 2026 18:57:26 +0800 Subject: [PATCH 4/5] Add integration test for cancel-based request interrupt Asserts that KoreClient.interrupt() aborts an in-flight (non-terminating) execute promptly with a "Request cancelled" error rather than letting it run to completion, and that the connection survives the cancel and serves a subsequent request. Covers the interrupt mechanism advance_proof's step-timeout policy depends on. --- .../tests/integration/kore/test_interrupt.py | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 pyk/src/tests/integration/kore/test_interrupt.py diff --git a/pyk/src/tests/integration/kore/test_interrupt.py b/pyk/src/tests/integration/kore/test_interrupt.py new file mode 100644 index 00000000000..1a1b3d2e64f --- /dev/null +++ b/pyk/src/tests/integration/kore/test_interrupt.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import threading +import time +from string import Template +from typing import TYPE_CHECKING + +from pyk.kore.parser import KoreParser +from pyk.kore.rpc import DefaultError +from pyk.testing import KoreClientTest + +if TYPE_CHECKING: + from pyk.kore.rpc import KoreClient + from pyk.kore.syntax import Pattern + + +def term(n: int) -> Pattern: + template = Template( + r""" + Lbl'-LT-'generatedTop'-GT-'{}( + Lbl'-LT-'k'-GT-'{}( + kseq{}( + inj{SortInt{}, SortKItem{}}(\dv{SortInt{}}("$n")), + K:SortK{} + ) + ), + GCC:SortGeneratedCounterCell{} + ) + """ + ) + parser = KoreParser(template.substitute(n=n)) + pattern = parser.pattern() + assert parser.eof + return pattern + + +class TestInterrupt(KoreClientTest): + # The interrupt mechanism (cancel over the single-socket transport) is what `advance_proof`'s + # step-timeout policy relies on. `inc` never terminates, so an `execute` only ever returns by + # being interrupted -- which is exactly what this test asserts. + DISABLE_BOOSTER = True # exercise the legacy (pure haskell) kore-rpc server + + KOMPILE_DEFINITION = """ + module INTERRUPT-TEST + imports INT + rule [inc]: I:Int => I +Int 1 + endmodule + """ + KOMPILE_MAIN_MODULE = 'INTERRUPT-TEST' + KOMPILE_ARGS = {'syntax_module': 'INTERRUPT-TEST'} + + def test_interrupt_aborts_in_flight_request_and_keeps_connection(self, kore_client: KoreClient) -> None: + # Given: a non-terminating `execute` running on another thread. + box: dict = {} + + def run() -> None: + try: + kore_client.execute(term(0), max_depth=1_000_000_000) + except BaseException as e: # noqa: B036 - record whatever the interrupted call raises + box['exc'] = e + + thread = threading.Thread(target=run, daemon=True) + thread.start() + time.sleep(2.0) # let the step get well underway + assert thread.is_alive() # sanity: it is genuinely long-running, not terminating on its own + + # When: the in-flight request is interrupted. + kore_client.interrupt() + + # Then: the call is aborted promptly (rather than running ~1e9 steps to completion)... + thread.join(timeout=10.0) + assert not thread.is_alive(), 'execute() was not aborted by interrupt() within 10s' + exc = box.get('exc') + assert isinstance(exc, DefaultError) + assert exc.message == 'Request cancelled' + + # ...and the connection survives the cancel: a fresh request still succeeds on it. + result = kore_client.execute(term(0), max_depth=1) + assert result.depth == 1 From d35b594a485788014486bee97a4f76fbff5e4424 Mon Sep 17 00:00:00 2001 From: Stevengre Date: Thu, 11 Jun 2026 18:21:47 +0800 Subject: [PATCH 5/5] make the comments concise --- pyk/src/pyk/kore/rpc.py | 34 ++++++++++++++-------------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/pyk/src/pyk/kore/rpc.py b/pyk/src/pyk/kore/rpc.py index 2d7057d1363..099c86839f7 100644 --- a/pyk/src/pyk/kore/rpc.py +++ b/pyk/src/pyk/kore/rpc.py @@ -86,13 +86,11 @@ def __exit__(self, *args: Any) -> None: def close(self) -> None: ... def send_interrupt(self, data: str) -> None: - """Inject `data` onto the live connection to abort an in-flight request. + """Send `data` on the live connection without waiting for a reply. - Used to deliver an out-of-band `cancel` message on a connection whose response is - currently being awaited by another thread, so the server aborts the in-flight request - and the awaiting thread receives a "cancelled" error shortly after. The connection is - left open and usable for subsequent requests. The default implementation does nothing; - transports backed by a persistent, multiplexable connection should override it. + Used to deliver a `cancel` to a connection whose reply another thread is already + awaiting. Default: no-op. Override only for connections that can be written to + while a request is in flight. """ ... @@ -155,11 +153,9 @@ def close(self) -> None: self._sock.close() def send_interrupt(self, data: str) -> None: - # Inject `data` (a `cancel` request) onto the current socket without reading a response: - # the server's read loop picks it up concurrently and aborts the in-flight request, so the - # thread blocked in `_request`'s `readline` receives the server's "cancelled" error for that - # request. The connection is left open and reusable -- no reconnect. The leading newline just - # guarantees the injected value is separated from any preceding request on the byte stream. + # Write the cancel to the socket but don't read the reply: the thread already blocked in + # `_request`'s `readline` will read the server's "cancelled" reply. The socket stays open. + # The leading newline separates the cancel from the request bytes already on the stream. self._sock.sendall(b'\n' + data.encode()) def _request(self, req: str) -> str: @@ -344,10 +340,9 @@ def close(self) -> None: self._transport.close() def interrupt(self) -> None: - # Send a `cancel` request on the live connection so the server aborts the in-flight request - # (the request currently being awaited by another thread). The cancel itself gets no response; - # the awaiting thread receives the server's "cancelled" error for the original request. We do - # not touch `_req_id` (it is owned by the requesting thread); the cancel id is purely for traceability. + # Send a `cancel` so the server aborts the in-flight request. The cancel gets no reply of its + # own; the thread awaiting that request gets a "cancelled" error instead. The id is only for + # logs, so we derive it from the last request rather than touching the requester's `_req_id`. cancel_id = f'{self._last_request_id}-cancel' if self._last_request_id is not None else 'cancel' payload = { 'jsonrpc': self._JSON_RPC_VERSION, @@ -1056,12 +1051,11 @@ def last_request_id(self) -> str | None: return self._client.last_request_id def interrupt(self) -> None: - """Abort an `execute`/`simplify`/… request currently in flight on another thread. + """Abort an `execute`/`simplify`/… request running on another thread. - Sends a `cancel` request on the live connection so the server stops computing the - in-flight request; the interrupted call then raises a "cancelled" error and the - connection stays open and usable. Only effective for the single-socket transport; - a no-op for transports that cannot inject onto an in-flight request (e.g. HTTP). + Sends a `cancel` so the server stops computing; the interrupted call raises a + "cancelled" error and the connection stays usable. Works on the single-socket + transport only; a no-op for HTTP (one connection per request, nothing to cancel). """ self._client.interrupt()