Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pyk/src/pyk/cterm/symbolic.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ def kast_to_kore(self, kinner: KInner) -> Pattern:
def kore_to_kast(self, pattern: Pattern) -> KInner:
return kore_to_kast(self._definition, pattern)

def interrupt(self) -> None:
"""Abort a backend request currently in flight on another thread; see `KoreClient.interrupt`."""
self._kore_client.interrupt()

def execute(
self,
cterm: CTerm,
Expand Down
4 changes: 4 additions & 0 deletions pyk/src/pyk/kcfg/explore.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ def _pretty_printer(self) -> PrettyPrinter:
def pretty_print(self, kinner: KInner) -> str:
return self._pretty_printer.print(kinner)

def interrupt(self) -> None:
"""Abort a backend request currently in flight on another thread; see `KoreClient.interrupt`."""
self.cterm_symbolic.interrupt()

def _extract_rule_labels(self, _logs: tuple[LogEntry, ...]) -> list[str]:
_rule_lines = []
for node_log in _logs:
Expand Down
46 changes: 46 additions & 0 deletions pyk/src/pyk/kore/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ def __exit__(self, *args: Any) -> None:
@abstractmethod
def close(self) -> None: ...

def interrupt(self) -> None:
"""Abort a request that is currently in flight on another thread.

After `interrupt()` returns, a thread blocked in `request()` must raise promptly and
the transport must remain usable for subsequent requests. The default implementation
does nothing; transports backed by an interruptible connection should override it.
"""
...

@abstractmethod
def _request(self, req: str) -> str: ...

Expand All @@ -92,6 +101,7 @@ class TransportType(Enum):
class SingleSocketTransport(Transport):
_host: str
_port: int
_timeout: int | None
_sock: socket.socket
_file: IO[str]

Expand All @@ -104,6 +114,7 @@ def __init__(
):
self._host = host
self._port = port
self._timeout = timeout
self._sock = self._create_connection(host, port, timeout)
self._file = self._sock.makefile('r')

Expand Down Expand Up @@ -131,6 +142,23 @@ def close(self) -> None:
self._file.close()
self._sock.close()

def interrupt(self) -> None:
# Shutting down the socket unblocks a thread currently blocked in `readline`, causing
# its read to raise. We then reconnect so the transport stays usable for later requests.
# The old socket is closed; the old file object is left to be reclaimed by the garbage
# collector to avoid racing a `close()` against the unwinding reader thread.
old_sock = self._sock
try:
old_sock.shutdown(socket.SHUT_RDWR)
except OSError:
pass
self._sock = self._create_connection(self._host, self._port, self._timeout)
self._file = self._sock.makefile('r')
try:
old_sock.close()
except OSError:
pass

def _request(self, req: str) -> str:
self._sock.sendall(req.encode())
server_addr = self._description()
Expand Down Expand Up @@ -235,6 +263,12 @@ def close(self) -> None:
for client in clients:
client.close()

def interrupt(self) -> None:
self._default_client.interrupt()
for clients in self._clients.values():
for client in clients:
client.interrupt()

def request(self, method: str, **params: Any) -> dict[str, Any]:
if method in self._clients:
for client in self._clients[method]:
Expand Down Expand Up @@ -289,6 +323,9 @@ def __exit__(self, *args: Any) -> None:
def close(self) -> None:
self._transport.close()

def interrupt(self) -> None:
self._transport.interrupt()

def request(self, method: str, **params: Any) -> dict[str, Any]:
req_id = f'{id(self)}-{self._req_id:03}'
self._req_id += 1
Expand Down Expand Up @@ -918,6 +955,15 @@ def __exit__(self, *args: Any) -> None:
def close(self) -> None:
self._client.close()

def interrupt(self) -> None:
"""Abort an `execute`/`simplify`/… request currently in flight on another thread.

After this returns the interrupted call raises and the client stays usable. Only
effective for the single-socket transport; a no-op for transports that cannot abort
an in-flight request.
"""
self._client.interrupt()

def _request(self, method: str, **params: Any) -> dict[str, Any]:
try:
return self._client.request(method, **params)
Expand Down
112 changes: 92 additions & 20 deletions pyk/src/pyk/proof/proof.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import json
import logging
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor, wait
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeoutError
from concurrent.futures import wait
from dataclasses import dataclass
from enum import Enum
from itertools import chain
Expand Down Expand Up @@ -499,13 +501,38 @@ def init_proof(self, proof: P) -> None:
"""
...

def get_step_depth(self) -> int | None:
"""Return the current per-step exploration depth, if this prover exposes a tunable one.

Returning `None` (the default) opts the prover out of the progressive depth-halving
policy in `advance_proof`. Provers with a tunable execution depth (e.g. `APRProver`)
should override this together with `set_step_depth`.
"""
return None

def set_step_depth(self, depth: int) -> None:
"""Set the per-step exploration depth. No-op by default; see `get_step_depth`."""
...

def interrupt(self) -> None:
"""Abort a `step_proof` call currently running on another thread, as quickly as possible.

Used by the progressive depth-halving policy in `advance_proof` to abandon a step that
has exhausted its stall window. After this returns, a thread blocked in `step_proof` must
raise promptly and the prover must remain usable for subsequent steps. The default
implementation does nothing; provers backed by an interruptible resource (e.g. a Kore RPC
connection) should override it.
"""
...

def advance_proof(
self,
proof: P,
max_iterations: int | None = None,
fail_fast: bool = False,
callback: Callable[[P], None] = (lambda x: None),
maintenance_rate: int = 1,
per_depth_timeout: float | None = None,
) -> None:
"""Advance a proof.

Expand All @@ -518,29 +545,74 @@ def advance_proof(
halt execution even if there are still available steps.
callback: Callable to run in between each completed step, useful for getting real-time information about the proof.
maintenance_rate: Number of iterations between proof maintenance (writing to disk and executing callback).
per_depth_timeout (optional): Enables progressive depth halving when set to a positive value.
Each step is given a stall window of `current_depth * per_depth_timeout` seconds (where
`current_depth` is the prover's `get_step_depth()`) to commit its result. If a step does not
finish within its window, it is interrupted, the step depth is halved (down to a floor of 1),
and the step is retried at the shallower depth. Has no effect for provers whose
`get_step_depth()` returns `None`.
"""
iterations = 0
_LOGGER.info(f'Initializing proof: {proof.id}')
self.init_proof(proof)
while True:
steps = list(proof.get_steps())
_LOGGER.info(f'Found {len(steps)} next steps for proof: {proof.id}')
if len(steps) == 0:
break
for step in steps:
if fail_fast and proof.failed:
_LOGGER.warning(f'Terminating proof early because fail_fast is set: {proof.id}')
proof.failure_info = self.failure_info(proof)
return
if max_iterations is not None and max_iterations <= iterations:
return
iterations += 1
results = self.step_proof(step)
for result in results:
proof.commit(result)
if iterations % maintenance_rate == 0:
proof.write_proof_data()
callback(proof)

progressive = per_depth_timeout is not None and per_depth_timeout > 0 and self.get_step_depth() is not None
executor = ThreadPoolExecutor(max_workers=1) if progressive else None
try:
while True:
steps = list(proof.get_steps())
_LOGGER.info(f'Found {len(steps)} next steps for proof: {proof.id}')
if len(steps) == 0:
break
halved_depth = False
for step in steps:
if fail_fast and proof.failed:
_LOGGER.warning(f'Terminating proof early because fail_fast is set: {proof.id}')
proof.failure_info = self.failure_info(proof)
return
if max_iterations is not None and max_iterations <= iterations:
return
if progressive:
assert executor is not None
assert per_depth_timeout is not None
depth = self.get_step_depth()
assert depth is not None
window = max(depth, 1) * per_depth_timeout
future = executor.submit(self.step_proof, step)
try:
results = future.result(timeout=window)
except FuturesTimeoutError:
# The step exhausted its stall window: interrupt it, halve the depth,
# and re-fetch steps so the same node is retried at the shallower depth.
self.interrupt()
wait([future])
new_depth = max(1, depth // 2)
if new_depth >= depth:
_LOGGER.warning(
f'Proof {proof.id}: step exhausted {window}s stall window at minimum '
f'depth {depth}; stopping.'
)
return
_LOGGER.warning(
f'Proof {proof.id}: step exhausted {window}s stall window at depth {depth}; '
f'halving to {new_depth}.'
)
self.set_step_depth(new_depth)
halved_depth = True
break
else:
results = self.step_proof(step)
iterations += 1
for result in results:
proof.commit(result)
if iterations % maintenance_rate == 0:
proof.write_proof_data()
callback(proof)
if halved_depth:
continue
finally:
if executor is not None:
executor.shutdown(wait=False)

if proof.failed:
proof.failure_info = self.failure_info(proof)
9 changes: 9 additions & 0 deletions pyk/src/pyk/proof/reachability.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,15 @@ def __init__(
def close(self) -> None:
self.kcfg_explore.cterm_symbolic._kore_client.close()

def get_step_depth(self) -> int | None:
return self.execute_depth

def set_step_depth(self, depth: int) -> None:
self.execute_depth = depth

def interrupt(self) -> None:
self.kcfg_explore.interrupt()

def init_proof(self, proof: APRProof) -> None:
main_module_name = self.main_module_name
if self.extra_module:
Expand Down
Loading
Loading