Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions PyAres/Demo/Analyzers/airship_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from PyAres import AresAnalyzerService, AnalysisRequest, Analysis, AresDataType, Outcome

def analysis_logic(request: AnalysisRequest):
result = int(request.inputs["ShotOutcome"])
print(f"Analyzing, result is: {result}")
response = Analysis(result=result, outcome=Outcome.SUCCESS)
return response

if __name__ == "__main__":
print("Welcome to the Airship Analyzer")

analyzer = AresAnalyzerService(
custom_analysis_logic=analysis_logic,
description="An analyzer for use with the Airship Device",
name="Airship Analyzer",
version="1.0.0")

analyzer.add_analysis_parameter("ShotOutcome", AresDataType.NUMBER)

analyzer.start()
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,26 @@

def analyze(request: AnalysisRequest) -> Analysis:
#Custom Analysis Logic
temperature = request.inputs.get("Temperature")

if not isinstance(temperature, float):
print("Temperature was not a float")
temperature = 0.0

print(f"Temperature: {temperature}")

analysis = Analysis(result=temperature)
temp_one = request.inputs.get("Temperature One")
print("Processing Analysis Request")

if not isinstance(temp_one, float):
print("Temperature One was not a float")
print(temp_one)
temp_one = 0.0

else:
print(f"Temperature One: {temp_one}")

# if not isinstance(temp_two, float):
# print("Temperature Two was not a float")
# print(temp_two)
# temp_two = 0.0

# else:
# print(f"Temperature Two: {temp_two}")

analysis = Analysis(result=temp_one)
return analysis


Expand All @@ -22,7 +33,8 @@ def analyze(request: AnalysisRequest) -> Analysis:
pythonDemoAnalyzer = AresAnalyzerService(analyze, name, version, description)

#Add Analysis Parameters
pythonDemoAnalyzer.add_analysis_parameter("Temperature", AresDataType.NUMBER)
# pythonDemoAnalyzer.add_analysis_parameter("Temperature Two", AresDataType.NUMBER)
pythonDemoAnalyzer.add_analysis_parameter("Temperature One", AresDataType.NUMBER)
pythonDemoAnalyzer.add_setting(setting_name="", setting_type=AresDataType.NULL, optional=True, constraints=[])
pythonDemoAnalyzer.start(wait_for_termination=True)

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ def __init__(self):
def set_temperature(self, temperature: float):
self.temperature = temperature
time.sleep(5)
return {}
return DeviceCommandResponse(None, status_code=StatusCode.COMMAND_SUCCESS)

def get_temperature(self):
return { "temperature": self.temperature }
return DeviceCommandResponse(self.temperature, status_code=StatusCode.COMMAND_SUCCESS)

def get_device_state(self) -> Dict:
state_dict = { "temperature": self.temperature }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from PyAres import AresDeviceService, AresDataType, DeviceCommandDescriptor, DeviceSchemaEntry


class TestDevice:
class FailureTestDevice:
def fail(self):
"""Intentionally fails so command failure handling can be tested."""
print("[Test Device] Running intentionally failing command...")
Expand All @@ -26,7 +26,7 @@ def safe_mode(self):


if __name__ == "__main__":
test_device = TestDevice()
test_device = FailureTestDevice()

