From 5ffef619c9fe592878d5df2773d03d5acdb14a19 Mon Sep 17 00:00:00 2001 From: Nick Kleiner Date: Mon, 8 Jun 2026 09:06:09 -0400 Subject: [PATCH 1/2] Added status code functionality to PyAres --- PyAres/Demo/Analyzers/airship_analyzer.py | 20 ++ PyAres/Demo/{ => Analyzers}/analyzer_test.py | 32 ++- .../analyzer_test_tools_demo.py | 0 PyAres/Demo/{ => Analyzers}/analyzer_wiki.py | 0 PyAres/Demo/{ => Devices}/device_test.py | 4 +- .../failure_test_device.py} | 4 +- PyAres/Demo/{ => Devices}/hotplate.py | 8 +- .../{ => Devices}/random_number_device.py | 5 +- PyAres/Demo/{ => Devices}/rotary_mixer.py | 15 +- PyAres/Demo/Planners/airship_planner.py | 187 ++++++++++++++++++ PyAres/Demo/{ => Planners}/planner_test.py | 0 .../{ => Planners}/planner_test_tools_demo.py | 0 PyAres/Demo/{ => Planners}/planner_wiki.py | 0 PyAres/Device/__init__.py | 2 + PyAres/Device/device_models.py | 26 ++- PyAres/Device/device_service.py | 42 +++- PyAres/Planning/planner_models.py | 2 +- PyAres/Utils/device_status_code_utils.py | 19 ++ PyAres/__init__.py | 2 + 19 files changed, 329 insertions(+), 39 deletions(-) create mode 100644 PyAres/Demo/Analyzers/airship_analyzer.py rename PyAres/Demo/{ => Analyzers}/analyzer_test.py (50%) rename PyAres/Demo/{ => Analyzers}/analyzer_test_tools_demo.py (100%) rename PyAres/Demo/{ => Analyzers}/analyzer_wiki.py (100%) rename PyAres/Demo/{ => Devices}/device_test.py (91%) rename PyAres/Demo/{test_device.py => Devices/failure_test_device.py} (96%) rename PyAres/Demo/{ => Devices}/hotplate.py (90%) rename PyAres/Demo/{ => Devices}/random_number_device.py (91%) rename PyAres/Demo/{ => Devices}/rotary_mixer.py (70%) create mode 100644 PyAres/Demo/Planners/airship_planner.py rename PyAres/Demo/{ => Planners}/planner_test.py (100%) rename PyAres/Demo/{ => Planners}/planner_test_tools_demo.py (100%) rename PyAres/Demo/{ => Planners}/planner_wiki.py (100%) create mode 100644 PyAres/Utils/device_status_code_utils.py diff --git a/PyAres/Demo/Analyzers/airship_analyzer.py b/PyAres/Demo/Analyzers/airship_analyzer.py new file mode 100644 index 0000000..8e9be73 --- /dev/null +++ b/PyAres/Demo/Analyzers/airship_analyzer.py @@ -0,0 +1,20 @@ +from PyAres import AresAnalyzerService, AnalysisRequest, Analysis, AresDataType, Outcome + +def analysis_logic(request: AnalysisRequest): + result = int(request.inputs["ShotOutcome"]) + print(f"Analyzing, result is: {result}") + response = Analysis(result=result, outcome=Outcome.SUCCESS) + return response + +if __name__ == "__main__": + print("Welcome to the Airship Analyzer") + + analyzer = AresAnalyzerService( + custom_analysis_logic=analysis_logic, + description="An analyzer for use with the Airship Device", + name="Airship Analyzer", + version="1.0.0") + + analyzer.add_analysis_parameter("ShotOutcome", AresDataType.NUMBER) + + analyzer.start() diff --git a/PyAres/Demo/analyzer_test.py b/PyAres/Demo/Analyzers/analyzer_test.py similarity index 50% rename from PyAres/Demo/analyzer_test.py rename to PyAres/Demo/Analyzers/analyzer_test.py index 019be1b..4eafe1d 100644 --- a/PyAres/Demo/analyzer_test.py +++ b/PyAres/Demo/Analyzers/analyzer_test.py @@ -2,15 +2,26 @@ def analyze(request: AnalysisRequest) -> Analysis: #Custom Analysis Logic - temperature = request.inputs.get("Temperature") - - if not isinstance(temperature, float): - print("Temperature was not a float") - temperature = 0.0 - - print(f"Temperature: {temperature}") - - analysis = Analysis(result=temperature) + temp_one = request.inputs.get("Temperature One") + print("Processing Analysis Request") + + if not isinstance(temp_one, float): + print("Temperature One was not a float") + print(temp_one) + temp_one = 0.0 + + else: + print(f"Temperature One: {temp_one}") + + # if not isinstance(temp_two, float): + # print("Temperature Two was not a float") + # print(temp_two) + # temp_two = 0.0 + + # else: + # print(f"Temperature Two: {temp_two}") + + analysis = Analysis(result=temp_one) return analysis @@ -22,7 +33,8 @@ def analyze(request: AnalysisRequest) -> Analysis: pythonDemoAnalyzer = AresAnalyzerService(analyze, name, version, description) #Add Analysis Parameters - pythonDemoAnalyzer.add_analysis_parameter("Temperature", AresDataType.NUMBER) + # pythonDemoAnalyzer.add_analysis_parameter("Temperature Two", AresDataType.NUMBER) + pythonDemoAnalyzer.add_analysis_parameter("Temperature One", AresDataType.NUMBER) pythonDemoAnalyzer.add_setting(setting_name="", setting_type=AresDataType.NULL, optional=True, constraints=[]) pythonDemoAnalyzer.start(wait_for_termination=True) diff --git a/PyAres/Demo/analyzer_test_tools_demo.py b/PyAres/Demo/Analyzers/analyzer_test_tools_demo.py similarity index 100% rename from PyAres/Demo/analyzer_test_tools_demo.py rename to PyAres/Demo/Analyzers/analyzer_test_tools_demo.py diff --git a/PyAres/Demo/analyzer_wiki.py b/PyAres/Demo/Analyzers/analyzer_wiki.py similarity index 100% rename from PyAres/Demo/analyzer_wiki.py rename to PyAres/Demo/Analyzers/analyzer_wiki.py diff --git a/PyAres/Demo/device_test.py b/PyAres/Demo/Devices/device_test.py similarity index 91% rename from PyAres/Demo/device_test.py rename to PyAres/Demo/Devices/device_test.py index 1cd1b1f..e1b8206 100644 --- a/PyAres/Demo/device_test.py +++ b/PyAres/Demo/Devices/device_test.py @@ -11,10 +11,10 @@ def __init__(self): def set_temperature(self, temperature: float): self.temperature = temperature time.sleep(5) - return {} + return DeviceCommandResponse(None, status_code=StatusCode.COMMAND_SUCCESS) def get_temperature(self): - return { "temperature": self.temperature } + return DeviceCommandResponse(self.temperature, status_code=StatusCode.COMMAND_SUCCESS) def get_device_state(self) -> Dict: state_dict = { "temperature": self.temperature } diff --git a/PyAres/Demo/test_device.py b/PyAres/Demo/Devices/failure_test_device.py similarity index 96% rename from PyAres/Demo/test_device.py rename to PyAres/Demo/Devices/failure_test_device.py index 4a6b4e5..d369ec0 100644 --- a/PyAres/Demo/test_device.py +++ b/PyAres/Demo/Devices/failure_test_device.py @@ -3,7 +3,7 @@ from PyAres import AresDeviceService, AresDataType, DeviceCommandDescriptor, DeviceSchemaEntry -class TestDevice: +class FailureTestDevice: def fail(self): """Intentionally fails so command failure handling can be tested.""" print("[Test Device] Running intentionally failing command...") @@ -26,7 +26,7 @@ def safe_mode(self): if __name__ == "__main__": - test_device = TestDevice() + test_device = FailureTestDevice() service = AresDeviceService( test_device.safe_mode, diff --git a/PyAres/Demo/hotplate.py b/PyAres/Demo/Devices/hotplate.py similarity index 90% rename from PyAres/Demo/hotplate.py rename to PyAres/Demo/Devices/hotplate.py index 42901d2..3e662c0 100644 --- a/PyAres/Demo/hotplate.py +++ b/PyAres/Demo/Devices/hotplate.py @@ -1,4 +1,4 @@ -from PyAres import AresDeviceService, AresDataType, DeviceSchemaEntry, DeviceCommandDescriptor +from PyAres import * # --- PART 1: The Simulated Hardware --- class VirtualHotplate: @@ -8,14 +8,16 @@ def __init__(self): def set_temperature(self, temp: float): """Simulates setting the heater.""" print(f"[Hardware] Heating to {temp}°C...") + response = DeviceCommandResponse(None, status_code=StatusCode.COMMAND_SUCCESS) self.target_temp = temp - return {} # Return empty dict if no data needs to be sent back + return response def get_temperature(self): """Simulates reading the sensor.""" + response = DeviceCommandResponse({ "current_temp": self.target_temp }, status_code=StatusCode.COMMAND_SUCCESS) # In a real device, you'd read a serial port here. print("[Hardware] Retrieving the current temperature...") - return { "current_temp": self.target_temp } + return response def get_state(self): """Required: Tells ARES the current status for logging.""" diff --git a/PyAres/Demo/random_number_device.py b/PyAres/Demo/Devices/random_number_device.py similarity index 91% rename from PyAres/Demo/random_number_device.py rename to PyAres/Demo/Devices/random_number_device.py index d460026..7bbc53a 100644 --- a/PyAres/Demo/random_number_device.py +++ b/PyAres/Demo/Devices/random_number_device.py @@ -1,6 +1,6 @@ import random -from PyAres import AresDeviceService, AresDataType, DeviceSchemaEntry, DeviceCommandDescriptor +from PyAres import AresDeviceService, AresDataType, DeviceSchemaEntry, DeviceCommandDescriptor, DeviceCommandResponse, StatusCode # --- PART 1: The Simulated Hardware --- @@ -12,7 +12,8 @@ def generate_number(self): """Simulates reading a random value from hardware.""" self.last_number = random.randint(1, 100) print(f"[Hardware] Generated random number: {self.last_number}") - return {"random_number": self.last_number} + response = DeviceCommandResponse({"random_number": self.last_number}, status_code=StatusCode.COMMAND_SUCCESS) + return response def get_state(self): """Required: Tells ARES the current status for logging.""" diff --git a/PyAres/Demo/rotary_mixer.py b/PyAres/Demo/Devices/rotary_mixer.py similarity index 70% rename from PyAres/Demo/rotary_mixer.py rename to PyAres/Demo/Devices/rotary_mixer.py index 4f15dea..1d41342 100644 --- a/PyAres/Demo/rotary_mixer.py +++ b/PyAres/Demo/Devices/rotary_mixer.py @@ -1,4 +1,4 @@ -from PyAres import AresDeviceService, AresDataType, DeviceSchemaEntry, DeviceCommandDescriptor +from PyAres import * class RotaryMixer: def __init__(self, speed): @@ -11,12 +11,11 @@ def __init__(self, speed): def set_speed(rpm: float): print(f"Setting motor speed to {rpm}") mixer.speed = rpm - # Hardware communication goes here... - return {} # Return empty dict if no data needs to be sent back + return DeviceCommandResponse(None, status_code=StatusCode.COMMAND_SUCCESS) # Return empty dict if no data needs to be sent back def get_speed(): print("Hey I got speed") - return { "rpm": mixer.speed } + return DeviceCommandResponse(mixer.speed, status_code=StatusCode.COMMAND_SUCCESS) def get_status(): # Return a dictionary matching your state schema @@ -37,14 +36,10 @@ def safe_mode(): # 3. Define the 'Set Speed' Command # Input: One number (Speed) -input_schema = { - "rpm": DeviceSchemaEntry(AresDataType.NUMBER, "Speed in RPM", "RPM") -} +input_schema = { "rpm": DeviceSchemaEntry(AresDataType.NUMBER, "Speed in RPM", "RPM") } cmd_descriptor = DeviceCommandDescriptor("Set Speed", "Sets mixer speed", input_schema, {}) -output_schema = { - "rpm": DeviceSchemaEntry(AresDataType.NUMBER, "Speed in RPM", "RPM") -} +output_schema = { "rpm": DeviceSchemaEntry(AresDataType.NUMBER, "Speed in RPM", "RPM") } get_speed_descriptor = DeviceCommandDescriptor("Get Speed", "Gets mixer speed", {}, output_schema) # 4. Register the command diff --git a/PyAres/Demo/Planners/airship_planner.py b/PyAres/Demo/Planners/airship_planner.py new file mode 100644 index 0000000..f4016cb --- /dev/null +++ b/PyAres/Demo/Planners/airship_planner.py @@ -0,0 +1,187 @@ +from PyAres import AresPlannerService, PlanRequest, PlanResponse, AresDataType +import random +import time +from enum import Enum + +class Planners(Enum): + RANDOM_PLANNER = 1 + TRADITIONAL_PLANNER = 2 + SEARCH_AND_DESTROY_PLANNER = 3 + +def coord_to_tuple(coord: str): + """Converts 'B3' to (1, 2).""" + return (ord(coord[0].upper()) - ord('A'), int(coord[1:]) - 1) + +def tuple_to_coord(tup: tuple): + """Converts (1, 2) to 'B3'.""" + return f"{chr(ord('A') + tup[0])}{tup[1] + 1}" + +def generate_all_coords(board_size=10): + """Generates all coordinates from 'A1' to 'J10'.""" + return [tuple_to_coord((r, c)) for r in range(board_size) for c in range(board_size)] + +def convert_param_history(param_history: list): + converted_history = [] + for value in param_history: + letter = value.planned_value[0] + converted_value = ord(letter) + 1 - ord('A') + converted_history.append((converted_value, int(value.planned_value[1:]))) + return converted_history + +def get_random_shot(param_history: list): + """ Chooses a random, un-shot-at coordinate. """ + converted_param_history = convert_param_history(param_history) + all_possible_shots = [(c, r) for r in range(1,11) for c in range(1, 11)] + available_shots = list(set(all_possible_shots) - set(converted_param_history)) + print(f"{len(available_shots)}/{len(all_possible_shots)}") + + if not available_shots: + return None + + return random.choice(available_shots) + +def get_traditional_search_shot(param_history: list): + """ Uses an extremely basic traditional search algorithm, moving across the board """ + shot_number = len(param_history) + all_possible_shots = [(c, r) for r in range(1,11) for c in range(1, 11)] + return all_possible_shots[shot_number] + +def get_search_and_destroy_shot(request: PlanRequest): + """A 'Hunt/Target' planner for Airship.""" + param = request.parameters[0] + shot_history = [p.planned_value for p in param.param_history] + shot_results = request.analysis_results # 0.0=Miss, 1.0=Hit, 2.0=Sunk + active_hits = [] + + # --- Identify All Active Hits --- + for i in range(len(shot_history)): + coord = shot_history[i] + result = shot_results[i] + + is_sunk = False + for j in range(i, len(shot_history)): + if shot_results[j] == 2.0 and shot_history[j] == coord: + is_sunk = True + active_hits = [] + break + + if result == 1.0 and not is_sunk: + active_hits.append(coord) + + # Get Unique active hits and sort them for consistency + active_hits = sorted(list(set(active_hits))) + next_shot = None + + # --- TARGET MODE --- + if active_hits: + print(f"Target Mode -> Active Hits: {active_hits}") + + is_horizontal = False + is_vertical = False + + if len(active_hits) >= 2: + #Convert the first two active hits to (row, col) tuples + r1, c1 = coord_to_tuple(active_hits[0]) + r2, c2 = coord_to_tuple(active_hits[1]) + + if r1 == r2: + is_horizontal = True + + elif c1 == c2: + is_vertical = True + + potential_targets = [] + + if is_horizontal or is_vertical: + print(f"Orientation Known! {'Horizontal' if is_horizontal else 'Vertical'}") + hit_tuples = [coord_to_tuple(c) for c in active_hits] + + if is_horizontal: + min_col = min(c for r, c in hit_tuples) + max_col = max(c for r, c in hit_tuples) + row = hit_tuples[0][0] + + potential_targets.append((row, min_col - 1)) + potential_targets.append((row, max_col + 1)) + + elif is_vertical: + min_row = min(r for r, c in hit_tuples) + max_row = max(r for r, c in hit_tuples) + col = hit_tuples[0][1] + + potential_targets.append((min_row - 1, col)) + potential_targets.append((max_row + 1, col)) + + else: + active_hit_coord = active_hits[0] + row, col = coord_to_tuple(active_hit_coord) + potential_targets.extend([(row - 1, col), (row + 1, col), (row, col - 1), (row, col + 1)]) + + valid_targets = [] + for r, c in potential_targets: + if 0 <= r < 10 and 0 <= c < 10: + coord = tuple_to_coord((r,c)) + if coord not in shot_history: + valid_targets.append(coord) + + if valid_targets: + next_shot = random.choice(valid_targets) + print(f"Targeting next logical square: {next_shot}") + + # --- HUNT MODE --- + if next_shot is None: + if active_hits: + print("Target mode exhausted (likely cornered the ship). Returning to Hunt Mode... ") + else: + print("Hunt Mode -> Searching for a new target.... ") + + available_shots = list(set(generate_all_coords()) - set(shot_history)) + hunt_candidates = [c for c in available_shots if (coord_to_tuple(c)[0] + coord_to_tuple(c)[1] % 2 == 0)] + + if hunt_candidates: + next_shot = random.choice(hunt_candidates) + + elif available_shots: + next_shot = random.choice(available_shots) + + else: + #Game Over + next_shot = "A1" + + print(f"Planner requesting fire at: {next_shot}") + time.sleep(0.25) + return PlanResponse(parameter_names=[param.name], parameter_values=[next_shot]) + +def plan(request: PlanRequest) -> PlanResponse: + #For an "Airship" game we should only ever have one parameter, which is our coordinate + param = request.parameters[0] + #Get next shot + if(param.planner_name == Planners.RANDOM_PLANNER.name): + shot = get_random_shot(param.param_history) + elif(param.planner_name == Planners.TRADITIONAL_PLANNER.name): + shot = get_traditional_search_shot(param.param_history) + else: + response = get_search_and_destroy_shot(request) + return response + + time.sleep(0.25) + letter = chr(ord('A') - 1 + shot[0]) + shot_string = f"{letter}{shot[1]}" + print(f"Requesting Fire at {shot_string}") + + response = PlanResponse(parameter_names=[param.name], parameter_values=[shot_string]) + return response + +if __name__ == "__main__": + name = "Airship Planner Service" + description = "A planner service that provides some basic algorithms for playing Airship." + planner = AresPlannerService(plan, name, description, "1.0.0", port=8003) + + #Add Supported Types + planner.add_supported_type(AresDataType.STRING) + + planner.add_planner_option(Planners.RANDOM_PLANNER.name, "Randomly shoots at an un-shot-at coordinate", "1.0.0") + planner.add_planner_option(Planners.TRADITIONAL_PLANNER.name, "Follows a very basic traditional search pattern", "1.0.0") + planner.add_planner_option(Planners.SEARCH_AND_DESTROY_PLANNER.name, "Searches for ships with random shots, destroys ships upon locating them.", "1.0.0") + + planner.start() \ No newline at end of file diff --git a/PyAres/Demo/planner_test.py b/PyAres/Demo/Planners/planner_test.py similarity index 100% rename from PyAres/Demo/planner_test.py rename to PyAres/Demo/Planners/planner_test.py diff --git a/PyAres/Demo/planner_test_tools_demo.py b/PyAres/Demo/Planners/planner_test_tools_demo.py similarity index 100% rename from PyAres/Demo/planner_test_tools_demo.py rename to PyAres/Demo/Planners/planner_test_tools_demo.py diff --git a/PyAres/Demo/planner_wiki.py b/PyAres/Demo/Planners/planner_wiki.py similarity index 100% rename from PyAres/Demo/planner_wiki.py rename to PyAres/Demo/Planners/planner_wiki.py diff --git a/PyAres/Device/__init__.py b/PyAres/Device/__init__.py index 707f209..dc4fd30 100644 --- a/PyAres/Device/__init__.py +++ b/PyAres/Device/__init__.py @@ -1,3 +1,5 @@ from .device_models import DeviceCommandDescriptor from .device_models import DeviceSchemaEntry +from .device_models import StatusCode +from .device_models import DeviceCommandResponse from .device_service import AresDeviceService \ No newline at end of file diff --git a/PyAres/Device/device_models.py b/PyAres/Device/device_models.py index 370b788..b1ee00a 100644 --- a/PyAres/Device/device_models.py +++ b/PyAres/Device/device_models.py @@ -1,10 +1,15 @@ -from typing import Dict, Union, Optional +from enum import Enum +from dataclasses import dataclass, field +from typing import Dict, Union, Optional, Any from ..Models import ares_data_models class DeviceSchemaEntry: """ A class that describes an input or output parameter for a device command """ - def __init__(self, type: ares_data_models.AresDataType, description: str = "", unit: str = "", optional: bool = False, + def __init__(self, type: ares_data_models.AresDataType, + description: str = "", + unit: str = "", + optional: bool = False, constraints: Union[list[int], list[float], list[str]] = [], quantity_schema: Optional[ares_data_models.QuantitySchema] = None, struct_schema: Optional[Dict[str, 'DeviceSchemaEntry']] = None, @@ -54,3 +59,20 @@ def __init__(self, name: str, description: str, input_schema: Dict[str, DeviceSc self.description = description self.input_schema = input_schema self.output_schema = output_schema + +class StatusCode(Enum): + STATUS_UNSPECIFIED = 0 + COMMAND_SUCCESS = 1 + SUCCESS_WITH_WARNINGS = 2 + COMMAND_FAILED = 3 + INVALID_COMMAND = 4 + HARDWARE_FAULT = 5 + EMERGENCY_STOP = 6 + OUT_OF_RANGE = 7 + PARAMETERS_UNACHIEVEABLE = 8 + +@dataclass +class DeviceCommandResponse: + response: Union[Dict[str, Any], Any] + error_string: str = "" + status_code: StatusCode = StatusCode.STATUS_UNSPECIFIED diff --git a/PyAres/Device/device_service.py b/PyAres/Device/device_service.py index ac6ace1..a6bb873 100644 --- a/PyAres/Device/device_service.py +++ b/PyAres/Device/device_service.py @@ -14,16 +14,18 @@ from ares_datamodel import ares_struct_pb2 from google.protobuf import empty_pb2 -from .device_models import DeviceCommandDescriptor +from .device_models import DeviceCommandDescriptor, DeviceCommandResponse, StatusCode from ..Utils import ares_device_command_utils from ..Utils import ares_data_schema_utils from ..Utils import ares_struct_utils from ..Utils import ares_value_utils from ..Utils import ares_data_type_utils +from ..Utils import device_status_code_utils # Type hint for the user's custom methods EnterSafeModeMethod = Callable[[], None] -DeviceCommandMethod = Callable[..., Dict[str, Any]] +AllowedReturns = Union[DeviceCommandResponse, Dict[str, Any], Any] +DeviceCommandMethod = Callable[..., AllowedReturns] DeviceStateMethod = Callable[[], Dict[str, Any]] class AresDeviceServiceWrapper(device_service_grpc.AresRemoteDeviceServiceServicer): @@ -85,18 +87,44 @@ def ExecuteCommand(self, request: device_service.ExecuteCommandRequest, context) provided_param_dict = ares_struct_utils.ares_struct_to_dict(request.arguments) try: result : Dict[str, Any] = method(**provided_param_dict) + except Exception as e: response.success = False response.error = f"Command '{request.command_name}' failed: {e}" return response - if isinstance(result, dict): - for key, value in result.items(): + # Modern devices should respond with a device command response + if isinstance(result, DeviceCommandResponse): + response.status_code = device_status_code_utils.python_status_code_to_proto_status_code(result.status_code) + response.success = device_status_code_utils.determine_success(result.status_code) + + if isinstance(result.response, dict): + for key, value in result.response.items(): ares_struct_utils.add_value_to_struct(response.result.struct_value, key, ares_value_utils.create_ares_value(value)) - else: - response.result.CopyFrom(ares_value_utils.create_ares_value(result)) - response.success = True + else: + response.result.CopyFrom(ares_value_utils.create_ares_value(result)) + + # Legacy device responses will only send back the value as the response, ensure backwards compatability + else: + # Keep a backup of the original formatting function + formatwarning_orig = warnings.formatwarning + + # Override it to force the source code line to be empty + warnings.formatwarning = lambda message, category, filename, lineno, line=None: \ + formatwarning_orig(message, category, filename, lineno, line='') + + warnings.warn("Returning raw values or dictionaries directly for device commands is deprecated. The new standard is to return a DeviceCommandResponse object instead. Please consider updating your device to use this standard.", FutureWarning) + + if isinstance(result, dict): + for key, value in result.items(): + ares_struct_utils.add_value_to_struct(response.result.struct_value, key, ares_value_utils.create_ares_value(value)) + + else: + response.result.CopyFrom(ares_value_utils.create_ares_value(result)) + + response.success = True + return response else: diff --git a/PyAres/Planning/planner_models.py b/PyAres/Planning/planner_models.py index e94f143..0dc13f5 100644 --- a/PyAres/Planning/planner_models.py +++ b/PyAres/Planning/planner_models.py @@ -16,7 +16,7 @@ def __init__(self, planned_value: Any, achieved_value: Any): def __str__(self): return (f"ParameterHistoryItem object with:\n" f" planned_value: {self.planned_value}\n" - f" acheived_value: {self.achieved_value}\n") + f" achieved_value: {self.achieved_value}\n") def __repr__(self) -> str: return self.__str__() diff --git a/PyAres/Utils/device_status_code_utils.py b/PyAres/Utils/device_status_code_utils.py new file mode 100644 index 0000000..1d268e9 --- /dev/null +++ b/PyAres/Utils/device_status_code_utils.py @@ -0,0 +1,19 @@ +from ..Device import StatusCode +from ares_datamodel import command_status_code_pb2 +from typing import cast + +def python_status_code_to_proto_status_code(py_value: StatusCode) -> command_status_code_pb2.CommandStatusCode: + """ A method to convert from the python AresDataType class to the protobuf version """ + val = cast( command_status_code_pb2.CommandStatusCode, py_value.value) + return val + +def proto_status_code_to_python_status_code(proto_value: command_status_code_pb2.CommandStatusCode) -> StatusCode: + """ A method to convert from the protobuf AresDataType class to the python version """ + return StatusCode(proto_value) + +def determine_success(code: StatusCode) -> bool: + if code == StatusCode.COMMAND_SUCCESS or code == StatusCode.SUCCESS_WITH_WARNGINGS: + return True + + else: + return False \ No newline at end of file diff --git a/PyAres/__init__.py b/PyAres/__init__.py index 588c9e0..eea2b6d 100644 --- a/PyAres/__init__.py +++ b/PyAres/__init__.py @@ -10,6 +10,8 @@ from .Device import AresDeviceService from .Device import DeviceCommandDescriptor from .Device import DeviceSchemaEntry +from .Device import DeviceCommandResponse +from .Device import StatusCode from .Models import AresDataType from .Models import Outcome from .Models import AresSchemaEntry From ca159aa24539e3fb85a5beee1d71f96f40654c5f Mon Sep 17 00:00:00 2001 From: Nick Kleiner Date: Wed, 10 Jun 2026 14:21:43 -0400 Subject: [PATCH 2/2] Fixed a an error where we provided the full device response instead of the value to the ares value util method --- PyAres/Device/device_service.py | 2 +- PyAres/Utils/device_status_code_utils.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/PyAres/Device/device_service.py b/PyAres/Device/device_service.py index a6bb873..a0ed106 100644 --- a/PyAres/Device/device_service.py +++ b/PyAres/Device/device_service.py @@ -103,7 +103,7 @@ def ExecuteCommand(self, request: device_service.ExecuteCommandRequest, context) ares_struct_utils.add_value_to_struct(response.result.struct_value, key, ares_value_utils.create_ares_value(value)) else: - response.result.CopyFrom(ares_value_utils.create_ares_value(result)) + response.result.CopyFrom(ares_value_utils.create_ares_value(result.response)) # Legacy device responses will only send back the value as the response, ensure backwards compatability else: diff --git a/PyAres/Utils/device_status_code_utils.py b/PyAres/Utils/device_status_code_utils.py index 1d268e9..fbcde35 100644 --- a/PyAres/Utils/device_status_code_utils.py +++ b/PyAres/Utils/device_status_code_utils.py @@ -3,12 +3,12 @@ from typing import cast def python_status_code_to_proto_status_code(py_value: StatusCode) -> command_status_code_pb2.CommandStatusCode: - """ A method to convert from the python AresDataType class to the protobuf version """ + """ A method to convert from the python StatusCode enum class to the protobuf version """ val = cast( command_status_code_pb2.CommandStatusCode, py_value.value) return val def proto_status_code_to_python_status_code(proto_value: command_status_code_pb2.CommandStatusCode) -> StatusCode: - """ A method to convert from the protobuf AresDataType class to the python version """ + """ A method to convert from the protobuf StatusCode enum class to the python version """ return StatusCode(proto_value) def determine_success(code: StatusCode) -> bool: