Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions PyAres/Demo/hotplate.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,19 @@ def safe_mode(self):
service.add_new_command(set_cmd, my_hotplate.set_temperature)

# 4. Define Command: Get Temperature
# This schema tells ARES to expect a number back
# This schema tells ARES to expect a struct back
output_schema = {
"current_temp": DeviceSchemaEntry(AresDataType.NUMBER, "Current Temperature", "Celsius")
"output": DeviceSchemaEntry(
AresDataType.STRUCT,
"Current temperature output",
struct_schema={
"current_temp": DeviceSchemaEntry(
AresDataType.NUMBER,
"Current Temperature",
"Celsius"
)
}
)
}
get_cmd = DeviceCommandDescriptor(
"Get Temp",
Expand All @@ -69,4 +79,4 @@ def safe_mode(self):
# 5. Start the Service
# This will block and listen for ARES connections
print("Virtual Hotplate Service Running...")
service.start()
service.start()
69 changes: 69 additions & 0 deletions PyAres/Demo/random_number_device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import random

from PyAres import AresDeviceService, AresDataType, DeviceSchemaEntry, DeviceCommandDescriptor


# --- PART 1: The Simulated Hardware ---
class VirtualRandomNumberDevice:
def __init__(self):
self.last_number = 0

def generate_number(self):
"""Simulates reading a random value from hardware."""
self.last_number = random.randint(1, 100)
print(f"[Hardware] Generated random number: {self.last_number}")
return {"random_number": self.last_number}

def get_state(self):
"""Required: Tells ARES the current status for logging."""
return {"last_number": self.last_number}

def safe_mode(self):
"""Required: A safety fallback."""
print("[Hardware] SAFE MODE TRIGGERED: Random number device idle.")


# --- PART 2: The Ares Service Wrapper ---
if __name__ == "__main__":
# 1. Initialize the hardware
my_random_device = VirtualRandomNumberDevice()

# 2. Define the Service Info
service = AresDeviceService(
my_random_device.safe_mode,
my_random_device.get_state,
"My Virtual Random Number Device",
"A simulated device that generates random numbers",
"1.0.0",
port=7101
)

# 3. Define Command: Generate Number
# This schema tells ARES to expect a struct back
output_schema = {
"output": DeviceSchemaEntry(
AresDataType.STRUCT,
"Generated random number output",
struct_schema={
"random_number": DeviceSchemaEntry(
AresDataType.NUMBER,
"Random Number",
"1-100",
min_number_value=1,
max_number_value=100,
)
},
)
}
generate_cmd = DeviceCommandDescriptor(
"Generate Number",
"Generates a random number from 1 to 100",
{},
output_schema,
)
service.add_new_command(generate_cmd, my_random_device.generate_number)

# 4. Start the Service
# This will block and listen for ARES connections
print("Virtual Random Number Device Service Running...")
service.start()
66 changes: 66 additions & 0 deletions PyAres/Demo/test_device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import random

from PyAres import AresDeviceService, AresDataType, DeviceCommandDescriptor, DeviceSchemaEntry


class TestDevice:
def fail(self):
"""Intentionally fails so command failure handling can be tested."""
print("[Test Device] Running intentionally failing command...")
raise RuntimeError("Intentional test command failure")

def maybe_fail(self):
"""Fails half the time and otherwise returns 20."""
print("[Test Device] Running command with a 50% failure chance...")
if random.random() < 0.5:
raise RuntimeError("Random test command failure")

print("[Test Device] Command succeeded and returned 20.")
return {"number": 20}

def get_state(self):
return {}

def safe_mode(self):
print("[Test Device] Safe mode triggered.")


if __name__ == "__main__":
test_device = TestDevice()

service = AresDeviceService(
test_device.safe_mode,
test_device.get_state,
"Test Device",
"A device with commands for testing ARES failure handling",
"1.0.0",
port=7102,
)

fail_command = DeviceCommandDescriptor(
"Fail",
"Intentionally throws an exception",
{},
{},
)
service.add_new_command(fail_command, test_device.fail)

maybe_fail_output_schema = {
"output": DeviceSchemaEntry(
AresDataType.STRUCT,
"Successful command output",
struct_schema={
"number": DeviceSchemaEntry(AresDataType.NUMBER, "Returned Number")
},
)
}
maybe_fail_command = DeviceCommandDescriptor(
"Maybe Fail",
"Has a 50% chance of failing; otherwise returns 20",
{},
maybe_fail_output_schema,
)
service.add_new_command(maybe_fail_command, test_device.maybe_fail)

print("Test Device Service Running...")
service.start()
7 changes: 6 additions & 1 deletion PyAres/Device/device_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ def ExecuteCommand(self, request: device_service.ExecuteCommandRequest, context)

#Convert the protobuf map to a Python dictionary
provided_param_dict = ares_struct_utils.ares_struct_to_dict(request.arguments)
result : Dict[str, Any] = method(**provided_param_dict)
try:
result : Dict[str, Any] = method(**provided_param_dict)
except Exception as e:
response.success = False
response.error = f"Command '{request.command_name}' failed: {e}"
return response

if isinstance(result, dict):
for key, value in result.items():
Expand Down
18 changes: 17 additions & 1 deletion tests/test_ares_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,22 @@ def simple_cmd(arg1): return {}
self.assertFalse(resp_mis.success)
self.assertIn("parameter count did not match", resp_mis.error)

def test_command_exception_returns_failed_result(self):
"""Test that command exceptions do not escape the service."""
self.service = AresDeviceService(self.enter_safe_mode_func, self.get_state_func, self.device_name, self.device_desc, self.device_version, port=0)

def fail():
raise RuntimeError("Intentional failure")

desc = DeviceCommandDescriptor("Fail", "Intentionally fails", {}, {})
self.service.add_new_command(desc, fail)

req = device_service.ExecuteCommandRequest(command_name="Fail")
response = self.service._service_wrapper.ExecuteCommand(req, None)

self.assertFalse(response.success)
self.assertIn("Intentional failure", response.error)

def test_state_streaming(self):
"""Test the generator function for state streaming."""
self.service = AresDeviceService(self.enter_safe_mode_func, self.get_state_func, self.device_name, self.device_desc, self.device_version, port=0)
Expand Down Expand Up @@ -167,4 +183,4 @@ def test_safe_mode(self):
self.assertTrue(self.safe_mode_called, "EnterSafeMode did not trigger the user callback")

if __name__ == '__main__':
unittest.main(verbosity=2)
unittest.main(verbosity=2)