Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 95 additions & 5 deletions e2e/helpers/mock_fcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,26 @@ def _build_huawei_response(
return bytes(pk)


def _build_huawei_short_response(
mcast_ip_be: bytes,
result_code: int = 1,
resp_type: int = 2,
first_seq: int = 0,
bitrate_kbps: int = 8748,
) -> bytes:
"""Build Huawei FCC short response (FMT 6, PT 205). 24 bytes."""
pk = bytearray(24)
pk[0] = 0x80 | _HUAWEI_FMT_RESP
pk[1] = 205
struct.pack_into("!H", pk, 2, 24 // 4 - 1)
pk[8:12] = mcast_ip_be
pk[12] = result_code
struct.pack_into("!H", pk, 14, resp_type)
struct.pack_into("!H", pk, 16, first_seq)
struct.pack_into("!H", pk, 20, bitrate_kbps)
return bytes(pk)


def _build_huawei_sync(mcast_ip_be: bytes) -> bytes:
"""Build Huawei FCC sync notification (FMT 8, PT 205). 12 bytes."""
pk = bytearray(12)
Expand Down Expand Up @@ -153,6 +173,22 @@ class MockFCCServer:
sync_after : int
Send sync notification after this many unicast packets.
Set to 0 to never send sync (unicast only test).
huawei_response_format : str
"full" (36-byte response) or "short" (24-byte non-NAT response).
redirect_to : tuple[str, int] | None
If set, send a redirect response to this FCC server instead of starting
unicast.
first_unicast_seq : int
Initial sequence number for mock unicast RTP packets and Huawei short
response.
bitrate_kbps : int
Bitrate value returned in Huawei short responses.
telecom_signal_port : int
Optional Telecom signal port returned in the FCC response.
telecom_media_port : int
Optional Telecom media port returned in the FCC response.
telecom_fcc_ip : str | None
Optional Telecom FCC IP returned in the FCC response.
"""

def __init__(
Expand All @@ -162,12 +198,26 @@ def __init__(
protocol: str = "telecom",
unicast_pps: int = 300,
sync_after: int = 100,
huawei_response_format: str = "full",
redirect_to: tuple[str, int] | None = None,
first_unicast_seq: int = 0,
bitrate_kbps: int = 8748,
telecom_signal_port: int = 0,
telecom_media_port: int = 0,
telecom_fcc_ip: str | None = None,
):
self.port = port or find_free_udp_port()
self.mcast_addr = mcast_addr
self.protocol = protocol
self.unicast_pps = unicast_pps
self.sync_after = sync_after
self.huawei_response_format = huawei_response_format
self.redirect_to = redirect_to
self.first_unicast_seq = first_unicast_seq
self.bitrate_kbps = bitrate_kbps
self.telecom_signal_port = telecom_signal_port
self.telecom_media_port = telecom_media_port
self.telecom_fcc_ip = telecom_fcc_ip

self._mcast_ip_be = socket.inet_aton(mcast_addr)
self._sock: socket.socket | None = None
Expand All @@ -176,6 +226,7 @@ def __init__(

# Observable state
self.requests_received = 0
self.request_client_addrs: list[tuple[str, int]] = []
self.terminations_received = 0
self.unicast_packets_sent = 0

Expand Down Expand Up @@ -215,6 +266,7 @@ def _loop(self) -> None:

if pt == 205 and fmt == self._fmt_req:
self.requests_received += 1
self.request_client_addrs.append(addr)
self._handle_request(addr)
elif pt == 205 and fmt == self._fmt_term:
self.terminations_received += 1
Expand All @@ -224,14 +276,47 @@ def _loop(self) -> None:
def _handle_request(self, client_addr: tuple[str, int]) -> None:
"""Send response then start unicast streaming in a background thread."""
assert self._sock is not None
if self.protocol == "huawei":
if self.protocol == "huawei" and self.redirect_to is not None:
redirect_ip, redirect_port = self.redirect_to
resp = _build_huawei_response(
self._mcast_ip_be,
resp_type=3,
server_ip_be=socket.inet_aton(redirect_ip),
server_port=redirect_port,
)
elif self.protocol == "telecom" and self.redirect_to is not None:
redirect_ip, redirect_port = self.redirect_to
resp = _build_telecom_response(
self._mcast_ip_be,
resp_type=3,
signal_port=redirect_port,
fcc_ip=struct.unpack("!I", socket.inet_aton(redirect_ip))[0],
)
elif self.protocol == "huawei" and self.huawei_response_format == "short":
resp = _build_huawei_short_response(
self._mcast_ip_be,
first_seq=self.first_unicast_seq,
bitrate_kbps=self.bitrate_kbps,
)
elif self.protocol == "huawei":
resp = _build_huawei_response(self._mcast_ip_be)
else:
resp = _build_telecom_response(self._mcast_ip_be)
telecom_fcc_ip = 0
if self.telecom_fcc_ip is not None:
telecom_fcc_ip = struct.unpack("!I", socket.inet_aton(self.telecom_fcc_ip))[0]
resp = _build_telecom_response(
self._mcast_ip_be,
signal_port=self.telecom_signal_port,
media_port=self.telecom_media_port,
fcc_ip=telecom_fcc_ip,
)

for _ in range(3):
self._sock.sendto(resp, client_addr)

if self.redirect_to is not None:
return

# Start unicast sender in a separate thread so the receive loop
# keeps processing incoming packets (e.g. termination).
t = threading.Thread(
Expand All @@ -245,20 +330,25 @@ def _send_unicast(self, client_addr: tuple[str, int]) -> None:
"""Stream RTP unicast packets to the client."""
assert self._sock is not None
interval = 1.0 / self.unicast_pps
seq = 0
send_addr = client_addr
if self.protocol == "huawei" and client_addr[1] > 0:
send_addr = (client_addr[0], client_addr[1] - 1)
seq = self.first_unicast_seq
ts = 0
sent_count = 0
while not self._stop.is_set():
pkt = make_rtp_packet(seq, ts)
try:
self._sock.sendto(pkt, client_addr)
self._sock.sendto(pkt, send_addr)
except OSError:
break
self.unicast_packets_sent += 1
sent_count += 1
seq = (seq + 1) & 0xFFFF
ts = (ts + 3600) & 0xFFFFFFFF

# Send sync notification after configured packet count
if self.sync_after > 0 and seq == self.sync_after:
if self.sync_after > 0 and sent_count == self.sync_after:
if self.protocol == "huawei":
sync_pk = _build_huawei_sync(self._mcast_ip_be)
else:
Expand Down
171 changes: 171 additions & 0 deletions e2e/test_fcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
R2HProcess,
find_free_port,
find_free_udp_port,
find_free_udp_port_pair,
stream_get,
)

Expand Down Expand Up @@ -73,6 +74,66 @@ def test_fcc_unicast_stream(self, shared_r2h):
finally:
fcc.stop()

def test_fcc_single_socket_keeps_media_when_signal_port_changes(self, shared_r2h):
"""Telecom single socket should not filter RTP by the control port."""
mcast_port = find_free_udp_port()
fcc = MockFCCServer(
mcast_addr=MCAST_ADDR,
protocol="telecom",
unicast_pps=300,
sync_after=0,
telecom_signal_port=find_free_udp_port(),
telecom_fcc_ip="127.0.0.1",
)
fcc.start()
try:
status, _, body = stream_get(
"127.0.0.1",
shared_r2h.port,
f"/rtp/{MCAST_ADDR}:{mcast_port}?fcc=127.0.0.1:{fcc.port}",
read_bytes=4096,
timeout=_FCC_STREAM_TIMEOUT,
)
assert status == 200
assert len(body) > 0, "Expected to receive unicast stream data"
assert body[0] == 0x47, f"Expected TS sync byte 0x47, got 0x{body[0]:02x}"
assert fcc.requests_received >= 1
finally:
fcc.stop()

def test_fcc_redirect_to_unicast_stream(self, shared_r2h):
"""Telecom duplicate redirect responses should not consume redirect budget."""
mcast_port = find_free_udp_port()
target = MockFCCServer(
mcast_addr=MCAST_ADDR,
protocol="telecom",
unicast_pps=300,
sync_after=0,
)
redirect = MockFCCServer(
mcast_addr=MCAST_ADDR,
protocol="telecom",
redirect_to=("127.0.0.1", target.port),
)
target.start()
redirect.start()
try:
status, _, body = stream_get(
"127.0.0.1",
shared_r2h.port,
f"/rtp/{MCAST_ADDR}:{mcast_port}?fcc=127.0.0.1:{redirect.port}",
read_bytes=4096,
timeout=_FCC_STREAM_TIMEOUT,
)
assert status == 200
assert len(body) > 0, "Expected to receive redirected unicast stream data"
assert body[0] == 0x47, f"Expected TS sync byte 0x47, got 0x{body[0]:02x}"
assert redirect.requests_received >= 1
assert target.requests_received >= 1
finally:
redirect.stop()
target.stop()

def test_fcc_with_multicast_transition(self, shared_r2h):
"""FCC unicast followed by sync notification triggers multicast join."""
mcast_port = find_free_udp_port()
Expand Down Expand Up @@ -167,6 +228,116 @@ def test_huawei_fcc_unicast_stream(self, shared_r2h):
finally:
fcc.stop()

def test_huawei_fcc_short_response_unicast_stream(self, shared_r2h):
"""Huawei short non-NAT response sends RTP to the media socket."""
mcast_port = find_free_udp_port()
fcc = MockFCCServer(
mcast_addr=MCAST_ADDR,
protocol="huawei",
unicast_pps=300,
sync_after=0,
huawei_response_format="short",
first_unicast_seq=0xD3AD,
)
fcc.start()
try:
url = f"/rtp/{MCAST_ADDR}:{mcast_port}?fcc=127.0.0.1:{fcc.port}&fcc-type=huawei"
status, _, body = stream_get(
"127.0.0.1",
shared_r2h.port,
url,
read_bytes=4096,
timeout=_FCC_STREAM_TIMEOUT,
)
assert status == 200
assert len(body) > 0, "Expected to receive Huawei short-response unicast data"
assert body[0] == 0x47, f"Expected TS sync byte 0x47, got 0x{body[0]:02x}"
assert fcc.requests_received >= 1
finally:
fcc.stop()

def test_huawei_fcc_socket_pair_stays_inside_listen_range(self, r2h_binary):
"""Huawei double sockets use N/N+1 within fcc-listen-port-range."""
mcast_port = find_free_udp_port()
media_port, signal_port = find_free_udp_port_pair()
r2h_port = find_free_port()
r2h = R2HProcess(
r2h_binary,
r2h_port,
extra_args=[
"-v",
"4",
"-m",
"100",
"-r",
LOOPBACK_IF,
"-P",
f"{media_port}-{signal_port}",
],
)
fcc = MockFCCServer(
mcast_addr=MCAST_ADDR,
protocol="huawei",
unicast_pps=300,
sync_after=0,
huawei_response_format="short",
)
r2h.start()
fcc.start()
try:
url = f"/rtp/{MCAST_ADDR}:{mcast_port}?fcc=127.0.0.1:{fcc.port}&fcc-type=huawei"
status, _, body = stream_get(
"127.0.0.1",
r2h.port,
url,
read_bytes=4096,
timeout=_FCC_STREAM_TIMEOUT,
)
assert status == 200
assert len(body) > 0, "Expected to receive Huawei range-bound unicast data"
assert body[0] == 0x47, f"Expected TS sync byte 0x47, got 0x{body[0]:02x}"
assert fcc.request_client_addrs
assert fcc.request_client_addrs[0][1] == signal_port
finally:
fcc.stop()
r2h.stop()

def test_huawei_fcc_redirect_to_short_response(self, shared_r2h):
"""Huawei redirect followed by short non-NAT response should receive unicast."""
mcast_port = find_free_udp_port()
target = MockFCCServer(
mcast_addr=MCAST_ADDR,
protocol="huawei",
unicast_pps=300,
sync_after=0,
huawei_response_format="short",
first_unicast_seq=0x4319,
)
redirect = MockFCCServer(
mcast_addr=MCAST_ADDR,
protocol="huawei",
redirect_to=("127.0.0.1", target.port),
)
target.start()
redirect.start()
try:
url = f"/rtp/{MCAST_ADDR}:{mcast_port}?fcc=127.0.0.1:{redirect.port}&fcc-type=huawei"
status, _, body = stream_get(
"127.0.0.1",
shared_r2h.port,
url,
read_bytes=4096,
timeout=_FCC_STREAM_TIMEOUT,
)
assert status == 200
assert len(body) > 0, "Expected redirected Huawei short-response unicast data"
assert body[0] == 0x47, f"Expected TS sync byte 0x47, got 0x{body[0]:02x}"
assert redirect.requests_received >= 1
assert target.requests_received >= 1
finally:
redirect.stop()
target.stop()

def test_huawei_fcc_with_multicast_transition(self, shared_r2h):
"""Huawei FCC unicast followed by sync triggers multicast join."""
mcast_port = find_free_udp_port()
Expand Down
Loading
Loading