Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 49 additions & 5 deletions src/utr_usb_inventory_with_output_power.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import re
import sys
import time
from typing import Optional
from typing import Any, Optional

import serial
from serial.tools import list_ports
Expand Down Expand Up @@ -167,9 +167,28 @@ def _set_command_mode(ser: serial.Serial) -> None:
raise RuntimeError("コマンドモード切替に失敗しました。")


def _create_output_power_restore_state() -> dict[str, Any]:
"""送信出力一時変更の復元状態を保持します。

plans:
変更用フレームと復元用フレームのセットです。
restore_required:
変更送信を試みたため、異常終了時に復元対象とみなす状態です。
ACK後の読み戻しで例外が発生しても復元できるようにします。
restore_done:
finallyで二重復元しないためのフラグです。
"""
return {
"plans": None,
"restore_required": False,
"restore_done": False,
}


def _offer_temporary_output_power_change(
ser: serial.Serial,
identified_model_key: str | None,
restore_state: dict[str, Any] | None = None,
) -> OutputPowerTemporaryChangePlans | None:
"""Inventory前に送信出力一時変更を提案し、変更した場合は復元計画を返します。"""
current_setting = _read_current_output_power_setting(ser)
Expand Down Expand Up @@ -204,6 +223,10 @@ def _offer_temporary_output_power_change(
print("送信出力は変更しません。")
return None

if restore_state is not None:
restore_state["plans"] = plans
restore_state["restore_required"] = True

_send_output_power_frame(ser, plans.change_plan.command_frame, "送信出力の一時変更")
_read_current_output_power_setting(ser)
return plans
Expand All @@ -212,19 +235,31 @@ def _offer_temporary_output_power_change(
def _restore_temporary_output_power(
ser: serial.Serial,
plans: OutputPowerTemporaryChangePlans | None,
restore_state: dict[str, Any] | None = None,
) -> None:
"""一時変更した送信出力を元に戻します。"""
if plans is None:
effective_plans = plans
if restore_state is not None:
if restore_state.get("restore_done"):
return
if not restore_state.get("restore_required"):
return
effective_plans = effective_plans or restore_state.get("plans")

if effective_plans is None:
return

try:
_send_output_power_frame(ser, plans.restore_plan.command_frame, "送信出力の復元")
_send_output_power_frame(ser, effective_plans.restore_plan.command_frame, "送信出力の復元")
_read_current_output_power_setting(ser)
except Exception as exc:
print("")
print("送信出力の復元に失敗しました。")
print("UTRRWManagerまたは再実行時の読み取りで、機器側の現在設定を確認してください。")
print(f"復元エラー: {exc}")
finally:
if restore_state is not None:
restore_state["restore_done"] = True


def _read_inventory_parameters(ser: serial.Serial) -> None:
Expand Down Expand Up @@ -384,6 +419,7 @@ def main() -> None:
ser: serial.Serial | None = None
antenna_selection: Optional[InventoryAntennaSelection] = None
output_power_plans: OutputPowerTemporaryChangePlans | None = None
output_power_restore_state = _create_output_power_restore_state()
total_iterations = 0
total_read_time = 0.0
total_read_count = 0
Expand All @@ -405,7 +441,11 @@ def main() -> None:

rom_info, identified_model_key = _verify_usb_and_read_model_key(ser)
_set_command_mode(ser)
output_power_plans = _offer_temporary_output_power_change(ser, identified_model_key)
output_power_plans = _offer_temporary_output_power_change(
ser,
identified_model_key,
restore_state=output_power_restore_state,
)

read_and_print_pre_inventory_reader_settings(ser)
_read_inventory_parameters(ser)
Expand All @@ -427,7 +467,11 @@ def main() -> None:
print(f"エラー内容: {exc}")
finally:
if ser is not None:
_restore_temporary_output_power(ser, output_power_plans)
_restore_temporary_output_power(
ser,
output_power_plans,
restore_state=output_power_restore_state,
)
finish_inventory_session(
ser,
antenna_selection,
Expand Down