diff --git a/src/utr_usb_inventory_with_output_power.py b/src/utr_usb_inventory_with_output_power.py index 3018961..be619c3 100644 --- a/src/utr_usb_inventory_with_output_power.py +++ b/src/utr_usb_inventory_with_output_power.py @@ -26,7 +26,7 @@ import re import sys import time -from typing import Optional +from typing import Any, Optional import serial from serial.tools import list_ports @@ -167,9 +167,28 @@ def _set_command_mode(ser: serial.Serial) -> None: raise RuntimeError("コマンドモード切替に失敗しました。") +def _create_output_power_restore_state() -> dict[str, Any]: + """送信出力一時変更の復元状態を保持します。 + + plans: + 変更用フレームと復元用フレームのセットです。 + restore_required: + 変更送信を試みたため、異常終了時に復元対象とみなす状態です。 + ACK後の読み戻しで例外が発生しても復元できるようにします。 + restore_done: + finallyで二重復元しないためのフラグです。 + """ + return { + "plans": None, + "restore_required": False, + "restore_done": False, + } + + def _offer_temporary_output_power_change( ser: serial.Serial, identified_model_key: str | None, + restore_state: dict[str, Any] | None = None, ) -> OutputPowerTemporaryChangePlans | None: """Inventory前に送信出力一時変更を提案し、変更した場合は復元計画を返します。""" current_setting = _read_current_output_power_setting(ser) @@ -204,6 +223,10 @@ def _offer_temporary_output_power_change( print("送信出力は変更しません。") return None + if restore_state is not None: + restore_state["plans"] = plans + restore_state["restore_required"] = True + _send_output_power_frame(ser, plans.change_plan.command_frame, "送信出力の一時変更") _read_current_output_power_setting(ser) return plans @@ -212,19 +235,31 @@ def _offer_temporary_output_power_change( def _restore_temporary_output_power( ser: serial.Serial, plans: OutputPowerTemporaryChangePlans | None, + restore_state: dict[str, Any] | None = None, ) -> None: """一時変更した送信出力を元に戻します。""" - if plans is None: + effective_plans = plans + if restore_state is not None: + if restore_state.get("restore_done"): + return + if not restore_state.get("restore_required"): + return + effective_plans = effective_plans or restore_state.get("plans") + + if effective_plans is None: return try: - _send_output_power_frame(ser, plans.restore_plan.command_frame, "送信出力の復元") + _send_output_power_frame(ser, effective_plans.restore_plan.command_frame, "送信出力の復元") _read_current_output_power_setting(ser) except Exception as exc: print("") print("送信出力の復元に失敗しました。") print("UTRRWManagerまたは再実行時の読み取りで、機器側の現在設定を確認してください。") print(f"復元エラー: {exc}") + finally: + if restore_state is not None: + restore_state["restore_done"] = True def _read_inventory_parameters(ser: serial.Serial) -> None: @@ -384,6 +419,7 @@ def main() -> None: ser: serial.Serial | None = None antenna_selection: Optional[InventoryAntennaSelection] = None output_power_plans: OutputPowerTemporaryChangePlans | None = None + output_power_restore_state = _create_output_power_restore_state() total_iterations = 0 total_read_time = 0.0 total_read_count = 0 @@ -405,7 +441,11 @@ def main() -> None: rom_info, identified_model_key = _verify_usb_and_read_model_key(ser) _set_command_mode(ser) - output_power_plans = _offer_temporary_output_power_change(ser, identified_model_key) + output_power_plans = _offer_temporary_output_power_change( + ser, + identified_model_key, + restore_state=output_power_restore_state, + ) read_and_print_pre_inventory_reader_settings(ser) _read_inventory_parameters(ser) @@ -427,7 +467,11 @@ def main() -> None: print(f"エラー内容: {exc}") finally: if ser is not None: - _restore_temporary_output_power(ser, output_power_plans) + _restore_temporary_output_power( + ser, + output_power_plans, + restore_state=output_power_restore_state, + ) finish_inventory_session( ser, antenna_selection,