service = AresDeviceService(
test_device.safe_mode,
Expand Down
8 changes: 5 additions & 3 deletions PyAres/Demo/hotplate.py → PyAres/Demo/Devices/hotplate.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from PyAres import AresDeviceService, AresDataType, DeviceSchemaEntry, DeviceCommandDescriptor
from PyAres import *

# --- PART 1: The Simulated Hardware ---
class VirtualHotplate:
Expand All @@ -8,14 +8,16 @@ def __init__(self):
def set_temperature(self, temp: float):
"""Simulates setting the heater."""
print(f"[Hardware] Heating to {temp}°C...")
response = DeviceCommandResponse(None, status_code=StatusCode.COMMAND_SUCCESS)
self.target_temp = temp
return {} # Return empty dict if no data needs to be sent back
return response

def get_temperature(self):
"""Simulates reading the sensor."""
response = DeviceCommandResponse({ "current_temp": self.target_temp }, status_code=StatusCode.COMMAND_SUCCESS)
# In a real device, you'd read a serial port here.
print("[Hardware] Retrieving the current temperature...")
return { "current_temp": self.target_temp }
return response

def get_state(self):
"""Required: Tells ARES the current status for logging."""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import random

from PyAres import AresDeviceService, AresDataType, DeviceSchemaEntry, DeviceCommandDescriptor
from PyAres import AresDeviceService, AresDataType, DeviceSchemaEntry, DeviceCommandDescriptor, DeviceCommandResponse, StatusCode


# --- PART 1: The Simulated Hardware ---
Expand All @@ -12,7 +12,8 @@ def generate_number(self):
"""Simulates reading a random value from hardware."""
self.last_number = random.randint(1, 100)
print(f"[Hardware] Generated random number: {self.last_number}")
return {"random_number": self.last_number}
response = DeviceCommandResponse({"random_number": self.last_number}, status_code=StatusCode.COMMAND_SUCCESS)
return response

def get_state(self):
"""Required: Tells ARES the current status for logging."""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from PyAres import AresDeviceService, AresDataType, DeviceSchemaEntry, DeviceCommandDescriptor
from PyAres import *

class RotaryMixer:
def __init__(self, speed):
Expand All @@ -11,12 +11,11 @@ def __init__(self, speed):
def set_speed(rpm: float):
print(f"Setting motor speed to {rpm}")
mixer.speed = rpm
# Hardware communication goes here...
return {} # Return empty dict if no data needs to be sent back
return DeviceCommandResponse(None, status_code=StatusCode.COMMAND_SUCCESS) # Return empty dict if no data needs to be sent back

def get_speed():
print("Hey I got speed")
return { "rpm": mixer.speed }
return DeviceCommandResponse(mixer.speed, status_code=StatusCode.COMMAND_SUCCESS)

def get_status():
# Return a dictionary matching your state schema
Expand All @@ -37,14 +36,10 @@ def safe_mode():

# 3. Define the 'Set Speed' Command
# Input: One number (Speed)
input_schema = {
"rpm": DeviceSchemaEntry(AresDataType.NUMBER, "Speed in RPM", "RPM")
}
input_schema = { "rpm": DeviceSchemaEntry(AresDataType.NUMBER, "Speed in RPM", "RPM") }
cmd_descriptor = DeviceCommandDescriptor("Set Speed", "Sets mixer speed", input_schema, {})

output_schema = {
"rpm": DeviceSchemaEntry(AresDataType.NUMBER, "Speed in RPM", "RPM")
}
output_schema = { "rpm": DeviceSchemaEntry(AresDataType.NUMBER, "Speed in RPM", "RPM") }
get_speed_descriptor = DeviceCommandDescriptor("Get Speed", "Gets mixer speed", {}, output_schema)

# 4. Register the command
Expand Down
187 changes: 187 additions & 0 deletions PyAres/Demo/Planners/airship_planner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
from PyAres import AresPlannerService, PlanRequest, PlanResponse, AresDataType
import random
import time
from enum import Enum

class Planners(Enum):
RANDOM_PLANNER = 1
TRADITIONAL_PLANNER = 2
SEARCH_AND_DESTROY_PLANNER = 3

def coord_to_tuple(coord: str):
"""Converts 'B3' to (1, 2)."""
return (ord(coord[0].upper()) - ord('A'), int(coord[1:]) - 1)

def tuple_to_coord(tup: tuple):
"""Converts (1, 2) to 'B3'."""
return f"{chr(ord('A') + tup[0])}{tup[1] + 1}"

def generate_all_coords(board_size=10):
"""Generates all coordinates from 'A1' to 'J10'."""
return [tuple_to_coord((r, c)) for r in range(board_size) for c in range(board_size)]

def convert_param_history(param_history: list):
converted_history = []
for value in param_history:
letter = value.planned_value[0]
converted_value = ord(letter) + 1 - ord('A')
converted_history.append((converted_value, int(value.planned_value[1:])))
return converted_history

def get_random_shot(param_history: list):
""" Chooses a random, un-shot-at coordinate. """
converted_param_history = convert_param_history(param_history)
all_possible_shots = [(c, r) for r in range(1,11) for c in range(1, 11)]
available_shots = list(set(all_possible_shots) - set(converted_param_history))
print(f"{len(available_shots)}/{len(all_possible_shots)}")

if not available_shots:
return None

return random.choice(available_shots)

def get_traditional_search_shot(param_history: list):
""" Uses an extremely basic traditional search algorithm, moving across the board """
shot_number = len(param_history)
all_possible_shots = [(c, r) for r in range(1,11) for c in range(1, 11)]
return all_possible_shots[shot_number]

def get_search_and_destroy_shot(request: PlanRequest):
"""A 'Hunt/Target' planner for Airship."""
param = request.parameters[0]
shot_history = [p.planned_value for p in param.param_history]
shot_results = request.analysis_results # 0.0=Miss, 1.0=Hit, 2.0=Sunk
active_hits = []

# --- Identify All Active Hits ---
for i in range(len(shot_history)):
coord = shot_history[i]
result = shot_results[i]

is_sunk = False
for j in range(i, len(shot_history)):
if shot_results[j] == 2.0 and shot_history[j] == coord:
is_sunk = True
active_hits = []
break

if result == 1.0 and not is_sunk:
active_hits.append(coord)

# Get Unique active hits and sort them for consistency
active_hits = sorted(list(set(active_hits)))
next_shot = None

# --- TARGET MODE ---
if active_hits:
print(f"Target Mode -> Active Hits: {active_hits}")

is_horizontal = False
is_vertical = False

if len(active_hits) >= 2:
#Convert the first two active hits to (row, col) tuples
r1, c1 = coord_to_tuple(active_hits[0])
r2, c2 = coord_to_tuple(active_hits[1])

if r1 == r2:
is_horizontal = True

elif c1 == c2:
is_vertical = True

potential_targets = []

if is_horizontal or is_vertical:
print(f"Orientation Known! {'Horizontal' if is_horizontal else 'Vertical'}")
hit_tuples = [coord_to_tuple(c) for c in active_hits]

if is_horizontal:
min_col = min(c for r, c in hit_tuples)
max_col = max(c for r, c in hit_tuples)
row = hit_tuples[0][0]

potential_targets.append((row, min_col - 1))
potential_targets.append((row, max_col + 1))

elif is_vertical:
min_row = min(r for r, c in hit_tuples)
max_row = max(r for r, c in hit_tuples)
col = hit_tuples[0][1]

potential_targets.append((min_row - 1, col))
potential_targets.append((max_row + 1, col))

else:
active_hit_coord = active_hits[0]
row, col = coord_to_tuple(active_hit_coord)
potential_targets.extend([(row - 1, col), (row + 1, col), (row, col - 1), (row, col + 1)])

valid_targets = []
for r, c in potential_targets:
if 0 <= r < 10 and 0 <= c < 10:
coord = tuple_to_coord((r,c))
if coord not in shot_history:
valid_targets.append(coord)

if valid_targets:
next_shot = random.choice(valid_targets)
print(f"Targeting next logical square: {next_shot}")

# --- HUNT MODE ---
if next_shot is None:
if active_hits:
print("Target mode exhausted (likely cornered the ship). Returning to Hunt Mode... ")
else:
print("Hunt Mode -> Searching for a new target.... ")

available_shots = list(set(generate_all_coords()) - set(shot_history))
hunt_candidates = [c for c in available_shots if (coord_to_tuple(c)[0] + coord_to_tuple(c)[1] % 2 == 0)]

if hunt_candidates:
next_shot = random.choice(hunt_candidates)

elif available_shots:
next_shot = random.choice(available_shots)

else:
#Game Over
next_shot = "A1"

print(f"Planner requesting fire at: {next_shot}")
time.sleep(0.25)
return PlanResponse(parameter_names=[param.name], parameter_values=[next_shot])

def plan(request: PlanRequest) -> PlanResponse:
#For an "Airship" game we should only ever have one parameter, which is our coordinate
param = request.parameters[0]
#Get next shot
if(param.planner_name == Planners.RANDOM_PLANNER.name):
shot = get_random_shot(param.param_history)
elif(param.planner_name == Planners.TRADITIONAL_PLANNER.name):
shot = get_traditional_search_shot(param.param_history)
else:
response = get_search_and_destroy_shot(request)
return response

time.sleep(0.25)
letter = chr(ord('A') - 1 + shot[0])
shot_string = f"{letter}{shot[1]}"
print(f"Requesting Fire at {shot_string}")

response = PlanResponse(parameter_names=[param.name], parameter_values=[shot_string])
return response

if __name__ == "__main__":
name = "Airship Planner Service"
description = "A planner service that provides some basic algorithms for playing Airship."
planner = AresPlannerService(plan, name, description, "1.0.0", port=8003)

#Add Supported Types
planner.add_supported_type(AresDataType.STRING)

planner.add_planner_option(Planners.RANDOM_PLANNER.name, "Randomly shoots at an un-shot-at coordinate", "1.0.0")
planner.add_planner_option(Planners.TRADITIONAL_PLANNER.name, "Follows a very basic traditional search pattern", "1.0.0")
planner.add_planner_option(Planners.SEARCH_AND_DESTROY_PLANNER.name, "Searches for ships with random shots, destroys ships upon locating them.", "1.0.0")

planner.start()
File renamed without changes.
File renamed without changes.
2 changes: 2 additions & 0 deletions PyAres/Device/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from .device_models import DeviceCommandDescriptor
from .device_models import DeviceSchemaEntry
from .device_models import StatusCode
from .device_models import DeviceCommandResponse
from .device_service import AresDeviceService
Loading