Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions modules/test/protocol/bin/get_bacnet_i-am_packets.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/bin/bash

# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#!/bin/bash

CAPTURE_FILE="$1"
SRC_IP="$2"

TSHARK_FILTER="bacnet && ip.src == $SRC_IP"
TSHARK_OUTPUT="-T json -e ip.src -e ip.dst -e eth.src -e eth.dst -e bacapp.instance_number -e _ws.col.Info"

response=$(tshark -r "$CAPTURE_FILE" $TSHARK_OUTPUT -Y "$TSHARK_FILTER")

echo "$response"
111 changes: 94 additions & 17 deletions modules/test/protocol/python/src/protocol_bacnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
"""Module to run all the BACnet related methods for testing"""

import BAC0
from bacpypes3.pdu import Address
from bacpypes3.app import DeviceInfo
from bacpypes3.basetypes import Segmentation
from dataclasses import dataclass
import logging
import json
Expand Down Expand Up @@ -64,35 +67,72 @@ def __init__(self,
self.device_hw_addr = device_hw_addr
self._bin_dir = bin_dir

async def discover(self, local_ip):
async def discover(self, local_ip, device_ip):
LOGGER.info('Performing BACnet discovery...')
self.bacnet = BAC0.lite(local_ip)
self.devices = []
self.bacnet = BAC0.connect(local_ip)
LOGGER.info('Local BACnet object: ' + str(self.bacnet))
try:
await self.bacnet._discover(global_broadcast=True) # pylint: disable=protected-access
await self.bacnet._discover(global_broadcast=True, timeout=10) # pylint: disable=protected-access
except Exception as e: # pylint: disable=W0718
LOGGER.error(e)
LOGGER.info('BACnet discovery complete')
with open(BAC0_LOG, 'r', encoding='utf-8') as f:
bac0_log = f.read()
LOGGER.info('BAC0 Log:\n' + bac0_log)
# Extract discovered devices as a BACnetDevice.
self.devices = []
LOGGER.info('discoveredDevices: ' + str(self.bacnet.discoveredDevices))
if self.bacnet.discoveredDevices is not None:
for device_info in self.bacnet.discoveredDevices.values():
self.devices.append(
BACnetDevice(
device = BACnetDevice(
device_id=str(device_info['object_instance'][1]),
ip=str(device_info['address'])
)
)
LOGGER.info(f'Discovered BACnet device: {device}')
self.devices.append(device)
LOGGER.info('BACnet devices found: ' + str(len(self.devices)))
if not self.devices:
try:
await self.bacnet._discover(timeout=10) # pylint: disable=protected-access
except Exception as e: # pylint: disable=W0718
LOGGER.error(e)
LOGGER.info('BACnet discovery complete')
with open(BAC0_LOG, 'r', encoding='utf-8') as f:
bac0_log = f.read()
LOGGER.info('BAC0 Log:\n' + bac0_log)
LOGGER.info('discoveredDevices: ' + str(self.bacnet.discoveredDevices))
if self.bacnet.discoveredDevices is not None:
for device_info in self.bacnet.discoveredDevices.values():
device = BACnetDevice(
device_id=str(device_info['object_instance'][1]),
ip=str(device_info['address'])
)
self.devices.append(device)
LOGGER.info(f'Discovered BACnet device: {device}')
if not self.devices:
res = await self.bacnet.this_application.app.who_is(
low_limit=0,
high_limit=4194303,
address=Address(f'{device_ip}:47808'),
timeout=10,
)
for iam in res:
instance = iam.iAmDeviceIdentifier[1]
address = str(iam.pduSource)
device = BACnetDevice(
device_id=str(instance),
ip=str(address)
)
self.devices.append(device)
if not self.devices:
self.devices = self._discover_from_packets(device_ip)

# Check if the device being tested is in the discovered devices list
# discover needs to be called before this method is invoked
def validate_device(self):
result = None
description = ''

try:
if len(self.devices) > 0:
result = True
Expand All @@ -119,18 +159,31 @@ async def validate_protocol_version(
LOGGER.info(
f'Resolving protocol version for BACnet device: {device.device_id}'
)
version = None
revision = None
ip = device.ip
d_id = device.device_id
try:
dut = await BAC0.device(
device.ip,
device.device_id,
self.bacnet
)
version = await dut.read_property(
('device', device.device_id, 'protocolVersion')
)
revision = await dut.read_property(
('device', device.device_id, 'protocolRevision')
dev_info = DeviceInfo(
device_instance=int(d_id),
device_address=Address(ip),
max_apdu_length_accepted=1476,
segmentation_supported=Segmentation.noSegmentation,
vendor_identifier=0
)
await self.bacnet.this_application.app.device_info_cache.set_device_info(
dev_info
)
LOGGER.info(f'Manually injected device {d_id} {ip} into BAC0 cache.')
except Exception as cache_err:
LOGGER.warning(f'Failed to pre-populate BAC0 cache: {cache_err}' )
try:
cmd = f'{ip} device {d_id} protocolVersion protocolRevision'
results = await self.bacnet.readMultiple(cmd)
LOGGER.info(f'BACnet readMultiple results: {results}')
if len(results) == 2:
version = results[0]
revision = results[1]
if version is None or revision is None:
result = False
result_description = (
Expand Down Expand Up @@ -194,3 +247,27 @@ def get_bacnet_packets(
command = f'{bin_file} {args}'
response = util.run_command(command)
return json.loads(response[0].strip())

def _discover_from_packets(self, device_ip: str) -> list[BACnetDevice]:
discovered = set()
capture_file = os.path.join(self._captures_dir, self._capture_file)
LOGGER.info(f'Discovering BACnet devices from packets in {capture_file}...')
bin_file = self._bin_dir + '/get_bacnet_i-am_packets.sh'
args = f'"{capture_file}" {device_ip}'
command = f'{bin_file} {args}'
response = util.run_command(command)
packets = json.loads(response[0].strip())
for packet in packets:
info = packet['_source']['layers']['_ws.col.info'][0]
if 'i-Am' in info:
discovered.add(
(
packet['_source']['layers']['bacapp.instance_number'][0],
packet['_source']['layers']['ip.src'][0]
)
)
LOGGER.info(f'Discovered BACnet devices from packets: {discovered}')
return [
BACnetDevice(device_id=device_id, ip=ip)
for device_id, ip in discovered
]
22 changes: 10 additions & 12 deletions modules/test/protocol/python/src/protocol_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,11 @@ class ProtocolModule(TestModule):

def __init__(self, module):
self._supports_bacnet = False
self._bacnet_loop = None
super().__init__(module_name=module, log_name=LOG_NAME)
global LOGGER
LOGGER = self._get_logger()
self._bacnet = BACnet(log=LOGGER, device_hw_addr=self._device_mac)

def _get_bacnet_loop(self):
if self._bacnet_loop is None:
self._bacnet_loop = asyncio.new_event_loop()
return self._bacnet_loop

def _protocol_valid_bacnet(self):
LOGGER.info('Running protocol.valid_bacnet')
result = None
Expand All @@ -53,8 +47,15 @@ def _protocol_valid_bacnet(self):
local_address = self.get_local_ip(interface_name)
if local_address:
local_address += '/24'
self._get_bacnet_loop().run_until_complete(
self._bacnet.discover(local_address))
try:
loop = asyncio.get_running_loop()
loop.run_until_complete(
self._bacnet.discover(local_address, self._device_ipv4_addr)
)
except RuntimeError:
asyncio.run(self._bacnet.discover(
local_address, self._device_ipv4_addr)
)
result = self._bacnet.validate_device()
if result[0]:
self._supports_bacnet = True
Expand All @@ -81,11 +82,8 @@ def _protocol_bacnet_version(self):
if len(self._bacnet.devices) > 0:
for device in self._bacnet.devices:
LOGGER.debug(f'Checking BACnet version for device: {device}')
loop = self._get_bacnet_loop()
result_status, result_description = \
loop.run_until_complete(
self._bacnet.validate_protocol_version(device)
)
self._bacnet.validate_protocol_version(device)
break

LOGGER.info(result_description)
Expand Down
Loading