From 1ad0bccb2baacecb6a9b4553b00d9ae83e40982f Mon Sep 17 00:00:00 2001 From: Exil Productions Date: Sat, 27 Dec 2025 00:00:58 +0100 Subject: [PATCH] fixed conn limits and server shutdown --- example/main.c | 96 +++++++++++++++++++++++++++++++++++++-------- src/rtmp_server.cpp | 40 ++++++++++++------- 2 files changed, 104 insertions(+), 32 deletions(-) diff --git a/example/main.c b/example/main.c index 36590ba..24f98d9 100644 --- a/example/main.c +++ b/example/main.c @@ -1,44 +1,106 @@ #include "../include/rtmp_capi.hpp" #include #include +#include +#include +#include +#include +#include +#include +// Callbacks static void on_connect_cb(const char* ip, void* data) { printf("Client connected: %s\n", ip); } -static void on_publish_cb(const char* ip, const char* app, const char* key, - void *data) +static void on_publish_cb(const char* ip, const char* app, const char* key, void* data) { printf("Publish from %s: %s/%s\n", ip, app, key); } -static void on_audio_cb(const char* app, const char* key, const uint8_t* data, - uint32_t len, uint32_t ts, void *ud) + +static struct termios g_orig_termios; + +static void restore_terminal(void) { - printf("Audio data for %s/%s, len=%u, ts=%u\n", app, key, len, ts); + tcsetattr(STDIN_FILENO, TCSANOW, &g_orig_termios); } -int main() +static int setup_nonblocking_stdin(void) +{ + struct termios raw; + int flags; + + if (tcgetattr(STDIN_FILENO, &g_orig_termios) != 0) + return 0; + + raw = g_orig_termios; + raw.c_lflag &= ~(ICANON | ECHO); + if (tcsetattr(STDIN_FILENO, TCSANOW, &raw) != 0) + return 0; + + flags = fcntl(STDIN_FILENO, F_GETFL, 0); + if (flags < 0) + return 0; + + if (fcntl(STDIN_FILENO, F_SETFL, flags | O_NONBLOCK) < 0) + return 0; + + atexit(restore_terminal); + return 1; +} + +int main(void) { rtmp_logger_set_level(RTMP_LOG_INFO); + + if (!setup_nonblocking_stdin()) { + printf("Failed to configure terminal input\n"); + return 1; + } + RtmpServerHandle server = rtmp_server_create(1935); + if (!server) { + printf("Failed to create server\n"); + return 1; + } + rtmp_server_set_on_connect(server, on_connect_cb, NULL); rtmp_server_set_on_publish(server, on_publish_cb, NULL); - rtmp_server_set_on_audio_data(server, on_audio_cb, NULL); rtmp_server_enable_gop_cache(server, true); + bool isRunning = false; - if (rtmp_server_start(server, &isRunning)) - { - while (isRunning) { - sleep(1); // Block main Thread + if (!rtmp_server_start(server, &isRunning)) { + printf("Failed to start server\n"); + rtmp_server_destroy(server); + return 1; + } + + printf("RTMP server running. Press 'q' to stop.\n"); + + while (isRunning) { + fd_set readfds; + struct timeval tv; + + FD_ZERO(&readfds); + FD_SET(STDIN_FILENO, &readfds); + + tv.tv_sec = 1; + tv.tv_usec = 0; + + int ret = select(STDIN_FILENO + 1, &readfds, NULL, NULL, &tv); + if (ret > 0 && FD_ISSET(STDIN_FILENO, &readfds)) { + char ch; + ssize_t n = read(STDIN_FILENO, &ch, 1); + if (n == 1 && (ch == 'q' || ch == 'Q')) { + printf("Shutting down...\n"); + rtmp_server_stop(server); + break; + } } } - else - { - printf("Failed to start server\n"); - } - rtmp_server_stop(server); + rtmp_server_destroy(server); return 0; -} \ No newline at end of file +} diff --git a/src/rtmp_server.cpp b/src/rtmp_server.cpp index 9a95e50..6a5fd9a 100644 --- a/src/rtmp_server.cpp +++ b/src/rtmp_server.cpp @@ -1007,7 +1007,7 @@ RTMPServer::RTMPServer(int port) : port(port), server_fd(-1), running(false) {} RTMPServer::~RTMPServer() { stop(); } -bool RTMPServer::start(bool& isRunning) { +bool RTMPServer::start(bool &isRunning) { server_fd = socket(AF_INET, SOCK_STREAM, 0); if (server_fd < 0) { LOG_ERROR("Failed to create socket"); @@ -1049,26 +1049,36 @@ void RTMPServer::stop() { return; running = false; if (server_fd >= 0) { + shutdown(server_fd, SHUT_RDWR); close(server_fd); server_fd = -1; } - if (accept_thread.joinable()) { - accept_thread.join(); - } - if (ping_thread.joinable()) { - ping_thread.join(); - } - if (timeout_thread.joinable()) { - timeout_thread.join(); - } - for (auto &thread : client_threads) { - if (thread.joinable()) { - thread.join(); + + { + std::lock_guard lock(sessions_mutex); + for (auto &session : sessions) { + shutdown(session->getFd(), SHUT_RDWR); + close(session->getFd()); } } + + if (accept_thread.joinable()) + accept_thread.join(); + if (ping_thread.joinable()) + ping_thread.join(); + if (timeout_thread.joinable()) + timeout_thread.join(); + + for (auto &thread : client_threads) { + if (thread.joinable()) + thread.join(); + } client_threads.clear(); + + // Clear sessions std::lock_guard lock(sessions_mutex); sessions.clear(); + LOG_INFO("RTMP Server stopped"); } @@ -1437,10 +1447,10 @@ bool RTMPServer::checkConnectionLimits(const std::string &app, bool is_publisher) const { if (is_publisher) { int current = countPublishers(app, stream_key); - return current >= max_publishers_per_stream; + return current >= max_publishers_per_stream; // FIXED: Used propper Op } else { int current = countPlayers(app, stream_key); - return current >= max_players_per_stream; + return current <= max_players_per_stream; // FIXED: User propper Op } }