From e4d101cdf3e21e00ed96e4eaec454e4ab210bc38 Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Thu, 7 May 2026 02:47:52 +0800 Subject: [PATCH] fix(fcc): support huawei short response with double sockets --- e2e/helpers/mock_fcc.py | 100 +++++++++++- e2e/test_fcc.py | 171 ++++++++++++++++++++ src/fcc.c | 338 ++++++++++++++++++++++++++++++++-------- src/fcc.h | 5 +- src/fcc_huawei.c | 144 +++++++++++------ src/fcc_telecom.c | 7 + src/stream.c | 7 +- 7 files changed, 651 insertions(+), 121 deletions(-) diff --git a/e2e/helpers/mock_fcc.py b/e2e/helpers/mock_fcc.py index 36c5dec..8e1e95d 100644 --- a/e2e/helpers/mock_fcc.py +++ b/e2e/helpers/mock_fcc.py @@ -122,6 +122,26 @@ def _build_huawei_response( return bytes(pk) +def _build_huawei_short_response( + mcast_ip_be: bytes, + result_code: int = 1, + resp_type: int = 2, + first_seq: int = 0, + bitrate_kbps: int = 8748, +) -> bytes: + """Build Huawei FCC short response (FMT 6, PT 205). 24 bytes.""" + pk = bytearray(24) + pk[0] = 0x80 | _HUAWEI_FMT_RESP + pk[1] = 205 + struct.pack_into("!H", pk, 2, 24 // 4 - 1) + pk[8:12] = mcast_ip_be + pk[12] = result_code + struct.pack_into("!H", pk, 14, resp_type) + struct.pack_into("!H", pk, 16, first_seq) + struct.pack_into("!H", pk, 20, bitrate_kbps) + return bytes(pk) + + def _build_huawei_sync(mcast_ip_be: bytes) -> bytes: """Build Huawei FCC sync notification (FMT 8, PT 205). 12 bytes.""" pk = bytearray(12) @@ -153,6 +173,22 @@ class MockFCCServer: sync_after : int Send sync notification after this many unicast packets. Set to 0 to never send sync (unicast only test). + huawei_response_format : str + "full" (36-byte response) or "short" (24-byte non-NAT response). + redirect_to : tuple[str, int] | None + If set, send a redirect response to this FCC server instead of starting + unicast. + first_unicast_seq : int + Initial sequence number for mock unicast RTP packets and Huawei short + response. + bitrate_kbps : int + Bitrate value returned in Huawei short responses. + telecom_signal_port : int + Optional Telecom signal port returned in the FCC response. + telecom_media_port : int + Optional Telecom media port returned in the FCC response. + telecom_fcc_ip : str | None + Optional Telecom FCC IP returned in the FCC response. """ def __init__( @@ -162,12 +198,26 @@ def __init__( protocol: str = "telecom", unicast_pps: int = 300, sync_after: int = 100, + huawei_response_format: str = "full", + redirect_to: tuple[str, int] | None = None, + first_unicast_seq: int = 0, + bitrate_kbps: int = 8748, + telecom_signal_port: int = 0, + telecom_media_port: int = 0, + telecom_fcc_ip: str | None = None, ): self.port = port or find_free_udp_port() self.mcast_addr = mcast_addr self.protocol = protocol self.unicast_pps = unicast_pps self.sync_after = sync_after + self.huawei_response_format = huawei_response_format + self.redirect_to = redirect_to + self.first_unicast_seq = first_unicast_seq + self.bitrate_kbps = bitrate_kbps + self.telecom_signal_port = telecom_signal_port + self.telecom_media_port = telecom_media_port + self.telecom_fcc_ip = telecom_fcc_ip self._mcast_ip_be = socket.inet_aton(mcast_addr) self._sock: socket.socket | None = None @@ -176,6 +226,7 @@ def __init__( # Observable state self.requests_received = 0 + self.request_client_addrs: list[tuple[str, int]] = [] self.terminations_received = 0 self.unicast_packets_sent = 0 @@ -215,6 +266,7 @@ def _loop(self) -> None: if pt == 205 and fmt == self._fmt_req: self.requests_received += 1 + self.request_client_addrs.append(addr) self._handle_request(addr) elif pt == 205 and fmt == self._fmt_term: self.terminations_received += 1 @@ -224,14 +276,47 @@ def _loop(self) -> None: def _handle_request(self, client_addr: tuple[str, int]) -> None: """Send response then start unicast streaming in a background thread.""" assert self._sock is not None - if self.protocol == "huawei": + if self.protocol == "huawei" and self.redirect_to is not None: + redirect_ip, redirect_port = self.redirect_to + resp = _build_huawei_response( + self._mcast_ip_be, + resp_type=3, + server_ip_be=socket.inet_aton(redirect_ip), + server_port=redirect_port, + ) + elif self.protocol == "telecom" and self.redirect_to is not None: + redirect_ip, redirect_port = self.redirect_to + resp = _build_telecom_response( + self._mcast_ip_be, + resp_type=3, + signal_port=redirect_port, + fcc_ip=struct.unpack("!I", socket.inet_aton(redirect_ip))[0], + ) + elif self.protocol == "huawei" and self.huawei_response_format == "short": + resp = _build_huawei_short_response( + self._mcast_ip_be, + first_seq=self.first_unicast_seq, + bitrate_kbps=self.bitrate_kbps, + ) + elif self.protocol == "huawei": resp = _build_huawei_response(self._mcast_ip_be) else: - resp = _build_telecom_response(self._mcast_ip_be) + telecom_fcc_ip = 0 + if self.telecom_fcc_ip is not None: + telecom_fcc_ip = struct.unpack("!I", socket.inet_aton(self.telecom_fcc_ip))[0] + resp = _build_telecom_response( + self._mcast_ip_be, + signal_port=self.telecom_signal_port, + media_port=self.telecom_media_port, + fcc_ip=telecom_fcc_ip, + ) for _ in range(3): self._sock.sendto(resp, client_addr) + if self.redirect_to is not None: + return + # Start unicast sender in a separate thread so the receive loop # keeps processing incoming packets (e.g. termination). t = threading.Thread( @@ -245,20 +330,25 @@ def _send_unicast(self, client_addr: tuple[str, int]) -> None: """Stream RTP unicast packets to the client.""" assert self._sock is not None interval = 1.0 / self.unicast_pps - seq = 0 + send_addr = client_addr + if self.protocol == "huawei" and client_addr[1] > 0: + send_addr = (client_addr[0], client_addr[1] - 1) + seq = self.first_unicast_seq ts = 0 + sent_count = 0 while not self._stop.is_set(): pkt = make_rtp_packet(seq, ts) try: - self._sock.sendto(pkt, client_addr) + self._sock.sendto(pkt, send_addr) except OSError: break self.unicast_packets_sent += 1 + sent_count += 1 seq = (seq + 1) & 0xFFFF ts = (ts + 3600) & 0xFFFFFFFF # Send sync notification after configured packet count - if self.sync_after > 0 and seq == self.sync_after: + if self.sync_after > 0 and sent_count == self.sync_after: if self.protocol == "huawei": sync_pk = _build_huawei_sync(self._mcast_ip_be) else: diff --git a/e2e/test_fcc.py b/e2e/test_fcc.py index 3cb43a4..2079486 100644 --- a/e2e/test_fcc.py +++ b/e2e/test_fcc.py @@ -18,6 +18,7 @@ R2HProcess, find_free_port, find_free_udp_port, + find_free_udp_port_pair, stream_get, ) @@ -73,6 +74,66 @@ def test_fcc_unicast_stream(self, shared_r2h): finally: fcc.stop() + def test_fcc_single_socket_keeps_media_when_signal_port_changes(self, shared_r2h): + """Telecom single socket should not filter RTP by the control port.""" + mcast_port = find_free_udp_port() + fcc = MockFCCServer( + mcast_addr=MCAST_ADDR, + protocol="telecom", + unicast_pps=300, + sync_after=0, + telecom_signal_port=find_free_udp_port(), + telecom_fcc_ip="127.0.0.1", + ) + fcc.start() + try: + status, _, body = stream_get( + "127.0.0.1", + shared_r2h.port, + f"/rtp/{MCAST_ADDR}:{mcast_port}?fcc=127.0.0.1:{fcc.port}", + read_bytes=4096, + timeout=_FCC_STREAM_TIMEOUT, + ) + assert status == 200 + assert len(body) > 0, "Expected to receive unicast stream data" + assert body[0] == 0x47, f"Expected TS sync byte 0x47, got 0x{body[0]:02x}" + assert fcc.requests_received >= 1 + finally: + fcc.stop() + + def test_fcc_redirect_to_unicast_stream(self, shared_r2h): + """Telecom duplicate redirect responses should not consume redirect budget.""" + mcast_port = find_free_udp_port() + target = MockFCCServer( + mcast_addr=MCAST_ADDR, + protocol="telecom", + unicast_pps=300, + sync_after=0, + ) + redirect = MockFCCServer( + mcast_addr=MCAST_ADDR, + protocol="telecom", + redirect_to=("127.0.0.1", target.port), + ) + target.start() + redirect.start() + try: + status, _, body = stream_get( + "127.0.0.1", + shared_r2h.port, + f"/rtp/{MCAST_ADDR}:{mcast_port}?fcc=127.0.0.1:{redirect.port}", + read_bytes=4096, + timeout=_FCC_STREAM_TIMEOUT, + ) + assert status == 200 + assert len(body) > 0, "Expected to receive redirected unicast stream data" + assert body[0] == 0x47, f"Expected TS sync byte 0x47, got 0x{body[0]:02x}" + assert redirect.requests_received >= 1 + assert target.requests_received >= 1 + finally: + redirect.stop() + target.stop() + def test_fcc_with_multicast_transition(self, shared_r2h): """FCC unicast followed by sync notification triggers multicast join.""" mcast_port = find_free_udp_port() @@ -167,6 +228,116 @@ def test_huawei_fcc_unicast_stream(self, shared_r2h): finally: fcc.stop() + def test_huawei_fcc_short_response_unicast_stream(self, shared_r2h): + """Huawei short non-NAT response sends RTP to the media socket.""" + mcast_port = find_free_udp_port() + fcc = MockFCCServer( + mcast_addr=MCAST_ADDR, + protocol="huawei", + unicast_pps=300, + sync_after=0, + huawei_response_format="short", + first_unicast_seq=0xD3AD, + ) + fcc.start() + try: + url = f"/rtp/{MCAST_ADDR}:{mcast_port}?fcc=127.0.0.1:{fcc.port}&fcc-type=huawei" + status, _, body = stream_get( + "127.0.0.1", + shared_r2h.port, + url, + read_bytes=4096, + timeout=_FCC_STREAM_TIMEOUT, + ) + assert status == 200 + assert len(body) > 0, "Expected to receive Huawei short-response unicast data" + assert body[0] == 0x47, f"Expected TS sync byte 0x47, got 0x{body[0]:02x}" + assert fcc.requests_received >= 1 + finally: + fcc.stop() + + def test_huawei_fcc_socket_pair_stays_inside_listen_range(self, r2h_binary): + """Huawei double sockets use N/N+1 within fcc-listen-port-range.""" + mcast_port = find_free_udp_port() + media_port, signal_port = find_free_udp_port_pair() + r2h_port = find_free_port() + r2h = R2HProcess( + r2h_binary, + r2h_port, + extra_args=[ + "-v", + "4", + "-m", + "100", + "-r", + LOOPBACK_IF, + "-P", + f"{media_port}-{signal_port}", + ], + ) + fcc = MockFCCServer( + mcast_addr=MCAST_ADDR, + protocol="huawei", + unicast_pps=300, + sync_after=0, + huawei_response_format="short", + ) + r2h.start() + fcc.start() + try: + url = f"/rtp/{MCAST_ADDR}:{mcast_port}?fcc=127.0.0.1:{fcc.port}&fcc-type=huawei" + status, _, body = stream_get( + "127.0.0.1", + r2h.port, + url, + read_bytes=4096, + timeout=_FCC_STREAM_TIMEOUT, + ) + assert status == 200 + assert len(body) > 0, "Expected to receive Huawei range-bound unicast data" + assert body[0] == 0x47, f"Expected TS sync byte 0x47, got 0x{body[0]:02x}" + assert fcc.request_client_addrs + assert fcc.request_client_addrs[0][1] == signal_port + finally: + fcc.stop() + r2h.stop() + + def test_huawei_fcc_redirect_to_short_response(self, shared_r2h): + """Huawei redirect followed by short non-NAT response should receive unicast.""" + mcast_port = find_free_udp_port() + target = MockFCCServer( + mcast_addr=MCAST_ADDR, + protocol="huawei", + unicast_pps=300, + sync_after=0, + huawei_response_format="short", + first_unicast_seq=0x4319, + ) + redirect = MockFCCServer( + mcast_addr=MCAST_ADDR, + protocol="huawei", + redirect_to=("127.0.0.1", target.port), + ) + target.start() + redirect.start() + try: + url = f"/rtp/{MCAST_ADDR}:{mcast_port}?fcc=127.0.0.1:{redirect.port}&fcc-type=huawei" + status, _, body = stream_get( + "127.0.0.1", + shared_r2h.port, + url, + read_bytes=4096, + timeout=_FCC_STREAM_TIMEOUT, + ) + assert status == 200 + assert len(body) > 0, "Expected redirected Huawei short-response unicast data" + assert body[0] == 0x47, f"Expected TS sync byte 0x47, got 0x{body[0]:02x}" + assert redirect.requests_received >= 1 + assert target.requests_received >= 1 + finally: + redirect.stop() + target.stop() + def test_huawei_fcc_with_multicast_transition(self, shared_r2h): """Huawei FCC unicast followed by sync triggers multicast join.""" mcast_port = find_free_udp_port() diff --git a/src/fcc.c b/src/fcc.c index ac7ab04..1bb4366 100644 --- a/src/fcc.c +++ b/src/fcc.c @@ -23,46 +23,250 @@ static void log_fcc_state_transition(fcc_state_t from, fcc_state_t to, const cha static int fcc_send_term_packet(fcc_session_t *fcc, service_t *service, uint16_t seqn, const char *reason); static int fcc_send_termination_message(stream_context_t *ctx, uint16_t mcast_seqn); -static int fcc_bind_socket_with_range(int sock, struct sockaddr_in *sin) { - if (!sin) +static int fcc_create_configured_socket(service_t *service, const char *upstream_if, const char *label) { + int sock = socket(AF_INET, service->fcc_addr->ai_socktype, service->fcc_addr->ai_protocol); + if (sock < 0) { + logger(LOG_ERROR, "FCC: Failed to create %s socket: %s", label, strerror(errno)); return -1; + } + + if (connection_set_nonblocking(sock) < 0) { + logger(LOG_ERROR, "FCC: Failed to set %s socket non-blocking: %s", label, strerror(errno)); + close(sock); + return -1; + } + if (set_socket_rcvbuf(sock, config.udp_rcvbuf_size) < 0) { + logger(LOG_WARN, "FCC: Failed to set %s SO_RCVBUF to %d: %s", label, config.udp_rcvbuf_size, strerror(errno)); + } + + bind_to_upstream_interface(sock, upstream_if); + + return sock; +} + +typedef int (*fcc_bind_port_fn)(int port, void *opaque); + +typedef struct { + int sock; + struct sockaddr_in *sin; +} fcc_single_bind_attempt_t; + +typedef struct { + stream_context_t *ctx; + const char *upstream_if; + int media_sock; + int signal_sock; +} fcc_double_bind_attempt_t; + +static int fcc_get_bind_port_range(int paired_port_offset, int *min_port, int *max_port, int *use_ephemeral) { + if (!min_port || !max_port || !use_ephemeral) { + return -1; + } + + *use_ephemeral = 0; if (config.fcc_listen_port_min <= 0 || config.fcc_listen_port_max <= 0) { - sin->sin_port = 0; - return bind(sock, (struct sockaddr *)sin, sizeof(*sin)); + if (paired_port_offset == 0) { + *use_ephemeral = 1; + *min_port = 0; + *max_port = 0; + return 0; + } + + *min_port = 10000; + *max_port = 65535; + } else { + *min_port = config.fcc_listen_port_min; + *max_port = config.fcc_listen_port_max; } - int min_port = config.fcc_listen_port_min; - int max_port = config.fcc_listen_port_max; - if (max_port < min_port) { - int tmp = min_port; - min_port = max_port; - max_port = tmp; + if (*max_port < *min_port) { + int tmp = *min_port; + *min_port = *max_port; + *max_port = tmp; } - int range = max_port - min_port + 1; - if (range <= 0) - range = 1; + *max_port -= paired_port_offset; + + int max_allowed = 65535 - paired_port_offset; + if (*max_port > max_allowed) { + *max_port = max_allowed; + } + if (*min_port < 1) { + *min_port = 1; + } + + return *max_port >= *min_port ? 0 : -1; +} + +static int fcc_try_bind_port_range(int paired_port_offset, const char *label, fcc_bind_port_fn bind_port, + void *opaque) { + int min_port; + int max_port; + int use_ephemeral; + + if (fcc_get_bind_port_range(paired_port_offset, &min_port, &max_port, &use_ephemeral) < 0) { + logger(LOG_ERROR, "FCC: Invalid %s port range", label); + return -1; + } + + if (use_ephemeral) { + return bind_port(0, opaque); + } + int range = max_port - min_port + 1; int start_offset = (int)(get_time_ms() % range); for (int i = 0; i < range; i++) { int port = min_port + ((start_offset + i) % range); - sin->sin_port = htons((uint16_t)port); - if (bind(sock, (struct sockaddr *)sin, sizeof(*sin)) == 0) { - logger(LOG_DEBUG, "FCC: Bound client socket to port %d", port); + int r = bind_port(port, opaque); + if (r == 0) { return 0; } + if (r < -1) { + return -1; + } + } + + logger(LOG_ERROR, "FCC: Unable to bind %s within port range %d-%d", label, min_port, max_port); + return -1; +} + +static int fcc_bind_single_socket_port(int port, void *opaque) { + fcc_single_bind_attempt_t *attempt = (fcc_single_bind_attempt_t *)opaque; + + attempt->sin->sin_port = htons((uint16_t)port); + if (bind(attempt->sock, (struct sockaddr *)attempt->sin, sizeof(*attempt->sin)) == 0) { + if (port != 0) { + logger(LOG_DEBUG, "FCC: Bound client socket to port %d", port); + } + return 0; + } + + if (errno != EADDRINUSE && errno != EACCES) { + logger(LOG_DEBUG, "FCC: Failed to bind port %d: %s", port, strerror(errno)); + } + return -1; +} + +static int fcc_register_socket(stream_context_t *ctx, int sock, const char *label) { + if (poller_add(ctx->epoll_fd, sock, POLLER_IN) < 0) { + logger(LOG_ERROR, "FCC: Failed to add %s socket to poller: %s", label, strerror(errno)); + return -1; + } + + fdmap_set(sock, ctx->conn); + logger(LOG_DEBUG, "FCC: %s socket registered with poller", label); + return 0; +} + +static int fcc_bind_double_socket_port(int media_port, void *opaque) { + fcc_double_bind_attempt_t *attempt = (fcc_double_bind_attempt_t *)opaque; + stream_context_t *ctx = attempt->ctx; + fcc_session_t *fcc = &ctx->fcc; + service_t *service = ctx->service; + int signal_port = media_port + 1; + + if (attempt->media_sock < 0) { + attempt->media_sock = fcc_create_configured_socket(service, attempt->upstream_if, "media"); + } + if (attempt->media_sock < 0) { + return -2; + } - if (errno != EADDRINUSE && errno != EACCES) { - logger(LOG_DEBUG, "FCC: Failed to bind port %d: %s", port, strerror(errno)); + struct sockaddr_in media_sin; + memset(&media_sin, 0, sizeof(media_sin)); + media_sin.sin_family = AF_INET; + media_sin.sin_addr.s_addr = INADDR_ANY; + media_sin.sin_port = htons((uint16_t)media_port); + + int media_bound = bind(attempt->media_sock, (struct sockaddr *)&media_sin, sizeof(media_sin)); + int media_errno = errno; + if (media_bound < 0) { + if (media_errno != EADDRINUSE && media_errno != EACCES) { + logger(LOG_DEBUG, "FCC: Failed to bind media port %d: %s", media_port, strerror(media_errno)); } + return -1; } - logger(LOG_ERROR, "FCC: Unable to bind socket within configured port range %d-%d", min_port, max_port); + if (attempt->signal_sock < 0) { + attempt->signal_sock = fcc_create_configured_socket(service, attempt->upstream_if, "signal"); + } + if (attempt->signal_sock < 0) { + close(attempt->media_sock); + attempt->media_sock = -1; + return -2; + } + + struct sockaddr_in signal_sin; + memset(&signal_sin, 0, sizeof(signal_sin)); + signal_sin.sin_family = AF_INET; + signal_sin.sin_addr.s_addr = INADDR_ANY; + signal_sin.sin_port = htons((uint16_t)signal_port); + + int signal_bound = bind(attempt->signal_sock, (struct sockaddr *)&signal_sin, sizeof(signal_sin)); + int signal_errno = errno; + if (signal_bound == 0) { + fcc->media_sock = attempt->media_sock; + fcc->fcc_sock = attempt->signal_sock; + attempt->media_sock = -1; + attempt->signal_sock = -1; + fcc->fcc_server = (struct sockaddr_in *)(uintptr_t)service->fcc_addr->ai_addr; + + socklen_t media_len = sizeof(fcc->media_client); + socklen_t signal_len = sizeof(fcc->fcc_client); + getsockname(fcc->media_sock, (struct sockaddr *)&fcc->media_client, &media_len); + getsockname(fcc->fcc_sock, (struct sockaddr *)&fcc->fcc_client, &signal_len); + + logger(LOG_DEBUG, "FCC: Bound media socket to port %u, signal socket to port %u", ntohs(fcc->media_client.sin_port), + ntohs(fcc->fcc_client.sin_port)); + return 0; + } + + if (signal_errno != EADDRINUSE && signal_errno != EACCES) { + logger(LOG_DEBUG, "FCC: Failed to bind signal port %d: %s", signal_port, strerror(signal_errno)); + } + + close(attempt->media_sock); + attempt->media_sock = -1; return -1; } +static int fcc_initialize_double_sockets(stream_context_t *ctx, const char *upstream_if) { + fcc_session_t *fcc = &ctx->fcc; + fcc_double_bind_attempt_t attempt = {.ctx = ctx, .upstream_if = upstream_if, .media_sock = -1, .signal_sock = -1}; + + if (fcc_try_bind_port_range(1, "media/signal socket pair", fcc_bind_double_socket_port, &attempt) < 0) { + if (attempt.media_sock >= 0) { + close(attempt.media_sock); + } + if (attempt.signal_sock >= 0) { + close(attempt.signal_sock); + } + return -1; + } + + if (fcc_register_socket(ctx, fcc->fcc_sock, "signal") < 0) { + close(fcc->media_sock); + close(fcc->fcc_sock); + fcc->media_sock = -1; + fcc->fcc_sock = -1; + fcc->fcc_server = NULL; + return -1; + } + + if (fcc_register_socket(ctx, fcc->media_sock, "media") < 0) { + worker_cleanup_socket_from_epoll(ctx->epoll_fd, fcc->fcc_sock); + close(fcc->media_sock); + fcc->media_sock = -1; + fcc->fcc_sock = -1; + fcc->fcc_server = NULL; + return -1; + } + + return 0; +} + ssize_t sendto_triple(int fd, const void *buf, size_t n, int flags, struct sockaddr_in *addr, socklen_t addr_len) { static uint8_t i; for (i = 0; i < 3; i++) { @@ -99,6 +303,13 @@ void fcc_session_cleanup(fcc_session_t *fcc, service_t *service, int epoll_fd) { fcc->pending_list_head = NULL; fcc->pending_list_tail = NULL; + /* Close media socket */ + if (fcc->media_sock >= 0) { + worker_cleanup_socket_from_epoll(epoll_fd, fcc->media_sock); + fcc->media_sock = -1; + logger(LOG_DEBUG, "FCC: Media socket closed"); + } + /* Close FCC socket */ if (fcc->fcc_sock >= 0) { worker_cleanup_socket_from_epoll(epoll_fd, fcc->fcc_sock); @@ -115,6 +326,7 @@ void fcc_session_cleanup(fcc_session_t *fcc, service_t *service, int epoll_fd) { /* Clear client address structure */ memset(&fcc->fcc_client, 0, sizeof(fcc->fcc_client)); + memset(&fcc->media_client, 0, sizeof(fcc->media_client)); /* Mark as not initialized */ fcc->initialized = 0; @@ -189,8 +401,13 @@ static bool is_rtcp_packet(const uint8_t *data, size_t len) { return packet_len > 0 && packet_len <= len; } -int fcc_handle_socket_event(stream_context_t *ctx, int64_t now) { +int fcc_handle_socket_event(stream_context_t *ctx, int fd, int64_t now) { fcc_session_t *fcc = &ctx->fcc; + int recv_sock = fd; + + if (recv_sock < 0) { + return 0; + } /* Drain all available packets for edge-triggered pollers (epoll EPOLLET / kqueue EV_CLEAR) * where the read event fires only once per data arrival. */ @@ -206,13 +423,12 @@ int fcc_handle_socket_event(stream_context_t *ctx, int64_t now) { fcc->last_data_time = now; /* Drain the socket to prevent event loop spinning */ uint8_t dummy[BUFFER_POOL_BUFFER_SIZE]; - recvfrom(fcc->fcc_sock, dummy, sizeof(dummy), 0, NULL, NULL); + recvfrom(recv_sock, dummy, sizeof(dummy), 0, NULL, NULL); return 0; } /* Receive directly into zero-copy buffer (true zero-copy receive) */ - int actualr = - recvfrom(fcc->fcc_sock, recv_buf->data, BUFFER_POOL_BUFFER_SIZE, 0, (struct sockaddr *)&peer_addr, &slen); + int actualr = recvfrom(recv_sock, recv_buf->data, BUFFER_POOL_BUFFER_SIZE, 0, (struct sockaddr *)&peer_addr, &slen); if (actualr < 0) { buffer_ref_put(recv_buf); if (errno != EAGAIN) @@ -278,6 +494,7 @@ void fcc_session_init(fcc_session_t *fcc) { fcc->initialized = 1; fcc->state = FCC_STATE_INIT; fcc->fcc_sock = -1; + fcc->media_sock = -1; fcc->status_index = -1; fcc->redirect_count = 0; } @@ -326,54 +543,45 @@ int fcc_initialize_and_request(stream_context_t *ctx) { logger(LOG_DEBUG, "FCC: Initializing FCC session and sending request"); if (fcc->fcc_sock < 0) { - /* Create and configure FCC socket */ - fcc->fcc_sock = socket(AF_INET, service->fcc_addr->ai_socktype, service->fcc_addr->ai_protocol); - if (fcc->fcc_sock < 0) { - logger(LOG_ERROR, "FCC: Failed to create socket: %s", strerror(errno)); - return -1; - } - - /* Set socket to non-blocking mode for poller */ - if (connection_set_nonblocking(fcc->fcc_sock) < 0) { - logger(LOG_ERROR, "FCC: Failed to set socket non-blocking: %s", strerror(errno)); - close(fcc->fcc_sock); - fcc->fcc_sock = -1; - return -1; - } + upstream_if = get_upstream_interface_for_fcc(service->ifname, service->ifname_fcc); - /* Set receive buffer size */ - if (set_socket_rcvbuf(fcc->fcc_sock, config.udp_rcvbuf_size) < 0) { - logger(LOG_WARN, "FCC: Failed to set SO_RCVBUF to %d: %s", config.udp_rcvbuf_size, strerror(errno)); - } + if (fcc->type == FCC_TYPE_HUAWEI) { + if (fcc_initialize_double_sockets(ctx, upstream_if) < 0) { + logger(LOG_ERROR, "FCC (Huawei): Cannot bind media/signal socket pair"); + return -1; + } + } else { + /* Create and configure FCC socket */ + fcc->fcc_sock = fcc_create_configured_socket(service, upstream_if, "client"); + if (fcc->fcc_sock < 0) { + return -1; + } - upstream_if = get_upstream_interface_for_fcc(service->ifname, service->ifname_fcc); - bind_to_upstream_interface(fcc->fcc_sock, upstream_if); - - /* Bind to configured or ephemeral port */ - sin.sin_family = AF_INET; - sin.sin_addr.s_addr = INADDR_ANY; - if (fcc_bind_socket_with_range(fcc->fcc_sock, &sin) != 0) { - logger(LOG_ERROR, "FCC: Cannot bind socket within configured range"); - close(fcc->fcc_sock); - fcc->fcc_sock = -1; - return -1; - } + /* Bind to configured or ephemeral port */ + memset(&sin, 0, sizeof(sin)); + sin.sin_family = AF_INET; + sin.sin_addr.s_addr = INADDR_ANY; + fcc_single_bind_attempt_t bind_attempt = {.sock = fcc->fcc_sock, .sin = &sin}; + if (fcc_try_bind_port_range(0, "client socket", fcc_bind_single_socket_port, &bind_attempt) != 0) { + logger(LOG_ERROR, "FCC: Cannot bind socket within configured range"); + close(fcc->fcc_sock); + fcc->fcc_sock = -1; + return -1; + } - /* Get the assigned local address */ - slen = sizeof(fcc->fcc_client); - getsockname(fcc->fcc_sock, (struct sockaddr *)&fcc->fcc_client, &slen); + /* Get the assigned local address */ + slen = sizeof(fcc->fcc_client); + getsockname(fcc->fcc_sock, (struct sockaddr *)&fcc->fcc_client, &slen); - fcc->fcc_server = (struct sockaddr_in *)(uintptr_t)service->fcc_addr->ai_addr; + fcc->fcc_server = (struct sockaddr_in *)(uintptr_t)service->fcc_addr->ai_addr; - /* Register socket with poller immediately after creation */ - if (poller_add(ctx->epoll_fd, fcc->fcc_sock, POLLER_IN) < 0) { - logger(LOG_ERROR, "FCC: Failed to add socket to poller: %s", strerror(errno)); - close(fcc->fcc_sock); - fcc->fcc_sock = -1; - return -1; + /* Register socket with poller immediately after creation */ + if (fcc_register_socket(ctx, fcc->fcc_sock, "client") < 0) { + close(fcc->fcc_sock); + fcc->fcc_sock = -1; + return -1; + } } - fdmap_set(fcc->fcc_sock, ctx->conn); - logger(LOG_DEBUG, "FCC: Socket registered with poller"); } /* Send FCC request - different format for Huawei vs Telecom */ diff --git a/src/fcc.h b/src/fcc.h index 8217d1d..8724738 100644 --- a/src/fcc.h +++ b/src/fcc.h @@ -49,8 +49,10 @@ typedef struct { int status_index; /* Index in status_shared->clients array for state updates */ int fcc_sock; + int media_sock; /* Huawei media socket; unused by Telecom/ZTE/Fiberhome */ struct sockaddr_in *fcc_server; struct sockaddr_in fcc_client; + struct sockaddr_in media_client; uint16_t media_port; /* RTP media port (network byte order, for direct comparison with sin_port) */ bool verify_server_ip; /* Verify server ip before processing packets */ @@ -114,10 +116,11 @@ int fcc_session_tick(stream_context_t *ctx, int64_t now); * Handle FCC socket events (receive and process packets) * * @param ctx Stream context + * @param fd Ready FCC socket descriptor * @param now Current timestamp in milliseconds * @return 0 on success, -1 on error */ -int fcc_handle_socket_event(stream_context_t *ctx, int64_t now); +int fcc_handle_socket_event(stream_context_t *ctx, int fd, int64_t now); /** * Set FCC session state with logging and status update diff --git a/src/fcc_huawei.c b/src/fcc_huawei.c index e3d4ab5..a67dc8c 100644 --- a/src/fcc_huawei.c +++ b/src/fcc_huawei.c @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -179,9 +180,8 @@ int fcc_huawei_handle_server_response(stream_context_t *ctx, uint8_t *buf, int b memcpy(&type_be, buf + 14, sizeof(type_be)); uint16_t type = ntohs(type_be); // 1=no unicast, 2=unicast, 3=redirect - logger(LOG_DEBUG, "FCC (Huawei): Response received: result=%u, type=%u", result_code, type); - if (result_code != 1) { + logger(LOG_DEBUG, "FCC Response: FMT=6, result=%u, type=%u", result_code, type); logger(LOG_WARN, "FCC (Huawei): Server response error (result=%u), falling back to " "multicast", @@ -193,63 +193,96 @@ int fcc_huawei_handle_server_response(stream_context_t *ctx, uint8_t *buf, int b if (type == 1) { /* No need for unicast, join multicast immediately */ + logger(LOG_DEBUG, "FCC Response: FMT=6, result=%u, type=%u", result_code, type); logger(LOG_INFO, "FCC (Huawei): Server says no unicast needed, joining multicast"); fcc_session_set_state(fcc, FCC_STATE_MCAST_ACTIVE, "No unicast needed"); mcast_session_join(&ctx->mcast, ctx); } else if (type == 2) { /* Server will send unicast stream */ - if (buf_len < 36) { + if (buf_len < 24) { logger(LOG_WARN, "FCC (Huawei): response too short for unicast fields (%d bytes)", buf_len); fcc_session_set_state(fcc, FCC_STATE_MCAST_ACTIVE, "Short response"); mcast_session_join(&ctx->mcast, ctx); return 0; } - uint8_t nat_flag = buf[24]; - uint8_t need_nat_traversal = (nat_flag << 2) >> 7; // Extract bit 5 - uint16_t server_port_be; - memcpy(&server_port_be, buf + 26, sizeof(server_port_be)); - uint32_t server_ip_be; - memcpy(&server_ip_be, buf + 32, sizeof(server_ip_be)); - - /* Extract session ID for NAT traversal */ - { - uint32_t session_id_be; - memcpy(&session_id_be, buf + 28, sizeof(session_id_be)); - fcc->session_id = ntohl(session_id_be); + uint16_t first_seq_be; + uint16_t bitrate_be; + uint16_t first_seq; + uint16_t bitrate_kbps; + char bitrate_str[32]; + memcpy(&first_seq_be, buf + 16, sizeof(first_seq_be)); + memcpy(&bitrate_be, buf + 20, sizeof(bitrate_be)); + + first_seq = ntohs(first_seq_be); + bitrate_kbps = ntohs(bitrate_be); + if (bitrate_kbps >= 1024) { + snprintf(bitrate_str, sizeof(bitrate_str), "%.2f Mbps", bitrate_kbps / 1024.0); + } else { + snprintf(bitrate_str, sizeof(bitrate_str), "%u Kbps", bitrate_kbps); } - if (need_nat_traversal == 1 && fcc->session_id != 0) { - /* NAT traversal supported - update server address and send NAT packet */ - fcc->need_nat_traversal = 1; - - /* Update unicast server IP and media port (keep fcc_server->sin_port as - * control port7) */ - if (server_ip_be != 0) { - fcc->fcc_server->sin_addr.s_addr = server_ip_be; - fcc->verify_server_ip = true; - } - if (server_port_be != 0) { - fcc->media_port = server_port_be; - } - - /* Build and send NAT traversal packet (FMT 12) to punch hole in NAT */ - uint8_t *nat_pk = build_fcc_nat_pk_huawei(fcc->session_id); - - /* Send NAT packet to media port (for NAT hole punching on RTP port) */ - struct sockaddr_in media_addr; - memcpy(&media_addr, fcc->fcc_server, sizeof(media_addr)); - if (fcc->media_port != 0) { - media_addr.sin_port = fcc->media_port; + if (buf_len >= 36) { + uint8_t nat_flag = buf[24]; + uint8_t need_nat_traversal = (nat_flag << 2) >> 7; // Extract bit 5 + uint16_t server_port_be; + memcpy(&server_port_be, buf + 26, sizeof(server_port_be)); + uint32_t server_ip_be; + memcpy(&server_ip_be, buf + 32, sizeof(server_ip_be)); + + /* Extract session ID for NAT traversal */ + { + uint32_t session_id_be; + memcpy(&session_id_be, buf + 28, sizeof(session_id_be)); + fcc->session_id = ntohl(session_id_be); } - int r = sendto_triple(fcc->fcc_sock, nat_pk, FCC_PK_LEN_NAT_HUAWEI, 0, (struct sockaddr_in *)&media_addr, - sizeof(media_addr)); - if (r < 0) { - logger(LOG_ERROR, "FCC (Huawei): Failed to send NAT packet: %s", strerror(errno)); + struct in_addr server_addr; + char server_ip_str[INET_ADDRSTRLEN]; + server_addr.s_addr = server_ip_be; + inet_ntop(AF_INET, &server_addr, server_ip_str, sizeof(server_ip_str)); + logger(LOG_DEBUG, + "FCC Response: FMT=6, result=%u, type=%u, first_seq=%u, bitrate=%s, " + "nat_support=%u, server_port=%u, session_id=0x%08x, server_ip=%s", + result_code, type, first_seq, bitrate_str, need_nat_traversal, ntohs(server_port_be), fcc->session_id, + server_ip_str); + + if (need_nat_traversal == 1 && fcc->session_id != 0) { + /* NAT traversal supported - update server address and send NAT packet */ + fcc->need_nat_traversal = 1; + + /* Update unicast server IP and media port (keep fcc_server->sin_port as + * control port) */ + if (server_ip_be != 0) { + fcc->fcc_server->sin_addr.s_addr = server_ip_be; + fcc->verify_server_ip = true; + } + if (server_port_be != 0) { + fcc->media_port = server_port_be; + } + + /* Build and send NAT traversal packet (FMT 12) to punch hole in NAT */ + uint8_t *nat_pk = build_fcc_nat_pk_huawei(fcc->session_id); + + /* Send NAT packet to media port (for NAT hole punching on RTP port) */ + struct sockaddr_in media_addr; + memcpy(&media_addr, fcc->fcc_server, sizeof(media_addr)); + if (fcc->media_port != 0) { + media_addr.sin_port = fcc->media_port; + } + + int nat_sock = fcc->media_sock >= 0 ? fcc->media_sock : fcc->fcc_sock; + int r = sendto_triple(nat_sock, nat_pk, FCC_PK_LEN_NAT_HUAWEI, 0, (struct sockaddr_in *)&media_addr, + sizeof(media_addr)); + if (r < 0) { + logger(LOG_ERROR, "FCC (Huawei): Failed to send NAT packet: %s", strerror(errno)); + } + + logger(LOG_DEBUG, "FCC (Huawei): NAT traversal packet sent"); } - - logger(LOG_DEBUG, "FCC (Huawei): NAT traversal packet sent"); + } else { + logger(LOG_DEBUG, "FCC Response: FMT=6, result=%u, type=%u, first_seq=%u, bitrate=%s", result_code, type, + first_seq, bitrate_str); } /* Record start time and transition to waiting for unicast */ @@ -265,6 +298,18 @@ int fcc_huawei_handle_server_response(stream_context_t *ctx, uint8_t *buf, int b return 0; } + uint16_t server_port_be; + memcpy(&server_port_be, buf + 26, sizeof(server_port_be)); + uint32_t server_ip_be; + memcpy(&server_ip_be, buf + 32, sizeof(server_ip_be)); + + uint32_t redirect_ip = server_ip_be != 0 ? server_ip_be : fcc->fcc_server->sin_addr.s_addr; + uint16_t redirect_port = server_port_be != 0 ? server_port_be : fcc->fcc_server->sin_port; + if (fcc->redirect_count > 0 && redirect_ip == fcc->fcc_server->sin_addr.s_addr && + redirect_port == fcc->fcc_server->sin_port) { + return 0; + } + fcc->redirect_count++; if (fcc->redirect_count > FCC_MAX_REDIRECTS) { logger(LOG_WARN, "FCC (Huawei): Too many redirects (%d), falling back to multicast", fcc->redirect_count); @@ -273,10 +318,12 @@ int fcc_huawei_handle_server_response(stream_context_t *ctx, uint8_t *buf, int b return 0; } - uint16_t server_port_be; - memcpy(&server_port_be, buf + 26, sizeof(server_port_be)); - uint32_t server_ip_be; - memcpy(&server_ip_be, buf + 32, sizeof(server_ip_be)); + struct in_addr redirect_addr; + char redirect_ip_str[INET_ADDRSTRLEN]; + redirect_addr.s_addr = redirect_ip; + inet_ntop(AF_INET, &redirect_addr, redirect_ip_str, sizeof(redirect_ip_str)); + logger(LOG_DEBUG, "FCC Response: FMT=6, result=%u, type=%u, server_port=%u, server_ip=%s, redirect=%d", result_code, + type, ntohs(redirect_port), redirect_ip_str, fcc->redirect_count); if (server_ip_be != 0) { fcc->fcc_server->sin_addr.s_addr = server_ip_be; @@ -286,11 +333,10 @@ int fcc_huawei_handle_server_response(stream_context_t *ctx, uint8_t *buf, int b fcc->fcc_server->sin_port = server_port_be; } - logger(LOG_DEBUG, "FCC (Huawei): Server redirect to %s:%u (redirect #%d)", inet_ntoa(fcc->fcc_server->sin_addr), - ntohs(fcc->fcc_server->sin_port), fcc->redirect_count); fcc_session_set_state(fcc, FCC_STATE_INIT, "Server redirect"); return 1; } else { + logger(LOG_DEBUG, "FCC Response: FMT=6, result=%u, type=%u", result_code, type); logger(LOG_WARN, "FCC (Huawei): Unsupported type=%u, falling back to multicast", type); fcc_session_set_state(fcc, FCC_STATE_MCAST_ACTIVE, "Unsupported type"); mcast_session_join(&ctx->mcast, ctx); diff --git a/src/fcc_telecom.c b/src/fcc_telecom.c index 9a152e6..ef0a908 100644 --- a/src/fcc_telecom.c +++ b/src/fcc_telecom.c @@ -170,6 +170,13 @@ int fcc_telecom_handle_server_response(stream_context_t *ctx, uint8_t *buf, size /* Update server endpoints if provided */ int signal_port_changed = 0, media_port_changed = 0; + uint32_t redirect_ip = new_fcc_ip_be != 0 ? new_fcc_ip_be : fcc->fcc_server->sin_addr.s_addr; + uint16_t redirect_port = new_signal_port_be != 0 ? new_signal_port_be : fcc->fcc_server->sin_port; + + if (type == 3 && fcc->redirect_count > 0 && redirect_ip == fcc->fcc_server->sin_addr.s_addr && + redirect_port == fcc->fcc_server->sin_port) { + return 0; + } if (new_signal_port_be && new_signal_port_be != fcc->fcc_server->sin_port) { fcc->fcc_server->sin_port = new_signal_port_be; diff --git a/src/stream.c b/src/stream.c index 933fb68..b52aeef 100644 --- a/src/stream.c +++ b/src/stream.c @@ -75,7 +75,12 @@ int stream_process_rtp_payload(stream_context_t *ctx, buffer_ref_t *buf_ref) { int stream_handle_fd_event(stream_context_t *ctx, int fd, uint32_t events, int64_t now) { /* Process FCC socket events */ if (ctx->fcc.initialized && ctx->fcc.fcc_sock >= 0 && fd == ctx->fcc.fcc_sock) { - return fcc_handle_socket_event(ctx, now); + return fcc_handle_socket_event(ctx, fd, now); + } + + /* Process FCC media socket events */ + if (ctx->fcc.initialized && ctx->fcc.media_sock >= 0 && fd == ctx->fcc.media_sock) { + return fcc_handle_socket_event(ctx, fd, now); } /* Process multicast socket events */