From 3cc76b4d9978d6fa5bbdccde2a32ecefb4e82089 Mon Sep 17 00:00:00 2001 From: Exil Productions Date: Fri, 26 Dec 2025 22:52:08 +0100 Subject: [PATCH] fix wrong path --- build.sh | 0 example/main.c | 1 - include/rtmp_capi.h | 109 +- include/rtmp_server.h | 872 +++++------- src/rtmp_capi.cpp | 581 ++++---- src/rtmp_server.cpp | 3156 +++++++++++++++++++---------------------- 6 files changed, 2206 insertions(+), 2513 deletions(-) mode change 100644 => 100755 build.sh diff --git a/build.sh b/build.sh old mode 100644 new mode 100755 diff --git a/example/main.c b/example/main.c index 66dcd76..e22be9d 100644 --- a/example/main.c +++ b/example/main.c @@ -1,6 +1,5 @@ #include "../include/rtmp_capi.h" #include -#include #include static void on_connect_cb(const char* ip, void* data) diff --git a/include/rtmp_capi.h b/include/rtmp_capi.h index 7bebb51..ce7635a 100644 --- a/include/rtmp_capi.h +++ b/include/rtmp_capi.h @@ -1,8 +1,8 @@ #ifndef RTMP_CAPI_H #define RTMP_CAPI_H -#include #include +#include #ifdef __cplusplus extern "C" { @@ -10,38 +10,40 @@ extern "C" { typedef void *RtmpServerHandle; -enum RtmpLogLevel -{ - RTMP_LOG_ERROR = 0, - RTMP_LOG_WARN = 1, - RTMP_LOG_INFO = 2, - RTMP_LOG_DEBUG = 3 +enum RtmpLogLevel { + RTMP_LOG_ERROR = 0, + RTMP_LOG_WARN = 1, + RTMP_LOG_INFO = 2, + RTMP_LOG_DEBUG = 3 }; -struct RtmpStreamStats -{ - uint64_t bytes_sent; - uint64_t bytes_received; - uint32_t video_frames; - uint32_t audio_frames; - uint32_t dropped_frames; - double bitrate_kbps; - double uptime_seconds; +struct RtmpStreamStats { + uint64_t bytes_sent; + uint64_t bytes_received; + uint32_t video_frames; + uint32_t audio_frames; + uint32_t dropped_frames; + double bitrate_kbps; + double uptime_seconds; }; -typedef void (*RtmpOnConnectCallback)(const char* client_ip, void* user_data); -typedef void (*RtmpOnPublishCallback)(const char* client_ip, const char* app, - const char* stream_key, void* user_data); -typedef void (*RtmpOnPlayCallback)(const char* client_ip, const char* app, - const char* stream_key, void* user_data); -typedef void (*RtmpOnAudioDataCallback)(const char* app, const char* stream_key, - const uint8_t* data, uint32_t length, uint32_t timestamp, void* user_data); -typedef void (*RtmpOnVideoDataCallback)(const char* app, const char* stream_key, - const uint8_t* data, uint32_t length, uint32_t timestamp, void* user_data); -typedef void (*RtmpOnDisconnectCallback)(const char* client_ip, const char* app, - const char* stream_key, bool was_publishing, bool was_playing, void* user_data); -typedef bool (*RtmpAuthCallback)(const char* app, const char* stream_key, - const char* client_ip, void* user_data); +typedef void (*RtmpOnConnectCallback)(const char *client_ip, void *user_data); +typedef void (*RtmpOnPublishCallback)(const char *client_ip, const char *app, + const char *stream_key, void *user_data); +typedef void (*RtmpOnPlayCallback)(const char *client_ip, const char *app, + const char *stream_key, void *user_data); +typedef void (*RtmpOnAudioDataCallback)(const char *app, const char *stream_key, + const uint8_t *data, uint32_t length, + uint32_t timestamp, void *user_data); +typedef void (*RtmpOnVideoDataCallback)(const char *app, const char *stream_key, + const uint8_t *data, uint32_t length, + uint32_t timestamp, void *user_data); +typedef void (*RtmpOnDisconnectCallback)(const char *client_ip, const char *app, + const char *stream_key, + bool was_publishing, bool was_playing, + void *user_data); +typedef bool (*RtmpAuthCallback)(const char *app, const char *stream_key, + const char *client_ip, void *user_data); // Create and destroy RtmpServerHandle rtmp_server_create(int port); @@ -52,24 +54,25 @@ bool rtmp_server_is_running(RtmpServerHandle handle); // Callbacks void rtmp_server_set_on_connect(RtmpServerHandle handle, - RtmpOnConnectCallback cb, void* user_data); + RtmpOnConnectCallback cb, void *user_data); void rtmp_server_set_on_publish(RtmpServerHandle handle, - RtmpOnPublishCallback cb, void* user_data); + RtmpOnPublishCallback cb, void *user_data); void rtmp_server_set_on_play(RtmpServerHandle handle, RtmpOnPlayCallback cb, - void* user_data); + void *user_data); void rtmp_server_set_on_audio_data(RtmpServerHandle handle, - RtmpOnAudioDataCallback cb, void* user_data); + RtmpOnAudioDataCallback cb, void *user_data); void rtmp_server_set_on_video_data(RtmpServerHandle handle, - RtmpOnVideoDataCallback cb, void* user_data); + RtmpOnVideoDataCallback cb, void *user_data); void rtmp_server_set_on_disconnect(RtmpServerHandle handle, - RtmpOnDisconnectCallback cb, void* user_data); + RtmpOnDisconnectCallback cb, + void *user_data); void rtmp_server_set_auth_callback(RtmpServerHandle handle, RtmpAuthCallback cb, - void* user_data); + void *user_data); // Configuration void rtmp_server_enable_gop_cache(RtmpServerHandle handle, bool enable); void rtmp_server_set_max_publishers_per_stream(RtmpServerHandle handle, - int max); + int max); void rtmp_server_set_max_players_per_stream(RtmpServerHandle handle, int max); void rtmp_server_set_max_total_connections(RtmpServerHandle handle, int max); void rtmp_server_set_connection_timeout(RtmpServerHandle handle, int seconds); @@ -81,26 +84,28 @@ int rtmp_server_get_active_publishers(RtmpServerHandle handle); int rtmp_server_get_active_players(RtmpServerHandle handle); int rtmp_server_get_total_connections(RtmpServerHandle handle); struct RtmpStreamStats rtmp_server_get_stream_stats(RtmpServerHandle handle, - const char* app, const char* stream_key); + const char *app, + const char *stream_key); // Recording -bool rtmp_server_start_recording(RtmpServerHandle handle, const char* app, - const char* stream_key, const char* filename); -void rtmp_server_stop_recording(RtmpServerHandle handle, const char* app, - const char* stream_key); -bool rtmp_server_is_recording(RtmpServerHandle handle, const char* app, - const char* stream_key); +bool rtmp_server_start_recording(RtmpServerHandle handle, const char *app, + const char *stream_key, const char *filename); +void rtmp_server_stop_recording(RtmpServerHandle handle, const char *app, + const char *stream_key); +bool rtmp_server_is_recording(RtmpServerHandle handle, const char *app, + const char *stream_key); // Broadcasting -bool rtmp_server_broadcast_audio(RtmpServerHandle handle, const char* app, - const char* stream_key, const uint8_t* data, uint32_t length, - uint32_t timestamp); -bool rtmp_server_broadcast_video(RtmpServerHandle handle, const char* app, - const char* stream_key, const uint8_t* data, uint32_t length, - uint32_t timestamp); +bool rtmp_server_broadcast_audio(RtmpServerHandle handle, const char *app, + const char *stream_key, const uint8_t *data, + uint32_t length, uint32_t timestamp); +bool rtmp_server_broadcast_video(RtmpServerHandle handle, const char *app, + const char *stream_key, const uint8_t *data, + uint32_t length, uint32_t timestamp); // FIXED: Added missing declaration -bool rtmp_server_broadcast_metadata(RtmpServerHandle handle, const char* app, - const char* stream_key, const uint8_t* data, uint32_t length); +bool rtmp_server_broadcast_metadata(RtmpServerHandle handle, const char *app, + const char *stream_key, const uint8_t *data, + uint32_t length); // Logger void rtmp_logger_set_level(enum RtmpLogLevel level); diff --git a/include/rtmp_server.h b/include/rtmp_server.h index fe09c97..21809ba 100644 --- a/include/rtmp_server.h +++ b/include/rtmp_server.h @@ -1,586 +1,468 @@ #ifndef RTMP_SERVER_H #define RTMP_SERVER_H -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include +#include #include -#include -#include +#include +#include +#include +#include +#include -namespace rtmp -{ +namespace rtmp { // RTMP Message Types - enum class MessageType : uint8_t - { - SET_CHUNK_SIZE = 1, - ABORT_MESSAGE = 2, - ACKNOWLEDGEMENT = 3, - USER_CONTROL = 4, - WINDOW_ACK_SIZE = 5, - SET_PEER_BANDWIDTH = 6, - AUDIO = 8, - VIDEO = 9, - DATA_AMF3 = 15, - SHARED_OBJECT_AMF3 = 16, - COMMAND_AMF3 = 17, - DATA_AMF0 = 18, - SHARED_OBJECT_AMF0 = 19, - COMMAND_AMF0 = 20, - AGGREGATE = 22 - }; +enum class MessageType : uint8_t { + SET_CHUNK_SIZE = 1, + ABORT_MESSAGE = 2, + ACKNOWLEDGEMENT = 3, + USER_CONTROL = 4, + WINDOW_ACK_SIZE = 5, + SET_PEER_BANDWIDTH = 6, + AUDIO = 8, + VIDEO = 9, + DATA_AMF3 = 15, + SHARED_OBJECT_AMF3 = 16, + COMMAND_AMF3 = 17, + DATA_AMF0 = 18, + SHARED_OBJECT_AMF0 = 19, + COMMAND_AMF0 = 20, + AGGREGATE = 22 +}; // User Control Message Types - enum class UserControlType : uint16_t - { - STREAM_BEGIN = 0, - STREAM_EOF = 1, - STREAM_DRY = 2, - SET_BUFFER_LENGTH = 3, - STREAM_IS_RECORDED = 4, - PING_REQUEST = 6, - PING_RESPONSE = 7 - }; +enum class UserControlType : uint16_t { + STREAM_BEGIN = 0, + STREAM_EOF = 1, + STREAM_DRY = 2, + SET_BUFFER_LENGTH = 3, + STREAM_IS_RECORDED = 4, + PING_REQUEST = 6, + PING_RESPONSE = 7 +}; // AMF0 Data Types - enum class AMF0Type : uint8_t - { - NUMBER = 0x00, - BOOLEAN = 0x01, - STRING = 0x02, - OBJECT = 0x03, - NULL_TYPE = 0x05, - UNDEFINED = 0x06, - ECMA_ARRAY = 0x08, - OBJECT_END = 0x09 - }; +enum class AMF0Type : uint8_t { + NUMBER = 0x00, + BOOLEAN = 0x01, + STRING = 0x02, + OBJECT = 0x03, + NULL_TYPE = 0x05, + UNDEFINED = 0x06, + ECMA_ARRAY = 0x08, + OBJECT_END = 0x09 +}; // Log Levels - enum class LogLevel - { - ERROR = 0, - WARN = 1, - INFO = 2, - DEBUG = 3 - }; +enum class LogLevel { ERROR = 0, WARN = 1, INFO = 2, DEBUG = 3 }; // AMF0 Value - class AMF0Value - { - public: - AMF0Type type; - double number; - bool boolean; - std::string string; - std::map> object; +class AMF0Value { +public: + AMF0Type type; + double number; + bool boolean; + std::string string; + std::map> object; - AMF0Value() : type(AMF0Type::NULL_TYPE), number(0), boolean(false) {} - }; + AMF0Value() : type(AMF0Type::NULL_TYPE), number(0), boolean(false) {} +}; // RTMP Chunk Header - struct ChunkHeader - { - uint8_t fmt; - uint32_t csid; - uint32_t timestamp; - uint32_t msg_length; - uint8_t msg_type_id; - uint32_t msg_stream_id; - bool has_extended_timestamp; - }; +struct ChunkHeader { + uint8_t fmt; + uint32_t csid; + uint32_t timestamp; + uint32_t msg_length; + uint8_t msg_type_id; + uint32_t msg_stream_id; + bool has_extended_timestamp; +}; // RTMP Message - struct RTMPMessage - { - ChunkHeader header; - std::vector payload; - }; +struct RTMPMessage { + ChunkHeader header; + std::vector payload; +}; // Stream Information - struct StreamInfo - { - std::string app; - std::string stream_key; - bool is_publishing; - bool is_playing; - int client_fd; - uint32_t stream_id; - std::string client_ip; - }; +struct StreamInfo { + std::string app; + std::string stream_key; + bool is_publishing; + bool is_playing; + int client_fd; + uint32_t stream_id; + std::string client_ip; +}; // Stream Statistics - struct StreamStatistics - { - uint64_t bytes_sent = 0; - uint64_t bytes_received = 0; - uint32_t video_frames = 0; - uint32_t audio_frames = 0; - uint32_t dropped_frames = 0; - std::chrono::steady_clock::time_point start_time; +struct StreamStatistics { + uint64_t bytes_sent = 0; + uint64_t bytes_received = 0; + uint32_t video_frames = 0; + uint32_t audio_frames = 0; + uint32_t dropped_frames = 0; + std::chrono::steady_clock::time_point start_time; - StreamStatistics() : start_time(std::chrono::steady_clock::now()) {} + StreamStatistics() : start_time(std::chrono::steady_clock::now()) {} - double getBitrate() const - { - auto now = std::chrono::steady_clock::now(); - auto duration = std::chrono::duration_cast( - now - start_time).count(); - if (duration == 0) return 0; - return (bytes_sent * 8.0) / duration / 1000.0; // kbps - } + double getBitrate() const { + auto now = std::chrono::steady_clock::now(); + auto duration = + std::chrono::duration_cast(now - start_time) + .count(); + if (duration == 0) + return 0; + return (bytes_sent * 8.0) / duration / 1000.0; // kbps + } - double getUptime() const - { - auto now = std::chrono::steady_clock::now(); - return std::chrono::duration_cast( - now - start_time).count(); - } - }; + double getUptime() const { + auto now = std::chrono::steady_clock::now(); + return std::chrono::duration_cast(now - start_time) + .count(); + } +}; // GOP Cache for instant playback - class GOPCache - { - public: - void addVideoFrame(const std::vector &data, uint32_t timestamp); - void addAudioFrame(const std::vector &data, uint32_t timestamp); - void addMetadata(const std::vector &data); - void sendToPlayer(class RTMPSession* session); - void clear(); - bool hasKeyframe() const - { - return has_keyframe; - } +class GOPCache { +public: + void addVideoFrame(const std::vector &data, uint32_t timestamp); + void addAudioFrame(const std::vector &data, uint32_t timestamp); + void addMetadata(const std::vector &data); + void sendToPlayer(class RTMPSession *session); + void clear(); + bool hasKeyframe() const { return has_keyframe; } - private: - struct CachedFrame - { - MessageType type; - std::vector data; - uint32_t timestamp; - }; +private: + struct CachedFrame { + MessageType type; + std::vector data; + uint32_t timestamp; + }; - std::vector frames; - std::vector metadata; - bool has_keyframe = false; - std::mutex cache_mutex; + std::vector frames; + std::vector metadata; + bool has_keyframe = false; + std::mutex cache_mutex; - bool isKeyframe(const std::vector &data); - }; + bool isKeyframe(const std::vector &data); +}; // FLV File Recorder - class FLVRecorder - { - public: - FLVRecorder(const std::string& filename); - ~FLVRecorder(); +class FLVRecorder { +public: + FLVRecorder(const std::string &filename); + ~FLVRecorder(); - bool start(); - void stop(); - bool isRecording() const - { - return recording; - } + bool start(); + void stop(); + bool isRecording() const { return recording; } - void writeAudioFrame(const std::vector &data, uint32_t timestamp); - void writeVideoFrame(const std::vector &data, uint32_t timestamp); - void writeMetadata(const std::map> - &metadata); + void writeAudioFrame(const std::vector &data, uint32_t timestamp); + void writeVideoFrame(const std::vector &data, uint32_t timestamp); + void writeMetadata( + const std::map> &metadata); - private: - std::string filename; - std::ofstream file; - bool recording = false; - uint32_t last_timestamp = 0; - std::mutex file_mutex; +private: + std::string filename; + std::ofstream file; + bool recording = false; + uint32_t last_timestamp = 0; + std::mutex file_mutex; - void writeFLVHeader(); - void writeFLVTag(uint8_t tag_type, const std::vector &data, - uint32_t timestamp); - std::vector encodeMetadata(const - std::map> &metadata); - }; + void writeFLVHeader(); + void writeFLVTag(uint8_t tag_type, const std::vector &data, + uint32_t timestamp); + std::vector encodeMetadata( + const std::map> &metadata); +}; // RTMP Client Session - class RTMPSession - { - public: - RTMPSession(int fd, const std::string& client_ip); - ~RTMPSession(); +class RTMPSession { +public: + RTMPSession(int fd, const std::string &client_ip); + ~RTMPSession(); - bool handshake(); - bool receiveChunk(); - bool sendMessage(const RTMPMessage& msg); - bool sendChunk(uint32_t csid, uint32_t timestamp, uint8_t msg_type, - uint32_t stream_id, const std::vector &data); + bool handshake(); + bool receiveChunk(); + bool sendMessage(const RTMPMessage &msg); + bool sendChunk(uint32_t csid, uint32_t timestamp, uint8_t msg_type, + uint32_t stream_id, const std::vector &data); - int getFd() const - { - return client_fd; - } - const StreamInfo &getStreamInfo() const - { - return stream_info; - } - StreamInfo &getStreamInfo() - { - return stream_info; - } + int getFd() const { return client_fd; } + const StreamInfo &getStreamInfo() const { return stream_info; } + StreamInfo &getStreamInfo() { return stream_info; } - void setChunkSize(uint32_t size) - { - chunk_size = size; - } - uint32_t getChunkSize() const - { - return chunk_size; - } + void setChunkSize(uint32_t size) { chunk_size = size; } + uint32_t getChunkSize() const { return chunk_size; } - // Acknowledgement handling - void onBytesReceived(size_t bytes); - bool shouldSendAck() const; - void sendAcknowledgement(); + // Acknowledgement handling + void onBytesReceived(size_t bytes); + bool shouldSendAck() const; + void sendAcknowledgement(); - // Ping/Pong - void sendPing(uint32_t timestamp); - void sendPong(uint32_t timestamp); + // Ping/Pong + void sendPing(uint32_t timestamp); + void sendPong(uint32_t timestamp); - // Statistics - StreamStatistics &getStats() - { - return stats; - } - const StreamStatistics &getStats() const - { - return stats; - } + // Statistics + StreamStatistics &getStats() { return stats; } + const StreamStatistics &getStats() const { return stats; } - // Message queue access for server - std::queue &getMessageQueue() - { - return message_queue; - } - std::mutex &getQueueMutex() - { - return queue_mutex; - } + // Message queue access for server + std::queue &getMessageQueue() { return message_queue; } + std::mutex &getQueueMutex() { return queue_mutex; } - // Last activity tracking - std::chrono::steady_clock::time_point getLastActivity() const - { - return last_activity; - } - void updateActivity() - { - last_activity = std::chrono::steady_clock::now(); - } + // Last activity tracking + std::chrono::steady_clock::time_point getLastActivity() const { + return last_activity; + } + void updateActivity() { last_activity = std::chrono::steady_clock::now(); } - // AMF0 public access for server - std::shared_ptr decodeAMF0(const uint8_t* data, size_t len, - size_t &offset); - std::vector encodeAMF0(const AMF0Value& value); + // AMF0 public access for server + std::shared_ptr decodeAMF0(const uint8_t *data, size_t len, + size_t &offset); + std::vector encodeAMF0(const AMF0Value &value); - // FIXED: Moved to public section for RTMPServer access - bool sendErrorResponse(const std::string& command, double transaction_id, - const std::string& description); + // FIXED: Moved to public section for RTMPServer access + bool sendErrorResponse(const std::string &command, double transaction_id, + const std::string &description); - private: - int client_fd; - uint32_t chunk_size; - uint32_t window_ack_size; - uint32_t peer_bandwidth; - uint32_t bytes_received; - uint32_t last_ack_sent; - std::map prev_headers; - std::map> incomplete_chunks; - StreamInfo stream_info; - std::queue message_queue; - std::mutex queue_mutex; - StreamStatistics stats; - std::chrono::steady_clock::time_point last_activity; +private: + int client_fd; + uint32_t chunk_size; + uint32_t window_ack_size; + uint32_t peer_bandwidth; + uint32_t bytes_received; + uint32_t last_ack_sent; + std::map prev_headers; + std::map> incomplete_chunks; + StreamInfo stream_info; + std::queue message_queue; + std::mutex queue_mutex; + StreamStatistics stats; + std::chrono::steady_clock::time_point last_activity; - bool readExactly(uint8_t* buf, size_t len); - bool writeExactly(const uint8_t* buf, size_t len); - bool parseChunkHeader(ChunkHeader& header); - bool processMessage(const RTMPMessage& msg); - bool handleCommand(const RTMPMessage& msg); - bool handleAudioMessage(const RTMPMessage& msg); - bool handleVideoMessage(const RTMPMessage& msg); - bool handleDataMessage(const RTMPMessage& msg); - bool handleUserControl(const RTMPMessage& msg); - bool handleAcknowledgement(const RTMPMessage& msg); + bool readExactly(uint8_t *buf, size_t len); + bool writeExactly(const uint8_t *buf, size_t len); + bool parseChunkHeader(ChunkHeader &header); + bool processMessage(const RTMPMessage &msg); + bool handleCommand(const RTMPMessage &msg); + bool handleAudioMessage(const RTMPMessage &msg); + bool handleVideoMessage(const RTMPMessage &msg); + bool handleDataMessage(const RTMPMessage &msg); + bool handleUserControl(const RTMPMessage &msg); + bool handleAcknowledgement(const RTMPMessage &msg); - // AMF0 Encoding/Decoding - std::vector encodeAMF0String(const std::string& str); - std::vector encodeAMF0Number(double num); - std::vector encodeAMF0Object(const - std::map> &obj); + // AMF0 Encoding/Decoding + std::vector encodeAMF0String(const std::string &str); + std::vector encodeAMF0Number(double num); + std::vector encodeAMF0Object( + const std::map> &obj); - // Command handlers - bool handleConnect(const std::vector> &args); - bool handleReleaseStream(const std::vector> &args); - bool handleFCPublish(const std::vector> &args); - bool handleCreateStream(const std::vector> &args); - bool handlePublish(const std::vector> &args); - bool handlePlay(const std::vector> &args); - bool handleDeleteStream(const std::vector> &args); + // Command handlers + bool handleConnect(const std::vector> &args); + bool handleReleaseStream(const std::vector> &args); + bool handleFCPublish(const std::vector> &args); + bool handleCreateStream(const std::vector> &args); + bool handlePublish(const std::vector> &args); + bool handlePlay(const std::vector> &args); + bool handleDeleteStream(const std::vector> &args); - // Response helpers - bool sendConnectResponse(double transaction_id); - bool sendCreateStreamResponse(double transaction_id, double stream_id); - bool sendPublishResponse(); - bool sendPlayResponse(); - }; + // Response helpers + bool sendConnectResponse(double transaction_id); + bool sendCreateStreamResponse(double transaction_id, double stream_id); + bool sendPublishResponse(); + bool sendPlayResponse(); +}; // Callback types - using OnConnectCallback = std::function)>; - using OnPublishCallback = - std::function, const std::string& app, const std::string& stream_key)>; - using OnPlayCallback = - std::function, const std::string& app, const std::string& stream_key)>; - using OnAudioDataCallback = - std::function, const std::vector& data, uint32_t timestamp)>; - using OnVideoDataCallback = - std::function, const std::vector& data, uint32_t timestamp)>; - using OnMetaDataCallback = - std::function, const std::map>& metadata)>; - using OnDisconnectCallback = std::function)>; - using AuthCallback = - std::function; +using OnConnectCallback = std::function)>; +using OnPublishCallback = + std::function, const std::string &app, + const std::string &stream_key)>; +using OnPlayCallback = + std::function, const std::string &app, + const std::string &stream_key)>; +using OnAudioDataCallback = + std::function, + const std::vector &data, uint32_t timestamp)>; +using OnVideoDataCallback = + std::function, + const std::vector &data, uint32_t timestamp)>; +using OnMetaDataCallback = std::function, + const std::map> &metadata)>; +using OnDisconnectCallback = std::function)>; +using AuthCallback = + std::function; // Logger - class Logger - { - public: - static Logger &getInstance() - { - static Logger instance; - return instance; - } +class Logger { +public: + static Logger &getInstance() { + static Logger instance; + return instance; + } - void setLevel(LogLevel level) - { - current_level = level; - } - LogLevel getLevel() const - { - return current_level; - } + void setLevel(LogLevel level) { current_level = level; } + LogLevel getLevel() const { return current_level; } - void error(const std::string& msg); - void warn(const std::string& msg); - void info(const std::string& msg); - void debug(const std::string& msg); + void error(const std::string &msg); + void warn(const std::string &msg); + void info(const std::string &msg); + void debug(const std::string &msg); - private: - Logger() : current_level(LogLevel::INFO) {} - LogLevel current_level; - std::mutex log_mutex; +private: + Logger() : current_level(LogLevel::INFO) {} + LogLevel current_level; + std::mutex log_mutex; - void log(LogLevel level, const std::string& msg); - }; + void log(LogLevel level, const std::string &msg); +}; // RTMP Server - class RTMPServer - { - public: - RTMPServer(int port = 1935); - ~RTMPServer(); +class RTMPServer { +public: + RTMPServer(int port = 1935); + ~RTMPServer(); - bool start(); - void stop(); - bool isRunning() const - { - return running; - } + bool start(); + void stop(); + bool isRunning() const { return running; } - // Callbacks - void setOnConnect(OnConnectCallback cb) - { - on_connect = cb; - } - void setOnPublish(OnPublishCallback cb) - { - on_publish = cb; - } - void setOnPlay(OnPlayCallback cb) - { - on_play = cb; - } - void setOnAudioData(OnAudioDataCallback cb) - { - on_audio_data = cb; - } - void setOnVideoData(OnVideoDataCallback cb) - { - on_video_data = cb; - } - void setOnMetaData(OnMetaDataCallback cb) - { - on_metadata = cb; - } - void setOnDisconnect(OnDisconnectCallback cb) - { - on_disconnect = cb; - } - void setAuthCallback(AuthCallback cb) - { - auth_callback = cb; - } + // Callbacks + void setOnConnect(OnConnectCallback cb) { on_connect = cb; } + void setOnPublish(OnPublishCallback cb) { on_publish = cb; } + void setOnPlay(OnPlayCallback cb) { on_play = cb; } + void setOnAudioData(OnAudioDataCallback cb) { on_audio_data = cb; } + void setOnVideoData(OnVideoDataCallback cb) { on_video_data = cb; } + void setOnMetaData(OnMetaDataCallback cb) { on_metadata = cb; } + void setOnDisconnect(OnDisconnectCallback cb) { on_disconnect = cb; } + void setAuthCallback(AuthCallback cb) { auth_callback = cb; } - // GOP Cache - void enableGOPCache(bool enable) - { - use_gop_cache = enable; - } - bool isGOPCacheEnabled() const - { - return use_gop_cache; - } + // GOP Cache + void enableGOPCache(bool enable) { use_gop_cache = enable; } + bool isGOPCacheEnabled() const { return use_gop_cache; } - // Recording - bool startRecording(const std::string& app, const std::string& stream_key, - const std::string& filename); - void stopRecording(const std::string& app, const std::string& stream_key); - bool isRecording(const std::string& app, const std::string& stream_key) const; + // Recording + bool startRecording(const std::string &app, const std::string &stream_key, + const std::string &filename); + void stopRecording(const std::string &app, const std::string &stream_key); + bool isRecording(const std::string &app, const std::string &stream_key) const; - // Statistics - StreamStatistics getStreamStats(const std::string& app, - const std::string& stream_key) const; - std::vector> getAllStreamStats() const; - int getActivePublishers() const; - int getActivePlayers() const; - int getTotalConnections() const; + // Statistics + StreamStatistics getStreamStats(const std::string &app, + const std::string &stream_key) const; + std::vector> + getAllStreamStats() const; + int getActivePublishers() const; + int getActivePlayers() const; + int getTotalConnections() const; - // Connection limits - void setMaxPublishersPerStream(int max) - { - max_publishers_per_stream = max; - } - void setMaxPlayersPerStream(int max) - { - max_players_per_stream = max; - } - void setMaxTotalConnections(int max) - { - max_total_connections = max; - } - int getMaxPublishersPerStream() const - { - return max_publishers_per_stream; - } - int getMaxPlayersPerStream() const - { - return max_players_per_stream; - } - int getMaxTotalConnections() const - { - return max_total_connections; - } + // Connection limits + void setMaxPublishersPerStream(int max) { max_publishers_per_stream = max; } + void setMaxPlayersPerStream(int max) { max_players_per_stream = max; } + void setMaxTotalConnections(int max) { max_total_connections = max; } + int getMaxPublishersPerStream() const { return max_publishers_per_stream; } + int getMaxPlayersPerStream() const { return max_players_per_stream; } + int getMaxTotalConnections() const { return max_total_connections; } - // Ping/Pong - void enablePingPong(bool enable, int interval_seconds = 30); - bool isPingPongEnabled() const - { - return ping_enabled; - } + // Ping/Pong + void enablePingPong(bool enable, int interval_seconds = 30); + bool isPingPongEnabled() const { return ping_enabled; } - // Timeout handling - void setConnectionTimeout(int seconds) - { - connection_timeout = seconds; - } - int getConnectionTimeout() const - { - return connection_timeout; - } + // Timeout handling + void setConnectionTimeout(int seconds) { connection_timeout = seconds; } + int getConnectionTimeout() const { return connection_timeout; } - // Broadcasting - bool sendAudioToPlayers(const std::string& app, const std::string& stream_key, - const std::vector &data, uint32_t timestamp); - bool sendVideoToPlayers(const std::string& app, const std::string& stream_key, - const std::vector &data, uint32_t timestamp); - void sendMetadataToPlayers(const std::string& app, - const std::string& stream_key, - const std::vector &data); + // Broadcasting + bool sendAudioToPlayers(const std::string &app, const std::string &stream_key, + const std::vector &data, uint32_t timestamp); + bool sendVideoToPlayers(const std::string &app, const std::string &stream_key, + const std::vector &data, uint32_t timestamp); + void sendMetadataToPlayers(const std::string &app, + const std::string &stream_key, + const std::vector &data); - private: - int port; - int server_fd; - bool running; - std::thread accept_thread; - std::thread ping_thread; - std::thread timeout_thread; - std::vector client_threads; - std::vector> sessions; - mutable std::mutex sessions_mutex; // FIXED: Added mutable +private: + int port; + int server_fd; + bool running; + std::thread accept_thread; + std::thread ping_thread; + std::thread timeout_thread; + std::vector client_threads; + std::vector> sessions; + mutable std::mutex sessions_mutex; // FIXED: Added mutable - // Callbacks - OnConnectCallback on_connect; - OnPublishCallback on_publish; - OnPlayCallback on_play; - OnAudioDataCallback on_audio_data; - OnVideoDataCallback on_video_data; - OnMetaDataCallback on_metadata; - OnDisconnectCallback on_disconnect; - AuthCallback auth_callback; + // Callbacks + OnConnectCallback on_connect; + OnPublishCallback on_publish; + OnPlayCallback on_play; + OnAudioDataCallback on_audio_data; + OnVideoDataCallback on_video_data; + OnMetaDataCallback on_metadata; + OnDisconnectCallback on_disconnect; + AuthCallback auth_callback; - // GOP Caches - bool use_gop_cache = true; - std::map> gop_caches; - std::mutex gop_mutex; + // GOP Caches + bool use_gop_cache = true; + std::map> gop_caches; + std::mutex gop_mutex; - // Recorders - std::map> recorders; - mutable std::mutex recorder_mutex; // FIXED: Added mutable + // Recorders + std::map> recorders; + mutable std::mutex recorder_mutex; // FIXED: Added mutable - // Statistics - std::map stream_stats; - mutable std::mutex stats_mutex; + // Statistics + std::map stream_stats; + mutable std::mutex stats_mutex; - // Connection limits - int max_publishers_per_stream = 1; - int max_players_per_stream = 1000; - int max_total_connections = 1000; + // Connection limits + int max_publishers_per_stream = 1; + int max_players_per_stream = 1000; + int max_total_connections = 1000; - // Ping/Pong - bool ping_enabled = false; - int ping_interval = 30; + // Ping/Pong + bool ping_enabled = false; + int ping_interval = 30; - // Timeout - int connection_timeout = 60; + // Timeout + int connection_timeout = 60; - void acceptClients(); - void handleClient(std::shared_ptr session); - void removeSession(std::shared_ptr session); - void processMediaMessages(std::shared_ptr session); - void pingClientsRoutine(); - void timeoutCheckRoutine(); + void acceptClients(); + void handleClient(std::shared_ptr session); + void removeSession(std::shared_ptr session); + void processMediaMessages(std::shared_ptr session); + void pingClientsRoutine(); + void timeoutCheckRoutine(); - std::string makeStreamKey(const std::string& app, - const std::string& stream) const - { - return app + "/" + stream; - } + std::string makeStreamKey(const std::string &app, + const std::string &stream) const { + return app + "/" + stream; + } - int countPublishers(const std::string& app, - const std::string& stream_key) const; - int countPlayers(const std::string& app, const std::string& stream_key) const; - bool checkConnectionLimits(const std::string& app, - const std::string& stream_key, bool is_publisher) const; - }; + int countPublishers(const std::string &app, + const std::string &stream_key) const; + int countPlayers(const std::string &app, const std::string &stream_key) const; + bool checkConnectionLimits(const std::string &app, + const std::string &stream_key, + bool is_publisher) const; +}; // Utility macros #define LOG_ERROR(msg) rtmp::Logger::getInstance().error(msg) diff --git a/src/rtmp_capi.cpp b/src/rtmp_capi.cpp index 1c6385d..a749bd9 100644 --- a/src/rtmp_capi.cpp +++ b/src/rtmp_capi.cpp @@ -1,294 +1,303 @@ -#include "rtmp_capi.h" -#include "rtmp_server.h" -#include -#include +#include "../include/rtmp_capi.h" +#include "../include/rtmp_server.h" #include +#include -namespace rtmp_capi_internal -{ - struct RtmpServerImpl - { - rtmp::RTMPServer *server; - RtmpOnConnectCallback on_connect_cb; - void *on_connect_userdata; - RtmpOnPublishCallback on_publish_cb; - void *on_publish_userdata; - RtmpOnPlayCallback on_play_cb; - void *on_play_userdata; - RtmpOnAudioDataCallback on_audio_cb; - void *on_audio_userdata; - RtmpOnVideoDataCallback on_video_cb; - void *on_video_userdata; - RtmpOnDisconnectCallback on_disconnect_cb; - void *on_disconnect_userdata; - RtmpAuthCallback auth_cb; - void *auth_userdata; - RtmpServerImpl() : server(nullptr), on_connect_cb(nullptr), - on_connect_userdata(nullptr), - on_publish_cb(nullptr), on_publish_userdata(nullptr), on_play_cb(nullptr), - on_play_userdata(nullptr), - on_audio_cb(nullptr), on_audio_userdata(nullptr), on_video_cb(nullptr), - on_video_userdata(nullptr), - on_disconnect_cb(nullptr), on_disconnect_userdata(nullptr), auth_cb(nullptr), - auth_userdata(nullptr) {} - }; -} +namespace rtmp_capi_internal { +struct RtmpServerImpl { + rtmp::RTMPServer *server; + RtmpOnConnectCallback on_connect_cb; + void *on_connect_userdata; + RtmpOnPublishCallback on_publish_cb; + void *on_publish_userdata; + RtmpOnPlayCallback on_play_cb; + void *on_play_userdata; + RtmpOnAudioDataCallback on_audio_cb; + void *on_audio_userdata; + RtmpOnVideoDataCallback on_video_cb; + void *on_video_userdata; + RtmpOnDisconnectCallback on_disconnect_cb; + void *on_disconnect_userdata; + RtmpAuthCallback auth_cb; + void *auth_userdata; + RtmpServerImpl() + : server(nullptr), on_connect_cb(nullptr), on_connect_userdata(nullptr), + on_publish_cb(nullptr), on_publish_userdata(nullptr), + on_play_cb(nullptr), on_play_userdata(nullptr), on_audio_cb(nullptr), + on_audio_userdata(nullptr), on_video_cb(nullptr), + on_video_userdata(nullptr), on_disconnect_cb(nullptr), + on_disconnect_userdata(nullptr), auth_cb(nullptr), + auth_userdata(nullptr) {} +}; +} // namespace rtmp_capi_internal using Impl = rtmp_capi_internal::RtmpServerImpl; extern "C" { - RtmpServerHandle rtmp_server_create(int port) - { - Impl* impl = new Impl(); - impl->server = new rtmp::RTMPServer(port); - impl->server->setOnConnect([impl](std::shared_ptr session) - { - if (!impl || !impl->on_connect_cb) return; - const auto& info = session->getStreamInfo(); - impl->on_connect_cb(info.client_ip.c_str(), impl->on_connect_userdata); - }); - impl->server->setOnPublish([impl](std::shared_ptr session, - const std::string & app, const std::string & stream_key) - { - if (!impl || !impl->on_publish_cb) return; - const auto& info = session->getStreamInfo(); - impl->on_publish_cb(info.client_ip.c_str(), app.c_str(), stream_key.c_str(), - impl->on_publish_userdata); - }); - impl->server->setOnPlay([impl](std::shared_ptr session, - const std::string & app, const std::string & stream_key) - { - if (!impl || !impl->on_play_cb) return; - const auto& info = session->getStreamInfo(); - impl->on_play_cb(info.client_ip.c_str(), app.c_str(), stream_key.c_str(), - impl->on_play_userdata); - }); - impl->server->setOnAudioData([impl](std::shared_ptr session, - const std::vector &data, uint32_t timestamp) - { - if (!impl || !impl->on_audio_cb) return; - const auto& info = session->getStreamInfo(); - impl->on_audio_cb(info.app.c_str(), info.stream_key.c_str(), data.data(), - static_cast(data.size()), timestamp, impl->on_audio_userdata); - }); - impl->server->setOnVideoData([impl](std::shared_ptr session, - const std::vector &data, uint32_t timestamp) - { - if (!impl || !impl->on_video_cb) return; - const auto& info = session->getStreamInfo(); - impl->on_video_cb(info.app.c_str(), info.stream_key.c_str(), data.data(), - static_cast(data.size()), timestamp, impl->on_video_userdata); - }); - impl->server->setOnDisconnect([impl](std::shared_ptr - session) - { - if (!impl || !impl->on_disconnect_cb) return; - const auto& info = session->getStreamInfo(); - impl->on_disconnect_cb(info.client_ip.c_str(), info.app.c_str(), - info.stream_key.c_str(), info.is_publishing, info.is_playing, - impl->on_disconnect_userdata); - }); - impl->server->setAuthCallback([impl](const std::string & app, - const std::string & stream_key, const std::string & client_ip) -> bool - { - if (!impl || !impl->auth_cb) return true; - return impl->auth_cb(app.c_str(), stream_key.c_str(), client_ip.c_str(), impl->auth_userdata); - }); - return impl; - } +RtmpServerHandle rtmp_server_create(int port) { + Impl *impl = new Impl(); + impl->server = new rtmp::RTMPServer(port); + impl->server->setOnConnect( + [impl](std::shared_ptr session) { + if (!impl || !impl->on_connect_cb) + return; + const auto &info = session->getStreamInfo(); + impl->on_connect_cb(info.client_ip.c_str(), impl->on_connect_userdata); + }); + impl->server->setOnPublish([impl](std::shared_ptr session, + const std::string &app, + const std::string &stream_key) { + if (!impl || !impl->on_publish_cb) + return; + const auto &info = session->getStreamInfo(); + impl->on_publish_cb(info.client_ip.c_str(), app.c_str(), stream_key.c_str(), + impl->on_publish_userdata); + }); + impl->server->setOnPlay([impl](std::shared_ptr session, + const std::string &app, + const std::string &stream_key) { + if (!impl || !impl->on_play_cb) + return; + const auto &info = session->getStreamInfo(); + impl->on_play_cb(info.client_ip.c_str(), app.c_str(), stream_key.c_str(), + impl->on_play_userdata); + }); + impl->server->setOnAudioData( + [impl](std::shared_ptr session, + const std::vector &data, uint32_t timestamp) { + if (!impl || !impl->on_audio_cb) + return; + const auto &info = session->getStreamInfo(); + impl->on_audio_cb(info.app.c_str(), info.stream_key.c_str(), + data.data(), static_cast(data.size()), + timestamp, impl->on_audio_userdata); + }); + impl->server->setOnVideoData( + [impl](std::shared_ptr session, + const std::vector &data, uint32_t timestamp) { + if (!impl || !impl->on_video_cb) + return; + const auto &info = session->getStreamInfo(); + impl->on_video_cb(info.app.c_str(), info.stream_key.c_str(), + data.data(), static_cast(data.size()), + timestamp, impl->on_video_userdata); + }); + impl->server->setOnDisconnect( + [impl](std::shared_ptr session) { + if (!impl || !impl->on_disconnect_cb) + return; + const auto &info = session->getStreamInfo(); + impl->on_disconnect_cb(info.client_ip.c_str(), info.app.c_str(), + info.stream_key.c_str(), info.is_publishing, + info.is_playing, impl->on_disconnect_userdata); + }); + impl->server->setAuthCallback([impl](const std::string &app, + const std::string &stream_key, + const std::string &client_ip) -> bool { + if (!impl || !impl->auth_cb) + return true; + return impl->auth_cb(app.c_str(), stream_key.c_str(), client_ip.c_str(), + impl->auth_userdata); + }); + return impl; +} // add all other functions as above - void rtmp_server_destroy(RtmpServerHandle handle) - { - if (!handle) return; - Impl* impl = static_cast(handle); - delete impl->server; - delete impl; - } - bool rtmp_server_start(RtmpServerHandle handle) - { - if (!handle) return false; - Impl* impl = static_cast(handle); - return impl->server->start(); - } - void rtmp_server_stop(RtmpServerHandle handle) - { - if (!handle) return; - Impl* impl = static_cast(handle); - impl->server->stop(); - } - bool rtmp_server_is_running(RtmpServerHandle handle) - { - if (!handle) return false; - Impl* impl = static_cast(handle); - return impl->server->isRunning(); - } - void rtmp_server_set_on_connect(RtmpServerHandle handle, - RtmpOnConnectCallback cb, void* user_data) - { - if (!handle) return; - Impl* impl = static_cast(handle); - impl->on_connect_cb = cb; - impl->on_connect_userdata = user_data; - } - void rtmp_server_set_on_publish(RtmpServerHandle handle, - RtmpOnPublishCallback cb, void* user_data) - { - if (!handle) return; - Impl* impl = static_cast(handle); - impl->on_publish_cb = cb; - impl->on_publish_userdata = user_data; - } - void rtmp_server_set_on_play(RtmpServerHandle handle, RtmpOnPlayCallback cb, - void* user_data) - { - if (!handle) return; - Impl* impl = static_cast(handle); - impl->on_play_cb = cb; - impl->on_play_userdata = user_data; - } - void rtmp_server_set_on_audio_data(RtmpServerHandle handle, - RtmpOnAudioDataCallback cb, void* user_data) - { - if (!handle) return; - Impl* impl = static_cast(handle); - impl->on_audio_cb = cb; - impl->on_audio_userdata = user_data; - } - void rtmp_server_set_on_video_data(RtmpServerHandle handle, - RtmpOnVideoDataCallback cb, void* user_data) - { - if (!handle) return; - Impl* impl = static_cast(handle); - impl->on_video_cb = cb; - impl->on_video_userdata = user_data; - } - void rtmp_server_set_on_disconnect(RtmpServerHandle handle, - RtmpOnDisconnectCallback cb, void* user_data) - { - if (!handle) return; - Impl* impl = static_cast(handle); - impl->on_disconnect_cb = cb; - impl->on_disconnect_userdata = user_data; - } - void rtmp_server_set_auth_callback(RtmpServerHandle handle, RtmpAuthCallback cb, - void* user_data) - { - if (!handle) return; - Impl* impl = static_cast(handle); - impl->auth_cb = cb; - impl->auth_userdata = user_data; - } - void rtmp_server_enable_gop_cache(RtmpServerHandle handle, bool enable) - { - if (!handle) return; - static_cast(handle)->server->enableGOPCache(enable); - } - void rtmp_server_set_max_publishers_per_stream(RtmpServerHandle handle, - int max) - { - if (!handle) return; - static_cast(handle)->server->setMaxPublishersPerStream(max); - } - void rtmp_server_set_max_players_per_stream(RtmpServerHandle handle, int max) - { - if (!handle) return; - static_cast(handle)->server->setMaxPlayersPerStream(max); - } - void rtmp_server_set_max_total_connections(RtmpServerHandle handle, int max) - { - if (!handle) return; - static_cast(handle)->server->setMaxTotalConnections(max); - } - void rtmp_server_set_connection_timeout(RtmpServerHandle handle, int seconds) - { - if (!handle) return; - static_cast(handle)->server->setConnectionTimeout(seconds); - } - void rtmp_server_enable_ping_pong(RtmpServerHandle handle, bool enable, - int interval_seconds) - { - if (!handle) return; - static_cast(handle)->server->enablePingPong(enable, interval_seconds); - } - int rtmp_server_get_active_publishers(RtmpServerHandle handle) - { - if (!handle) return 0; - return static_cast(handle)->server->getActivePublishers(); - } - int rtmp_server_get_active_players(RtmpServerHandle handle) - { - if (!handle) return 0; - return static_cast(handle)->server->getActivePlayers(); - } - int rtmp_server_get_total_connections(RtmpServerHandle handle) - { - if (!handle) return 0; - return static_cast(handle)->server->getTotalConnections(); - } - struct RtmpStreamStats rtmp_server_get_stream_stats(RtmpServerHandle handle, - const char* app, const char* stream_key) - { - struct RtmpStreamStats stats = {0}; - if (!handle || !app || !stream_key) return stats; - Impl* impl = static_cast(handle); - auto cstats = impl->server->getStreamStats(app, stream_key); - stats.bytes_sent = cstats.bytes_sent; - stats.bytes_received = cstats.bytes_received; - stats.video_frames = cstats.video_frames; - stats.audio_frames = cstats.audio_frames; - stats.dropped_frames = cstats.dropped_frames; - stats.bitrate_kbps = cstats.getBitrate(); - stats.uptime_seconds = cstats.getUptime(); - return stats; - } - bool rtmp_server_start_recording(RtmpServerHandle handle, const char* app, - const char* stream_key, const char* filename) - { - if (!handle || !app || !stream_key || !filename) return false; - return static_cast(handle)->server->startRecording(app, stream_key, - filename); - } - void rtmp_server_stop_recording(RtmpServerHandle handle, const char* app, - const char* stream_key) - { - if (!handle || !app || !stream_key) return; - static_cast(handle)->server->stopRecording(app, stream_key); - } - bool rtmp_server_is_recording(RtmpServerHandle handle, const char* app, - const char* stream_key) - { - if (!handle || !app || !stream_key) return false; - return static_cast(handle)->server->isRecording(app, stream_key); - } - bool rtmp_server_broadcast_audio(RtmpServerHandle handle, const char* app, - const char* stream_key, const uint8_t* data, uint32_t length, - uint32_t timestamp) - { - if (!handle || !app || !stream_key || !data || length == 0) return false; - Impl* impl = static_cast(handle); - std::vector vec(data, data + length); - return impl->server->sendAudioToPlayers(app, stream_key, vec, timestamp); - } - bool rtmp_server_broadcast_video(RtmpServerHandle handle, const char* app, - const char* stream_key, const uint8_t* data, uint32_t length, - uint32_t timestamp) - { - if (!handle || !app || !stream_key || !data || length == 0) return false; - Impl* impl = static_cast(handle); - std::vector vec(data, data + length); - return impl->server->sendVideoToPlayers(app, stream_key, vec, timestamp); - } - bool rtmp_server_broadcast_metadata(RtmpServerHandle handle, const char* app, - const char* stream_key, const uint8_t* data, uint32_t length) - { - if (!handle || !app || !stream_key || !data || length == 0) return false; - Impl* impl = static_cast(handle); - std::vector vec(data, data + length); - impl->server->sendMetadataToPlayers(app, stream_key, vec); - return true; - } - void rtmp_logger_set_level(RtmpLogLevel level) - { - rtmp::Logger::getInstance().setLevel((rtmp::LogLevel)level); - } +void rtmp_server_destroy(RtmpServerHandle handle) { + if (!handle) + return; + Impl *impl = static_cast(handle); + delete impl->server; + delete impl; +} +bool rtmp_server_start(RtmpServerHandle handle) { + if (!handle) + return false; + Impl *impl = static_cast(handle); + return impl->server->start(); +} +void rtmp_server_stop(RtmpServerHandle handle) { + if (!handle) + return; + Impl *impl = static_cast(handle); + impl->server->stop(); +} +bool rtmp_server_is_running(RtmpServerHandle handle) { + if (!handle) + return false; + Impl *impl = static_cast(handle); + return impl->server->isRunning(); +} +void rtmp_server_set_on_connect(RtmpServerHandle handle, + RtmpOnConnectCallback cb, void *user_data) { + if (!handle) + return; + Impl *impl = static_cast(handle); + impl->on_connect_cb = cb; + impl->on_connect_userdata = user_data; +} +void rtmp_server_set_on_publish(RtmpServerHandle handle, + RtmpOnPublishCallback cb, void *user_data) { + if (!handle) + return; + Impl *impl = static_cast(handle); + impl->on_publish_cb = cb; + impl->on_publish_userdata = user_data; +} +void rtmp_server_set_on_play(RtmpServerHandle handle, RtmpOnPlayCallback cb, + void *user_data) { + if (!handle) + return; + Impl *impl = static_cast(handle); + impl->on_play_cb = cb; + impl->on_play_userdata = user_data; +} +void rtmp_server_set_on_audio_data(RtmpServerHandle handle, + RtmpOnAudioDataCallback cb, + void *user_data) { + if (!handle) + return; + Impl *impl = static_cast(handle); + impl->on_audio_cb = cb; + impl->on_audio_userdata = user_data; +} +void rtmp_server_set_on_video_data(RtmpServerHandle handle, + RtmpOnVideoDataCallback cb, + void *user_data) { + if (!handle) + return; + Impl *impl = static_cast(handle); + impl->on_video_cb = cb; + impl->on_video_userdata = user_data; +} +void rtmp_server_set_on_disconnect(RtmpServerHandle handle, + RtmpOnDisconnectCallback cb, + void *user_data) { + if (!handle) + return; + Impl *impl = static_cast(handle); + impl->on_disconnect_cb = cb; + impl->on_disconnect_userdata = user_data; +} +void rtmp_server_set_auth_callback(RtmpServerHandle handle, RtmpAuthCallback cb, + void *user_data) { + if (!handle) + return; + Impl *impl = static_cast(handle); + impl->auth_cb = cb; + impl->auth_userdata = user_data; +} +void rtmp_server_enable_gop_cache(RtmpServerHandle handle, bool enable) { + if (!handle) + return; + static_cast(handle)->server->enableGOPCache(enable); +} +void rtmp_server_set_max_publishers_per_stream(RtmpServerHandle handle, + int max) { + if (!handle) + return; + static_cast(handle)->server->setMaxPublishersPerStream(max); +} +void rtmp_server_set_max_players_per_stream(RtmpServerHandle handle, int max) { + if (!handle) + return; + static_cast(handle)->server->setMaxPlayersPerStream(max); +} +void rtmp_server_set_max_total_connections(RtmpServerHandle handle, int max) { + if (!handle) + return; + static_cast(handle)->server->setMaxTotalConnections(max); +} +void rtmp_server_set_connection_timeout(RtmpServerHandle handle, int seconds) { + if (!handle) + return; + static_cast(handle)->server->setConnectionTimeout(seconds); +} +void rtmp_server_enable_ping_pong(RtmpServerHandle handle, bool enable, + int interval_seconds) { + if (!handle) + return; + static_cast(handle)->server->enablePingPong(enable, interval_seconds); +} +int rtmp_server_get_active_publishers(RtmpServerHandle handle) { + if (!handle) + return 0; + return static_cast(handle)->server->getActivePublishers(); +} +int rtmp_server_get_active_players(RtmpServerHandle handle) { + if (!handle) + return 0; + return static_cast(handle)->server->getActivePlayers(); +} +int rtmp_server_get_total_connections(RtmpServerHandle handle) { + if (!handle) + return 0; + return static_cast(handle)->server->getTotalConnections(); +} +struct RtmpStreamStats rtmp_server_get_stream_stats(RtmpServerHandle handle, + const char *app, + const char *stream_key) { + struct RtmpStreamStats stats = {0}; + if (!handle || !app || !stream_key) + return stats; + Impl *impl = static_cast(handle); + auto cstats = impl->server->getStreamStats(app, stream_key); + stats.bytes_sent = cstats.bytes_sent; + stats.bytes_received = cstats.bytes_received; + stats.video_frames = cstats.video_frames; + stats.audio_frames = cstats.audio_frames; + stats.dropped_frames = cstats.dropped_frames; + stats.bitrate_kbps = cstats.getBitrate(); + stats.uptime_seconds = cstats.getUptime(); + return stats; +} +bool rtmp_server_start_recording(RtmpServerHandle handle, const char *app, + const char *stream_key, const char *filename) { + if (!handle || !app || !stream_key || !filename) + return false; + return static_cast(handle)->server->startRecording(app, stream_key, + filename); +} +void rtmp_server_stop_recording(RtmpServerHandle handle, const char *app, + const char *stream_key) { + if (!handle || !app || !stream_key) + return; + static_cast(handle)->server->stopRecording(app, stream_key); +} +bool rtmp_server_is_recording(RtmpServerHandle handle, const char *app, + const char *stream_key) { + if (!handle || !app || !stream_key) + return false; + return static_cast(handle)->server->isRecording(app, stream_key); +} +bool rtmp_server_broadcast_audio(RtmpServerHandle handle, const char *app, + const char *stream_key, const uint8_t *data, + uint32_t length, uint32_t timestamp) { + if (!handle || !app || !stream_key || !data || length == 0) + return false; + Impl *impl = static_cast(handle); + std::vector vec(data, data + length); + return impl->server->sendAudioToPlayers(app, stream_key, vec, timestamp); +} +bool rtmp_server_broadcast_video(RtmpServerHandle handle, const char *app, + const char *stream_key, const uint8_t *data, + uint32_t length, uint32_t timestamp) { + if (!handle || !app || !stream_key || !data || length == 0) + return false; + Impl *impl = static_cast(handle); + std::vector vec(data, data + length); + return impl->server->sendVideoToPlayers(app, stream_key, vec, timestamp); +} +bool rtmp_server_broadcast_metadata(RtmpServerHandle handle, const char *app, + const char *stream_key, const uint8_t *data, + uint32_t length) { + if (!handle || !app || !stream_key || !data || length == 0) + return false; + Impl *impl = static_cast(handle); + std::vector vec(data, data + length); + impl->server->sendMetadataToPlayers(app, stream_key, vec); + return true; +} +void rtmp_logger_set_level(RtmpLogLevel level) { + rtmp::Logger::getInstance().setLevel((rtmp::LogLevel)level); +} } \ No newline at end of file diff --git a/src/rtmp_server.cpp b/src/rtmp_server.cpp index 7cbfdf5..759fa8c 100644 --- a/src/rtmp_server.cpp +++ b/src/rtmp_server.cpp @@ -1,1765 +1,1563 @@ -#include "rtmp_server.h" -#include -#include +#include "../include/rtmp_server.h" #include -#include +#include #include -#include +#include -namespace rtmp -{ +namespace rtmp { // Utility functions - static uint16_t readUint16BE(const uint8_t* data) - { - return (data[0] << 8) | data[1]; - } +static uint16_t readUint16BE(const uint8_t *data) { + return (data[0] << 8) | data[1]; +} - static uint32_t readUint24BE(const uint8_t* data) - { - return (data[0] << 16) | (data[1] << 8) | data[2]; - } +static uint32_t readUint24BE(const uint8_t *data) { + return (data[0] << 16) | (data[1] << 8) | data[2]; +} - static uint32_t readUint32BE(const uint8_t* data) - { - return (data[0] << 24) | (data[1] << 16) | (data[2] << 8) | data[3]; - } +static uint32_t readUint32BE(const uint8_t *data) { + return (data[0] << 24) | (data[1] << 16) | (data[2] << 8) | data[3]; +} - static void writeUint16BE(uint8_t* data, uint16_t val) - { - data[0] = (val >> 8) & 0xFF; - data[1] = val & 0xFF; - } +static void writeUint16BE(uint8_t *data, uint16_t val) { + data[0] = (val >> 8) & 0xFF; + data[1] = val & 0xFF; +} - static void writeUint24BE(uint8_t* data, uint32_t val) - { - data[0] = (val >> 16) & 0xFF; - data[1] = (val >> 8) & 0xFF; - data[2] = val & 0xFF; - } +static void writeUint24BE(uint8_t *data, uint32_t val) { + data[0] = (val >> 16) & 0xFF; + data[1] = (val >> 8) & 0xFF; + data[2] = val & 0xFF; +} - static void writeUint32BE(uint8_t* data, uint32_t val) - { - data[0] = (val >> 24) & 0xFF; - data[1] = (val >> 16) & 0xFF; - data[2] = (val >> 8) & 0xFF; - data[3] = val & 0xFF; - } +static void writeUint32BE(uint8_t *data, uint32_t val) { + data[0] = (val >> 24) & 0xFF; + data[1] = (val >> 16) & 0xFF; + data[2] = (val >> 8) & 0xFF; + data[3] = val & 0xFF; +} - static double readDouble(const uint8_t* data) - { - uint64_t val = ((uint64_t)data[0] << 56) | ((uint64_t)data[1] << 48) | - ((uint64_t)data[2] << 40) | ((uint64_t)data[3] << 32) | - ((uint64_t)data[4] << 24) | ((uint64_t)data[5] << 16) | - ((uint64_t)data[6] << 8) | (uint64_t)data[7]; - double result; - memcpy(&result, &val, 8); - return result; - } +static double readDouble(const uint8_t *data) { + uint64_t val = ((uint64_t)data[0] << 56) | ((uint64_t)data[1] << 48) | + ((uint64_t)data[2] << 40) | ((uint64_t)data[3] << 32) | + ((uint64_t)data[4] << 24) | ((uint64_t)data[5] << 16) | + ((uint64_t)data[6] << 8) | (uint64_t)data[7]; + double result; + memcpy(&result, &val, 8); + return result; +} - static void writeDouble(uint8_t* data, double val) - { - uint64_t bits; - memcpy(&bits, &val, 8); - data[0] = (bits >> 56) & 0xFF; - data[1] = (bits >> 48) & 0xFF; - data[2] = (bits >> 40) & 0xFF; - data[3] = (bits >> 32) & 0xFF; - data[4] = (bits >> 24) & 0xFF; - data[5] = (bits >> 16) & 0xFF; - data[6] = (bits >> 8) & 0xFF; - data[7] = bits & 0xFF; - } +static void writeDouble(uint8_t *data, double val) { + uint64_t bits; + memcpy(&bits, &val, 8); + data[0] = (bits >> 56) & 0xFF; + data[1] = (bits >> 48) & 0xFF; + data[2] = (bits >> 40) & 0xFF; + data[3] = (bits >> 32) & 0xFF; + data[4] = (bits >> 24) & 0xFF; + data[5] = (bits >> 16) & 0xFF; + data[6] = (bits >> 8) & 0xFF; + data[7] = bits & 0xFF; +} // Logger Implementation - void Logger::log(LogLevel level, const std::string& msg) - { - if (level > current_level) return; - std::lock_guard lock(log_mutex); - auto now = std::chrono::system_clock::now(); - auto time = std::chrono::system_clock::to_time_t(now); - const char *level_str[] = {"ERROR", "WARN", "INFO", "DEBUG"}; - std::cout << "[" << std::put_time(std::localtime(&time), "%Y-%m-%d %H:%M:%S") - << "] [" << level_str[(int)level] << "] " << msg << std::endl; - } +void Logger::log(LogLevel level, const std::string &msg) { + if (level > current_level) + return; + std::lock_guard lock(log_mutex); + auto now = std::chrono::system_clock::now(); + auto time = std::chrono::system_clock::to_time_t(now); + const char *level_str[] = {"ERROR", "WARN", "INFO", "DEBUG"}; + std::cout << "[" << std::put_time(std::localtime(&time), "%Y-%m-%d %H:%M:%S") + << "] [" << level_str[(int)level] << "] " << msg << std::endl; +} - void Logger::error(const std::string& msg) - { - log(LogLevel::ERROR, msg); - } - void Logger::warn(const std::string& msg) - { - log(LogLevel::WARN, msg); - } - void Logger::info(const std::string& msg) - { - log(LogLevel::INFO, msg); - } - void Logger::debug(const std::string& msg) - { - log(LogLevel::DEBUG, msg); - } +void Logger::error(const std::string &msg) { log(LogLevel::ERROR, msg); } +void Logger::warn(const std::string &msg) { log(LogLevel::WARN, msg); } +void Logger::info(const std::string &msg) { log(LogLevel::INFO, msg); } +void Logger::debug(const std::string &msg) { log(LogLevel::DEBUG, msg); } // GOP Cache Implementation - bool GOPCache::isKeyframe(const std::vector &data) - { - if (data.empty()) return false; - uint8_t frame_type = (data[0] >> 4) & 0x0F; - return frame_type == 1; // 1 = keyframe (IDR) - } +bool GOPCache::isKeyframe(const std::vector &data) { + if (data.empty()) + return false; + uint8_t frame_type = (data[0] >> 4) & 0x0F; + return frame_type == 1; // 1 = keyframe (IDR) +} - void GOPCache::addVideoFrame(const std::vector &data, - uint32_t timestamp) - { - std::lock_guard lock(cache_mutex); - if (isKeyframe(data)) - { - frames.clear(); - has_keyframe = true; - } - if (has_keyframe) - { - CachedFrame frame; - frame.type = MessageType::VIDEO; - frame.data = data; - frame.timestamp = timestamp; - frames.push_back(frame); - if (frames.size() > 300) - { - for (size_t i = 1; i < frames.size(); i++) - { - if (frames[i].type == MessageType::VIDEO && isKeyframe(frames[i].data)) - { - frames.erase(frames.begin(), frames.begin() + i); - break; - } - } - } +void GOPCache::addVideoFrame(const std::vector &data, + uint32_t timestamp) { + std::lock_guard lock(cache_mutex); + if (isKeyframe(data)) { + frames.clear(); + has_keyframe = true; + } + if (has_keyframe) { + CachedFrame frame; + frame.type = MessageType::VIDEO; + frame.data = data; + frame.timestamp = timestamp; + frames.push_back(frame); + if (frames.size() > 300) { + for (size_t i = 1; i < frames.size(); i++) { + if (frames[i].type == MessageType::VIDEO && + isKeyframe(frames[i].data)) { + frames.erase(frames.begin(), frames.begin() + i); + break; } + } } + } +} - void GOPCache::addAudioFrame(const std::vector &data, - uint32_t timestamp) - { - std::lock_guard lock(cache_mutex); - if (has_keyframe) - { - CachedFrame frame; - frame.type = MessageType::AUDIO; - frame.data = data; - frame.timestamp = timestamp; - frames.push_back(frame); - } - } +void GOPCache::addAudioFrame(const std::vector &data, + uint32_t timestamp) { + std::lock_guard lock(cache_mutex); + if (has_keyframe) { + CachedFrame frame; + frame.type = MessageType::AUDIO; + frame.data = data; + frame.timestamp = timestamp; + frames.push_back(frame); + } +} - void GOPCache::addMetadata(const std::vector &data) - { - std::lock_guard lock(cache_mutex); - metadata = data; - } +void GOPCache::addMetadata(const std::vector &data) { + std::lock_guard lock(cache_mutex); + metadata = data; +} - void GOPCache::sendToPlayer(RTMPSession* session) - { - std::lock_guard lock(cache_mutex); - const auto& info = session->getStreamInfo(); - if (!metadata.empty()) - { - session->sendChunk(4, 0, (uint8_t)MessageType::DATA_AMF0, info.stream_id, - metadata); - } - for (const auto& frame : frames) - { - session->sendChunk(4, frame.timestamp, (uint8_t)frame.type, - info.stream_id, frame.data); - } - } +void GOPCache::sendToPlayer(RTMPSession *session) { + std::lock_guard lock(cache_mutex); + const auto &info = session->getStreamInfo(); + if (!metadata.empty()) { + session->sendChunk(4, 0, (uint8_t)MessageType::DATA_AMF0, info.stream_id, + metadata); + } + for (const auto &frame : frames) { + session->sendChunk(4, frame.timestamp, (uint8_t)frame.type, info.stream_id, + frame.data); + } +} - void GOPCache::clear() - { - std::lock_guard lock(cache_mutex); - frames.clear(); - metadata.clear(); - has_keyframe = false; - } +void GOPCache::clear() { + std::lock_guard lock(cache_mutex); + frames.clear(); + metadata.clear(); + has_keyframe = false; +} // FLV Recorder Implementation - FLVRecorder::FLVRecorder(const std::string& filename) : filename(filename) {} +FLVRecorder::FLVRecorder(const std::string &filename) : filename(filename) {} - FLVRecorder::~FLVRecorder() - { - stop(); - } +FLVRecorder::~FLVRecorder() { stop(); } - bool FLVRecorder::start() - { - std::lock_guard lock(file_mutex); - file.open(filename, std::ios::binary); - if (!file.is_open()) return false; - writeFLVHeader(); - recording = true; - return true; - } +bool FLVRecorder::start() { + std::lock_guard lock(file_mutex); + file.open(filename, std::ios::binary); + if (!file.is_open()) + return false; + writeFLVHeader(); + recording = true; + return true; +} - void FLVRecorder::stop() - { - std::lock_guard lock(file_mutex); - if (file.is_open()) - { - file.close(); - } - recording = false; - } +void FLVRecorder::stop() { + std::lock_guard lock(file_mutex); + if (file.is_open()) { + file.close(); + } + recording = false; +} - void FLVRecorder::writeFLVHeader() - { - uint8_t header[] = - { - 'F', 'L', 'V', 0x01, 0x05, - 0x00, 0x00, 0x00, 0x09 - }; - file.write(reinterpret_cast(header), sizeof(header)); - uint32_t prev_size = 0; - file.write(reinterpret_cast(&prev_size), 4); - } +void FLVRecorder::writeFLVHeader() { + uint8_t header[] = {'F', 'L', 'V', 0x01, 0x05, 0x00, 0x00, 0x00, 0x09}; + file.write(reinterpret_cast(header), sizeof(header)); + uint32_t prev_size = 0; + file.write(reinterpret_cast(&prev_size), 4); +} - void FLVRecorder::writeFLVTag(uint8_t tag_type, - const std::vector &data, - uint32_t timestamp) - { - if (!recording || !file.is_open()) return; - std::lock_guard lock(file_mutex); - uint32_t data_size = data.size(); - file.put(tag_type); - file.put((data_size >> 16) & 0xFF); - file.put((data_size >> 8) & 0xFF); - file.put(data_size & 0xFF); - file.put((timestamp >> 16) & 0xFF); - file.put((timestamp >> 8) & 0xFF); - file.put(timestamp & 0xFF); - file.put((timestamp >> 24) & 0xFF); - file.put(0); file.put(0); file.put(0); - file.write(reinterpret_cast(data.data()), data.size()); - uint32_t tag_size = 11 + data_size; - file.put((tag_size >> 24) & 0xFF); - file.put((tag_size >> 16) & 0xFF); - file.put((tag_size >> 8) & 0xFF); - file.put(tag_size & 0xFF); - last_timestamp = timestamp; - } +void FLVRecorder::writeFLVTag(uint8_t tag_type, + const std::vector &data, + uint32_t timestamp) { + if (!recording || !file.is_open()) + return; + std::lock_guard lock(file_mutex); + uint32_t data_size = data.size(); + file.put(tag_type); + file.put((data_size >> 16) & 0xFF); + file.put((data_size >> 8) & 0xFF); + file.put(data_size & 0xFF); + file.put((timestamp >> 16) & 0xFF); + file.put((timestamp >> 8) & 0xFF); + file.put(timestamp & 0xFF); + file.put((timestamp >> 24) & 0xFF); + file.put(0); + file.put(0); + file.put(0); + file.write(reinterpret_cast(data.data()), data.size()); + uint32_t tag_size = 11 + data_size; + file.put((tag_size >> 24) & 0xFF); + file.put((tag_size >> 16) & 0xFF); + file.put((tag_size >> 8) & 0xFF); + file.put(tag_size & 0xFF); + last_timestamp = timestamp; +} - void FLVRecorder::writeVideoFrame(const std::vector &data, - uint32_t timestamp) - { - writeFLVTag(0x09, data, timestamp); - } +void FLVRecorder::writeVideoFrame(const std::vector &data, + uint32_t timestamp) { + writeFLVTag(0x09, data, timestamp); +} - void FLVRecorder::writeAudioFrame(const std::vector &data, - uint32_t timestamp) - { - writeFLVTag(0x08, data, timestamp); - } +void FLVRecorder::writeAudioFrame(const std::vector &data, + uint32_t timestamp) { + writeFLVTag(0x08, data, timestamp); +} - void FLVRecorder::writeMetadata(const - std::map> &metadata) - { - auto encoded = encodeMetadata(metadata); - writeFLVTag(0x12, encoded, 0); - } +void FLVRecorder::writeMetadata( + const std::map> &metadata) { + auto encoded = encodeMetadata(metadata); + writeFLVTag(0x12, encoded, 0); +} - std::vector FLVRecorder::encodeMetadata( - const std::map> &metadata) - { - std::vector result; - result.push_back(0x02); - uint16_t len = 10; - result.push_back((len >> 8) & 0xFF); - result.push_back(len & 0xFF); - std::string meta_str = "onMetaData"; - result.insert(result.end(), meta_str.begin(), meta_str.end()); - result.push_back(0x08); - uint32_t count = metadata.size(); - result.push_back((count >> 24) & 0xFF); - result.push_back((count >> 16) & 0xFF); - result.push_back((count >> 8) & 0xFF); - result.push_back(count & 0xFF); - for (const auto& pair : metadata) - { - uint16_t key_len = pair.first.size(); - result.push_back((key_len >> 8) & 0xFF); - result.push_back(key_len & 0xFF); - result.insert(result.end(), pair.first.begin(), pair.first.end()); - if (pair.second->type == AMF0Type::NUMBER) - { - result.push_back(0x00); - uint64_t bits; - memcpy(&bits, &pair.second->number, 8); - for (int i = 7; i >= 0; i--) - { - result.push_back((bits >> (i * 8)) & 0xFF); - } - } - else if (pair.second->type == AMF0Type::STRING) - { - result.push_back(0x02); - uint16_t str_len = pair.second->string.size(); - result.push_back((str_len >> 8) & 0xFF); - result.push_back(str_len & 0xFF); - result.insert(result.end(), pair.second->string.begin(), - pair.second->string.end()); - } - } - result.push_back(0x00); result.push_back(0x00); result.push_back(0x09); - return result; +std::vector FLVRecorder::encodeMetadata( + const std::map> &metadata) { + std::vector result; + result.push_back(0x02); + uint16_t len = 10; + result.push_back((len >> 8) & 0xFF); + result.push_back(len & 0xFF); + std::string meta_str = "onMetaData"; + result.insert(result.end(), meta_str.begin(), meta_str.end()); + result.push_back(0x08); + uint32_t count = metadata.size(); + result.push_back((count >> 24) & 0xFF); + result.push_back((count >> 16) & 0xFF); + result.push_back((count >> 8) & 0xFF); + result.push_back(count & 0xFF); + for (const auto &pair : metadata) { + uint16_t key_len = pair.first.size(); + result.push_back((key_len >> 8) & 0xFF); + result.push_back(key_len & 0xFF); + result.insert(result.end(), pair.first.begin(), pair.first.end()); + if (pair.second->type == AMF0Type::NUMBER) { + result.push_back(0x00); + uint64_t bits; + memcpy(&bits, &pair.second->number, 8); + for (int i = 7; i >= 0; i--) { + result.push_back((bits >> (i * 8)) & 0xFF); + } + } else if (pair.second->type == AMF0Type::STRING) { + result.push_back(0x02); + uint16_t str_len = pair.second->string.size(); + result.push_back((str_len >> 8) & 0xFF); + result.push_back(str_len & 0xFF); + result.insert(result.end(), pair.second->string.begin(), + pair.second->string.end()); } + } + result.push_back(0x00); + result.push_back(0x00); + result.push_back(0x09); + return result; +} // RTMPSession Implementation - RTMPSession::RTMPSession(int fd, const std::string& client_ip) - : client_fd(fd), chunk_size(128), window_ack_size(2500000), - peer_bandwidth(2500000), bytes_received(0), last_ack_sent(0) - { - stream_info.is_publishing = false; - stream_info.is_playing = false; - stream_info.client_fd = fd; - stream_info.stream_id = 0; - stream_info.client_ip = client_ip; - last_activity = std::chrono::steady_clock::now(); - } +RTMPSession::RTMPSession(int fd, const std::string &client_ip) + : client_fd(fd), chunk_size(128), window_ack_size(2500000), + peer_bandwidth(2500000), bytes_received(0), last_ack_sent(0) { + stream_info.is_publishing = false; + stream_info.is_playing = false; + stream_info.client_fd = fd; + stream_info.stream_id = 0; + stream_info.client_ip = client_ip; + last_activity = std::chrono::steady_clock::now(); +} - RTMPSession::~RTMPSession() - { - if (client_fd >= 0) - { - close(client_fd); - } - } +RTMPSession::~RTMPSession() { + if (client_fd >= 0) { + close(client_fd); + } +} - bool RTMPSession::readExactly(uint8_t* buf, size_t len) +bool RTMPSession::readExactly(uint8_t *buf, size_t len) { + size_t total = 0; + while (total < len) { + ssize_t n = recv(client_fd, buf + total, len - total, 0); + if (n <= 0) + return false; + total += n; + onBytesReceived(n); + } + updateActivity(); + return true; +} + +bool RTMPSession::writeExactly(const uint8_t *buf, size_t len) { + size_t total = 0; + while (total < len) { + ssize_t n = send(client_fd, buf + total, len - total, MSG_NOSIGNAL); + if (n <= 0) + return false; + total += n; + } + stats.bytes_sent += len; + updateActivity(); + return true; +} + +void RTMPSession::onBytesReceived(size_t bytes) { + bytes_received += bytes; + stats.bytes_received += bytes; +} + +bool RTMPSession::shouldSendAck() const { + return (bytes_received - last_ack_sent) >= window_ack_size; +} + +void RTMPSession::sendAcknowledgement() { + std::vector ack(4); + writeUint32BE(ack.data(), bytes_received); + sendChunk(2, 0, (uint8_t)MessageType::ACKNOWLEDGEMENT, 0, ack); + last_ack_sent = bytes_received; + LOG_DEBUG("Sent ACK: " + std::to_string(bytes_received)); +} + +void RTMPSession::sendPing(uint32_t timestamp) { + std::vector ping_msg(6); + writeUint16BE(ping_msg.data(), (uint16_t)UserControlType::PING_REQUEST); + writeUint32BE(ping_msg.data() + 2, timestamp); + sendChunk(2, 0, (uint8_t)MessageType::USER_CONTROL, 0, ping_msg); +} + +void RTMPSession::sendPong(uint32_t timestamp) { + std::vector pong_msg(6); + writeUint16BE(pong_msg.data(), (uint16_t)UserControlType::PING_RESPONSE); + writeUint32BE(pong_msg.data() + 2, timestamp); + sendChunk(2, 0, (uint8_t)MessageType::USER_CONTROL, 0, pong_msg); +} + +bool RTMPSession::handshake() { + uint8_t c0c1[1537]; + if (!readExactly(c0c1, 1537)) + return false; + if (c0c1[0] != 3) + return false; + uint8_t s0s1[1537]; + s0s1[0] = 3; + memset(s0s1 + 1, 0, 8); + for (int i = 9; i < 1537; i++) { + s0s1[i] = rand() % 256; + } + if (!writeExactly(s0s1, 1537)) + return false; + if (!writeExactly(c0c1 + 1, 1536)) + return false; + uint8_t c2[1536]; + if (!readExactly(c2, 1536)) + return false; + return true; +} + +bool RTMPSession::parseChunkHeader(ChunkHeader &header) { + uint8_t basic_header; + if (!readExactly(&basic_header, 1)) + return false; + header.fmt = (basic_header >> 6) & 0x03; + header.csid = basic_header & 0x3F; + if (header.csid == 0) { + uint8_t csid_byte; + if (!readExactly(&csid_byte, 1)) + return false; + header.csid = 64 + csid_byte; + } else if (header.csid == 1) { + uint8_t csid_bytes[2]; + if (!readExactly(csid_bytes, 2)) + return false; + header.csid = 64 + csid_bytes[0] + (csid_bytes[1] * 256); + } + ChunkHeader prev = prev_headers[header.csid]; + header.has_extended_timestamp = false; + if (header.fmt == 0) { + uint8_t buf[11]; + if (!readExactly(buf, 11)) + return false; + header.timestamp = readUint24BE(buf); + header.msg_length = readUint24BE(buf + 3); + header.msg_type_id = buf[6]; + header.msg_stream_id = + buf[7] | (buf[8] << 8) | (buf[9] << 16) | (buf[10] << 24); + if (header.timestamp == 0xFFFFFF) { + uint8_t ext_ts[4]; + if (!readExactly(ext_ts, 4)) + return false; + header.timestamp = readUint32BE(ext_ts); + header.has_extended_timestamp = true; + } + } else if (header.fmt == 1) { + uint8_t buf[7]; + if (!readExactly(buf, 7)) + return false; + uint32_t timestamp_delta = readUint24BE(buf); + header.msg_length = readUint24BE(buf + 3); + header.msg_type_id = buf[6]; + header.msg_stream_id = prev.msg_stream_id; + if (timestamp_delta == 0xFFFFFF) { + uint8_t ext_ts[4]; + if (!readExactly(ext_ts, 4)) + return false; + timestamp_delta = readUint32BE(ext_ts); + header.has_extended_timestamp = true; + } + header.timestamp = prev.timestamp + timestamp_delta; + } else if (header.fmt == 2) { + uint8_t buf[3]; + if (!readExactly(buf, 3)) + return false; + uint32_t timestamp_delta = readUint24BE(buf); + header.msg_length = prev.msg_length; + header.msg_type_id = prev.msg_type_id; + header.msg_stream_id = prev.msg_stream_id; + if (timestamp_delta == 0xFFFFFF) { + uint8_t ext_ts[4]; + if (!readExactly(ext_ts, 4)) + return false; + timestamp_delta = readUint32BE(ext_ts); + header.has_extended_timestamp = true; + } + header.timestamp = prev.timestamp + timestamp_delta; + } else { + header.timestamp = prev.timestamp; + header.msg_length = prev.msg_length; + header.msg_type_id = prev.msg_type_id; + header.msg_stream_id = prev.msg_stream_id; + } + prev_headers[header.csid] = header; + return true; +} + +bool RTMPSession::receiveChunk() { + ChunkHeader header; + if (!parseChunkHeader(header)) + return false; + auto &incomplete = incomplete_chunks[header.csid]; + size_t to_read = std::min((size_t)chunk_size, + (size_t)(header.msg_length - incomplete.size())); + std::vector chunk_data(to_read); + if (!readExactly(chunk_data.data(), to_read)) + return false; + incomplete.insert(incomplete.end(), chunk_data.begin(), chunk_data.end()); + if (incomplete.size() >= header.msg_length) { + RTMPMessage msg; + msg.header = header; + msg.payload = incomplete; + incomplete.clear(); + incomplete_chunks.erase(header.csid); { - size_t total = 0; - while (total < len) - { - ssize_t n = recv(client_fd, buf + total, len - total, 0); - if (n <= 0) return false; - total += n; - onBytesReceived(n); - } - updateActivity(); + std::lock_guard lock(queue_mutex); + message_queue.push(msg); + } + // Send ACK if needed + if (shouldSendAck()) { + sendAcknowledgement(); + } + return processMessage(msg); + } + return true; +} + +bool RTMPSession::sendChunk(uint32_t csid, uint32_t timestamp, uint8_t msg_type, + uint32_t stream_id, + const std::vector &data) { + size_t sent = 0; + bool first = true; + while (sent < data.size()) { + std::vector chunk; + uint8_t fmt = first ? 0 : 3; + if (csid < 64) { + chunk.push_back((fmt << 6) | csid); + } else if (csid < 320) { + chunk.push_back(fmt << 6); + chunk.push_back((csid - 64) & 0xFF); + } else { + chunk.push_back((fmt << 6) | 1); + chunk.push_back((csid - 64) & 0xFF); + chunk.push_back(((csid - 64) >> 8) & 0xFF); + } + if (first) { + uint8_t msg_header[11]; + writeUint24BE(msg_header, timestamp >= 0xFFFFFF ? 0xFFFFFF : timestamp); + writeUint24BE(msg_header + 3, data.size()); + msg_header[6] = msg_type; + msg_header[7] = stream_id & 0xFF; + msg_header[8] = (stream_id >> 8) & 0xFF; + msg_header[9] = (stream_id >> 16) & 0xFF; + msg_header[10] = (stream_id >> 24) & 0xFF; + chunk.insert(chunk.end(), msg_header, msg_header + 11); + if (timestamp >= 0xFFFFFF) { + uint8_t ext_ts[4]; + writeUint32BE(ext_ts, timestamp); + chunk.insert(chunk.end(), ext_ts, ext_ts + 4); + } + } + size_t to_send = std::min(chunk_size, (uint32_t)(data.size() - sent)); + chunk.insert(chunk.end(), data.begin() + sent, + data.begin() + sent + to_send); + if (!writeExactly(chunk.data(), chunk.size())) + return false; + sent += to_send; + first = false; + } + return true; +} + +std::shared_ptr RTMPSession::decodeAMF0(const uint8_t *data, + size_t len, size_t &offset) { + if (offset >= len) + return nullptr; + auto val = std::make_shared(); + val->type = (AMF0Type)data[offset++]; + switch (val->type) { + case AMF0Type::NUMBER: + if (offset + 8 > len) + return nullptr; + val->number = readDouble(data + offset); + offset += 8; + break; + case AMF0Type::BOOLEAN: + if (offset >= len) + return nullptr; + val->boolean = data[offset++] != 0; + break; + case AMF0Type::STRING: { + if (offset + 2 > len) + return nullptr; + uint16_t str_len = readUint16BE(data + offset); + offset += 2; + if (offset + str_len > len) + return nullptr; + val->string = std::string((char *)(data + offset), str_len); + offset += str_len; + break; + } + case AMF0Type::OBJECT: + while (offset < len) { + if (offset + 2 > len) + return nullptr; + uint16_t key_len = readUint16BE(data + offset); + offset += 2; + if (key_len == 0 && offset < len && + data[offset] == (uint8_t)AMF0Type::OBJECT_END) { + offset++; + break; + } + if (offset + key_len > len) + return nullptr; + std::string key((char *)(data + offset), key_len); + offset += key_len; + auto value = decodeAMF0(data, len, offset); + if (!value) + return nullptr; + val->object[key] = value; + } + break; + case AMF0Type::NULL_TYPE: + case AMF0Type::UNDEFINED: + break; + case AMF0Type::ECMA_ARRAY: + if (offset + 4 > len) + return nullptr; + offset += 4; + while (offset < len) { + if (offset + 2 > len) + return nullptr; + uint16_t key_len = readUint16BE(data + offset); + offset += 2; + if (key_len == 0 && offset < len && + data[offset] == (uint8_t)AMF0Type::OBJECT_END) { + offset++; + break; + } + if (offset + key_len > len) + return nullptr; + std::string key((char *)(data + offset), key_len); + offset += key_len; + auto value = decodeAMF0(data, len, offset); + if (!value) + return nullptr; + val->object[key] = value; + } + break; + case AMF0Type::OBJECT_END: + break; + } + return val; +} + +std::vector RTMPSession::encodeAMF0String(const std::string &str) { + std::vector result; + result.push_back((uint8_t)AMF0Type::STRING); + uint8_t len_buf[2]; + writeUint16BE(len_buf, str.size()); + result.insert(result.end(), len_buf, len_buf + 2); + result.insert(result.end(), str.begin(), str.end()); + return result; +} + +std::vector RTMPSession::encodeAMF0Number(double num) { + std::vector result; + result.push_back((uint8_t)AMF0Type::NUMBER); + uint8_t num_buf[8]; + writeDouble(num_buf, num); + result.insert(result.end(), num_buf, num_buf + 8); + return result; +} + +std::vector RTMPSession::encodeAMF0Object( + const std::map> &obj) { + std::vector result; + result.push_back((uint8_t)AMF0Type::OBJECT); + for (const auto &pair : obj) { + uint8_t key_len[2]; + writeUint16BE(key_len, pair.first.size()); + result.insert(result.end(), key_len, key_len + 2); + result.insert(result.end(), pair.first.begin(), pair.first.end()); + auto encoded = encodeAMF0(*pair.second); + result.insert(result.end(), encoded.begin(), encoded.end()); + } + result.push_back(0); + result.push_back(0); + result.push_back((uint8_t)AMF0Type::OBJECT_END); + return result; +} + +std::vector RTMPSession::encodeAMF0(const AMF0Value &value) { + switch (value.type) { + case AMF0Type::NUMBER: + return encodeAMF0Number(value.number); + case AMF0Type::STRING: + return encodeAMF0String(value.string); + case AMF0Type::OBJECT: + return encodeAMF0Object(value.object); + case AMF0Type::NULL_TYPE: { + std::vector result; + result.push_back((uint8_t)AMF0Type::NULL_TYPE); + return result; + } + case AMF0Type::BOOLEAN: { + std::vector result; + result.push_back((uint8_t)AMF0Type::BOOLEAN); + result.push_back(value.boolean ? 1 : 0); + return result; + } + default: + return std::vector(); + } +} + +bool RTMPSession::processMessage(const RTMPMessage &msg) { + MessageType type = (MessageType)msg.header.msg_type_id; + switch (type) { + case MessageType::SET_CHUNK_SIZE: + if (msg.payload.size() >= 4) { + chunk_size = readUint32BE(msg.payload.data()) & 0x7FFFFFFF; + LOG_DEBUG("Chunk size set to: " + std::to_string(chunk_size)); + } + break; + case MessageType::WINDOW_ACK_SIZE: + if (msg.payload.size() >= 4) { + window_ack_size = readUint32BE(msg.payload.data()); + LOG_DEBUG("Window ACK size set to: " + std::to_string(window_ack_size)); + } + break; + case MessageType::SET_PEER_BANDWIDTH: + if (msg.payload.size() >= 5) { + peer_bandwidth = readUint32BE(msg.payload.data()); + LOG_DEBUG("Peer bandwidth set to: " + std::to_string(peer_bandwidth)); + } + break; + case MessageType::COMMAND_AMF0: + return handleCommand(msg); + case MessageType::AUDIO: + return handleAudioMessage(msg); + case MessageType::VIDEO: + return handleVideoMessage(msg); + case MessageType::DATA_AMF0: + return handleDataMessage(msg); + case MessageType::USER_CONTROL: + return handleUserControl(msg); + case MessageType::ACKNOWLEDGEMENT: + return handleAcknowledgement(msg); + default: + break; + } + return true; +} + +bool RTMPSession::handleUserControl(const RTMPMessage &msg) { + if (msg.payload.size() < 2) + return true; + uint16_t event_type = readUint16BE(msg.payload.data()); + UserControlType type = (UserControlType)event_type; + switch (type) { + case UserControlType::PING_REQUEST: + if (msg.payload.size() >= 6) { + uint32_t timestamp = readUint32BE(msg.payload.data() + 2); + sendPong(timestamp); + LOG_DEBUG("Received PING, sent PONG"); + } + break; + case UserControlType::PING_RESPONSE: + LOG_DEBUG("Received PONG response"); + break; + default: + break; + } + return true; +} + +bool RTMPSession::handleAcknowledgement(const RTMPMessage &msg) { + if (msg.payload.size() >= 4) { + uint32_t ack_value = readUint32BE(msg.payload.data()); + LOG_DEBUG("Received ACK: " + std::to_string(ack_value)); + } + return true; +} + +bool RTMPSession::handleAudioMessage(const RTMPMessage &msg) { + stats.audio_frames++; + return true; +} + +bool RTMPSession::handleVideoMessage(const RTMPMessage &msg) { + stats.video_frames++; + return true; +} + +bool RTMPSession::handleDataMessage(const RTMPMessage &msg) { + size_t offset = 0; + auto command = decodeAMF0(msg.payload.data(), msg.payload.size(), offset); + if (!command || command->type != AMF0Type::STRING) + return true; + if (command->string == "@setDataFrame" || command->string == "onMetaData") { + if (command->string == "@setDataFrame") { + auto metadata_name = + decodeAMF0(msg.payload.data(), msg.payload.size(), offset); + if (!metadata_name || metadata_name->type != AMF0Type::STRING) return true; } - - bool RTMPSession::writeExactly(const uint8_t* buf, size_t len) - { - size_t total = 0; - while (total < len) - { - ssize_t n = send(client_fd, buf + total, len - total, MSG_NOSIGNAL); - if (n <= 0) return false; - total += n; - } - stats.bytes_sent += len; - updateActivity(); - return true; + auto metadata_obj = + decodeAMF0(msg.payload.data(), msg.payload.size(), offset); + if (metadata_obj && (metadata_obj->type == AMF0Type::OBJECT || + metadata_obj->type == AMF0Type::ECMA_ARRAY)) { + LOG_INFO("Received metadata"); } + } + return true; +} - void RTMPSession::onBytesReceived(size_t bytes) - { - bytes_received += bytes; - stats.bytes_received += bytes; - } +bool RTMPSession::handleCommand(const RTMPMessage &msg) { + size_t offset = 0; + std::vector> args; + while (offset < msg.payload.size()) { + auto arg = decodeAMF0(msg.payload.data(), msg.payload.size(), offset); + if (!arg) + break; + args.push_back(arg); + } + if (args.empty() || args[0]->type != AMF0Type::STRING) + return false; + std::string command = args[0]->string; + LOG_DEBUG("Received command: " + command); + if (command == "connect") { + return handleConnect(args); + } else if (command == "releaseStream") { + return handleReleaseStream(args); + } else if (command == "FCPublish") { + return handleFCPublish(args); + } else if (command == "createStream") { + return handleCreateStream(args); + } else if (command == "publish") { + return handlePublish(args); + } else if (command == "play") { + return handlePlay(args); + } else if (command == "deleteStream") { + return handleDeleteStream(args); + } + return true; +} - bool RTMPSession::shouldSendAck() const - { - return (bytes_received - last_ack_sent) >= window_ack_size; +bool RTMPSession::handleConnect( + const std::vector> &args) { + if (args.size() < 3) + return false; + double transaction_id = args[1]->number; + if (args[2]->type == AMF0Type::OBJECT) { + auto &props = args[2]->object; + if (props.find("app") != props.end() && + props["app"]->type == AMF0Type::STRING) { + stream_info.app = props["app"]->string; + LOG_INFO("App: " + stream_info.app); } + } + std::vector ack_size(4); + writeUint32BE(ack_size.data(), 2500000); + sendChunk(2, 0, (uint8_t)MessageType::WINDOW_ACK_SIZE, 0, ack_size); + std::vector bandwidth(5); + writeUint32BE(bandwidth.data(), 2500000); + bandwidth[4] = 2; + sendChunk(2, 0, (uint8_t)MessageType::SET_PEER_BANDWIDTH, 0, bandwidth); + std::vector stream_begin(6); + writeUint16BE(stream_begin.data(), (uint16_t)UserControlType::STREAM_BEGIN); + writeUint32BE(stream_begin.data() + 2, 0); + sendChunk(2, 0, (uint8_t)MessageType::USER_CONTROL, 0, stream_begin); + std::vector chunk_size_msg(4); + writeUint32BE(chunk_size_msg.data(), 4096); + sendChunk(2, 0, (uint8_t)MessageType::SET_CHUNK_SIZE, 0, chunk_size_msg); + chunk_size = 4096; + return sendConnectResponse(transaction_id); +} - void RTMPSession::sendAcknowledgement() - { - std::vector ack(4); - writeUint32BE(ack.data(), bytes_received); - sendChunk(2, 0, (uint8_t)MessageType::ACKNOWLEDGEMENT, 0, ack); - last_ack_sent = bytes_received; - LOG_DEBUG("Sent ACK: " + std::to_string(bytes_received)); - } +bool RTMPSession::sendConnectResponse(double transaction_id) { + std::vector response; + auto cmd = encodeAMF0String("_result"); + response.insert(response.end(), cmd.begin(), cmd.end()); + auto tid = encodeAMF0Number(transaction_id); + response.insert(response.end(), tid.begin(), tid.end()); + AMF0Value props; + props.type = AMF0Type::OBJECT; + props.object["fmsVer"] = std::make_shared(); + props.object["fmsVer"]->type = AMF0Type::STRING; + props.object["fmsVer"]->string = "FMS/3,0,1,123"; + props.object["capabilities"] = std::make_shared(); + props.object["capabilities"]->type = AMF0Type::NUMBER; + props.object["capabilities"]->number = 31; + auto props_enc = encodeAMF0(props); + response.insert(response.end(), props_enc.begin(), props_enc.end()); + AMF0Value info; + info.type = AMF0Type::OBJECT; + info.object["level"] = std::make_shared(); + info.object["level"]->type = AMF0Type::STRING; + info.object["level"]->string = "status"; + info.object["code"] = std::make_shared(); + info.object["code"]->type = AMF0Type::STRING; + info.object["code"]->string = "NetConnection.Connect.Success"; + info.object["description"] = std::make_shared(); + info.object["description"]->type = AMF0Type::STRING; + info.object["description"]->string = "Connection succeeded."; + info.object["objectEncoding"] = std::make_shared(); + info.object["objectEncoding"]->type = AMF0Type::NUMBER; + info.object["objectEncoding"]->number = 0; + auto info_enc = encodeAMF0(info); + response.insert(response.end(), info_enc.begin(), info_enc.end()); + return sendChunk(3, 0, (uint8_t)MessageType::COMMAND_AMF0, 0, response); +} - void RTMPSession::sendPing(uint32_t timestamp) - { - std::vector ping_msg(6); - writeUint16BE(ping_msg.data(), (uint16_t)UserControlType::PING_REQUEST); - writeUint32BE(ping_msg.data() + 2, timestamp); - sendChunk(2, 0, (uint8_t)MessageType::USER_CONTROL, 0, ping_msg); - } +bool RTMPSession::handleReleaseStream( + const std::vector> &args) { + return true; +} - void RTMPSession::sendPong(uint32_t timestamp) - { - std::vector pong_msg(6); - writeUint16BE(pong_msg.data(), (uint16_t)UserControlType::PING_RESPONSE); - writeUint32BE(pong_msg.data() + 2, timestamp); - sendChunk(2, 0, (uint8_t)MessageType::USER_CONTROL, 0, pong_msg); - } +bool RTMPSession::handleFCPublish( + const std::vector> &args) { + return true; +} - bool RTMPSession::handshake() - { - uint8_t c0c1[1537]; - if (!readExactly(c0c1, 1537)) return false; - if (c0c1[0] != 3) return false; - uint8_t s0s1[1537]; - s0s1[0] = 3; - memset(s0s1 + 1, 0, 8); - for (int i = 9; i < 1537; i++) - { - s0s1[i] = rand() % 256; - } - if (!writeExactly(s0s1, 1537)) return false; - if (!writeExactly(c0c1 + 1, 1536)) return false; - uint8_t c2[1536]; - if (!readExactly(c2, 1536)) return false; - return true; - } +bool RTMPSession::handleCreateStream( + const std::vector> &args) { + if (args.size() < 2) + return false; + double transaction_id = args[1]->number; + stream_info.stream_id = 1; + return sendCreateStreamResponse(transaction_id, 1); +} - bool RTMPSession::parseChunkHeader(ChunkHeader& header) - { - uint8_t basic_header; - if (!readExactly(&basic_header, 1)) return false; - header.fmt = (basic_header >> 6) & 0x03; - header.csid = basic_header & 0x3F; - if (header.csid == 0) - { - uint8_t csid_byte; - if (!readExactly(&csid_byte, 1)) return false; - header.csid = 64 + csid_byte; - } - else if (header.csid == 1) - { - uint8_t csid_bytes[2]; - if (!readExactly(csid_bytes, 2)) return false; - header.csid = 64 + csid_bytes[0] + (csid_bytes[1] * 256); - } - ChunkHeader prev = prev_headers[header.csid]; - header.has_extended_timestamp = false; - if (header.fmt == 0) - { - uint8_t buf[11]; - if (!readExactly(buf, 11)) return false; - header.timestamp = readUint24BE(buf); - header.msg_length = readUint24BE(buf + 3); - header.msg_type_id = buf[6]; - header.msg_stream_id = buf[7] | (buf[8] << 8) | (buf[9] << 16) | - (buf[10] << 24); - if (header.timestamp == 0xFFFFFF) - { - uint8_t ext_ts[4]; - if (!readExactly(ext_ts, 4)) return false; - header.timestamp = readUint32BE(ext_ts); - header.has_extended_timestamp = true; - } - } - else if (header.fmt == 1) - { - uint8_t buf[7]; - if (!readExactly(buf, 7)) return false; - uint32_t timestamp_delta = readUint24BE(buf); - header.msg_length = readUint24BE(buf + 3); - header.msg_type_id = buf[6]; - header.msg_stream_id = prev.msg_stream_id; - if (timestamp_delta == 0xFFFFFF) - { - uint8_t ext_ts[4]; - if (!readExactly(ext_ts, 4)) return false; - timestamp_delta = readUint32BE(ext_ts); - header.has_extended_timestamp = true; - } - header.timestamp = prev.timestamp + timestamp_delta; - } - else if (header.fmt == 2) - { - uint8_t buf[3]; - if (!readExactly(buf, 3)) return false; - uint32_t timestamp_delta = readUint24BE(buf); - header.msg_length = prev.msg_length; - header.msg_type_id = prev.msg_type_id; - header.msg_stream_id = prev.msg_stream_id; - if (timestamp_delta == 0xFFFFFF) - { - uint8_t ext_ts[4]; - if (!readExactly(ext_ts, 4)) return false; - timestamp_delta = readUint32BE(ext_ts); - header.has_extended_timestamp = true; - } - header.timestamp = prev.timestamp + timestamp_delta; - } - else - { - header.timestamp = prev.timestamp; - header.msg_length = prev.msg_length; - header.msg_type_id = prev.msg_type_id; - header.msg_stream_id = prev.msg_stream_id; - } - prev_headers[header.csid] = header; - return true; - } +bool RTMPSession::sendCreateStreamResponse(double transaction_id, + double stream_id) { + std::vector response; + auto cmd = encodeAMF0String("_result"); + response.insert(response.end(), cmd.begin(), cmd.end()); + auto tid = encodeAMF0Number(transaction_id); + response.insert(response.end(), tid.begin(), tid.end()); + AMF0Value null_val; + null_val.type = AMF0Type::NULL_TYPE; + auto null_enc = encodeAMF0(null_val); + response.insert(response.end(), null_enc.begin(), null_enc.end()); + auto sid = encodeAMF0Number(stream_id); + response.insert(response.end(), sid.begin(), sid.end()); + return sendChunk(3, 0, (uint8_t)MessageType::COMMAND_AMF0, 0, response); +} - bool RTMPSession::receiveChunk() - { - ChunkHeader header; - if (!parseChunkHeader(header)) return false; - auto& incomplete = incomplete_chunks[header.csid]; - size_t to_read = std::min((size_t)chunk_size, - (size_t)(header.msg_length - incomplete.size())); - std::vector chunk_data(to_read); - if (!readExactly(chunk_data.data(), to_read)) return false; - incomplete.insert(incomplete.end(), chunk_data.begin(), chunk_data.end()); - if (incomplete.size() >= header.msg_length) - { - RTMPMessage msg; - msg.header = header; - msg.payload = incomplete; - incomplete.clear(); - incomplete_chunks.erase(header.csid); - { - std::lock_guard lock(queue_mutex); - message_queue.push(msg); - } - // Send ACK if needed - if (shouldSendAck()) - { - sendAcknowledgement(); - } - return processMessage(msg); - } - return true; - } +bool RTMPSession::handlePublish( + const std::vector> &args) { + if (args.size() < 4) + return false; + if (args[3]->type == AMF0Type::STRING) { + stream_info.stream_key = args[3]->string; + stream_info.is_publishing = true; + LOG_INFO("Publishing to: " + stream_info.stream_key); + } + return sendPublishResponse(); +} - bool RTMPSession::sendChunk(uint32_t csid, uint32_t timestamp, uint8_t msg_type, - uint32_t stream_id, const std::vector &data) - { - size_t sent = 0; - bool first = true; - while (sent < data.size()) - { - std::vector chunk; - uint8_t fmt = first ? 0 : 3; - if (csid < 64) - { - chunk.push_back((fmt << 6) | csid); - } - else if (csid < 320) - { - chunk.push_back(fmt << 6); - chunk.push_back((csid - 64) & 0xFF); - } - else - { - chunk.push_back((fmt << 6) | 1); - chunk.push_back((csid - 64) & 0xFF); - chunk.push_back(((csid - 64) >> 8) & 0xFF); - } - if (first) - { - uint8_t msg_header[11]; - writeUint24BE(msg_header, timestamp >= 0xFFFFFF ? 0xFFFFFF : timestamp); - writeUint24BE(msg_header + 3, data.size()); - msg_header[6] = msg_type; - msg_header[7] = stream_id & 0xFF; - msg_header[8] = (stream_id >> 8) & 0xFF; - msg_header[9] = (stream_id >> 16) & 0xFF; - msg_header[10] = (stream_id >> 24) & 0xFF; - chunk.insert(chunk.end(), msg_header, msg_header + 11); - if (timestamp >= 0xFFFFFF) - { - uint8_t ext_ts[4]; - writeUint32BE(ext_ts, timestamp); - chunk.insert(chunk.end(), ext_ts, ext_ts + 4); - } - } - size_t to_send = std::min(chunk_size, (uint32_t)(data.size() - sent)); - chunk.insert(chunk.end(), data.begin() + sent, data.begin() + sent + to_send); - if (!writeExactly(chunk.data(), chunk.size())) return false; - sent += to_send; - first = false; - } - return true; - } +bool RTMPSession::sendPublishResponse() { + std::vector response; + auto cmd = encodeAMF0String("onStatus"); + response.insert(response.end(), cmd.begin(), cmd.end()); + auto tid = encodeAMF0Number(0); + response.insert(response.end(), tid.begin(), tid.end()); + AMF0Value null_val; + null_val.type = AMF0Type::NULL_TYPE; + auto null_enc = encodeAMF0(null_val); + response.insert(response.end(), null_enc.begin(), null_enc.end()); + AMF0Value info; + info.type = AMF0Type::OBJECT; + info.object["level"] = std::make_shared(); + info.object["level"]->type = AMF0Type::STRING; + info.object["level"]->string = "status"; + info.object["code"] = std::make_shared(); + info.object["code"]->type = AMF0Type::STRING; + info.object["code"]->string = "NetStream.Publish.Start"; + info.object["description"] = std::make_shared(); + info.object["description"]->type = AMF0Type::STRING; + info.object["description"]->string = "Stream is now published."; + auto info_enc = encodeAMF0(info); + response.insert(response.end(), info_enc.begin(), info_enc.end()); + return sendChunk(5, 0, (uint8_t)MessageType::COMMAND_AMF0, + stream_info.stream_id, response); +} - std::shared_ptr RTMPSession::decodeAMF0(const uint8_t* data, - size_t len, size_t &offset) - { - if (offset >= len) return nullptr; - auto val = std::make_shared(); - val->type = (AMF0Type)data[offset++]; - switch (val->type) - { - case AMF0Type::NUMBER: - if (offset + 8 > len) return nullptr; - val->number = readDouble(data + offset); - offset += 8; - break; - case AMF0Type::BOOLEAN: - if (offset >= len) return nullptr; - val->boolean = data[offset++] != 0; - break; - case AMF0Type::STRING: - { - if (offset + 2 > len) return nullptr; - uint16_t str_len = readUint16BE(data + offset); - offset += 2; - if (offset + str_len > len) return nullptr; - val->string = std::string((char*)(data + offset), str_len); - offset += str_len; - break; - } - case AMF0Type::OBJECT: - while (offset < len) - { - if (offset + 2 > len) return nullptr; - uint16_t key_len = readUint16BE(data + offset); - offset += 2; - if (key_len == 0 && offset < len && - data[offset] == (uint8_t)AMF0Type::OBJECT_END) - { - offset++; - break; - } - if (offset + key_len > len) return nullptr; - std::string key((char*)(data + offset), key_len); - offset += key_len; - auto value = decodeAMF0(data, len, offset); - if (!value) return nullptr; - val->object[key] = value; - } - break; - case AMF0Type::NULL_TYPE: - case AMF0Type::UNDEFINED: - break; - case AMF0Type::ECMA_ARRAY: - if (offset + 4 > len) return nullptr; - offset += 4; - while (offset < len) - { - if (offset + 2 > len) return nullptr; - uint16_t key_len = readUint16BE(data + offset); - offset += 2; - if (key_len == 0 && offset < len && - data[offset] == (uint8_t)AMF0Type::OBJECT_END) - { - offset++; - break; - } - if (offset + key_len > len) return nullptr; - std::string key((char*)(data + offset), key_len); - offset += key_len; - auto value = decodeAMF0(data, len, offset); - if (!value) return nullptr; - val->object[key] = value; - } - break; - } - return val; - } +bool RTMPSession::handlePlay( + const std::vector> &args) { + if (args.size() < 4) + return false; + if (args[3]->type == AMF0Type::STRING) { + stream_info.stream_key = args[3]->string; + stream_info.is_playing = true; + LOG_INFO("Playing: " + stream_info.stream_key); + } + return sendPlayResponse(); +} - std::vector RTMPSession::encodeAMF0String(const std::string& str) - { - std::vector result; - result.push_back((uint8_t)AMF0Type::STRING); - uint8_t len_buf[2]; - writeUint16BE(len_buf, str.size()); - result.insert(result.end(), len_buf, len_buf + 2); - result.insert(result.end(), str.begin(), str.end()); - return result; - } +bool RTMPSession::sendPlayResponse() { + std::vector response; + auto cmd = encodeAMF0String("onStatus"); + response.insert(response.end(), cmd.begin(), cmd.end()); + auto tid = encodeAMF0Number(0); + response.insert(response.end(), tid.begin(), tid.end()); + AMF0Value null_val; + null_val.type = AMF0Type::NULL_TYPE; + auto null_enc = encodeAMF0(null_val); + response.insert(response.end(), null_enc.begin(), null_enc.end()); + AMF0Value info; + info.type = AMF0Type::OBJECT; + info.object["level"] = std::make_shared(); + info.object["level"]->type = AMF0Type::STRING; + info.object["level"]->string = "status"; + info.object["code"] = std::make_shared(); + info.object["code"]->type = AMF0Type::STRING; + info.object["code"]->string = "NetStream.Play.Start"; + info.object["description"] = std::make_shared(); + info.object["description"]->type = AMF0Type::STRING; + info.object["description"]->string = "Stream is now playing."; + auto info_enc = encodeAMF0(info); + response.insert(response.end(), info_enc.begin(), info_enc.end()); + return sendChunk(5, 0, (uint8_t)MessageType::COMMAND_AMF0, + stream_info.stream_id, response); +} - std::vector RTMPSession::encodeAMF0Number(double num) - { - std::vector result; - result.push_back((uint8_t)AMF0Type::NUMBER); - uint8_t num_buf[8]; - writeDouble(num_buf, num); - result.insert(result.end(), num_buf, num_buf + 8); - return result; - } +bool RTMPSession::handleDeleteStream( + const std::vector> &args) { + stream_info.is_publishing = false; + stream_info.is_playing = false; + LOG_INFO("Stream deleted"); + return true; +} - std::vector RTMPSession::encodeAMF0Object(const - std::map> &obj) - { - std::vector result; - result.push_back((uint8_t)AMF0Type::OBJECT); - for (const auto& pair : obj) - { - uint8_t key_len[2]; - writeUint16BE(key_len, pair.first.size()); - result.insert(result.end(), key_len, key_len + 2); - result.insert(result.end(), pair.first.begin(), pair.first.end()); - auto encoded = encodeAMF0(*pair.second); - result.insert(result.end(), encoded.begin(), encoded.end()); - } - result.push_back(0); - result.push_back(0); - result.push_back((uint8_t)AMF0Type::OBJECT_END); - return result; - } - - std::vector RTMPSession::encodeAMF0(const AMF0Value& value) - { - switch (value.type) - { - case AMF0Type::NUMBER: - return encodeAMF0Number(value.number); - case AMF0Type::STRING: - return encodeAMF0String(value.string); - case AMF0Type::OBJECT: - return encodeAMF0Object(value.object); - case AMF0Type::NULL_TYPE: - { - std::vector result; - result.push_back((uint8_t)AMF0Type::NULL_TYPE); - return result; - } - case AMF0Type::BOOLEAN: - { - std::vector result; - result.push_back((uint8_t)AMF0Type::BOOLEAN); - result.push_back(value.boolean ? 1 : 0); - return result; - } - default: - return std::vector(); - } - } - - bool RTMPSession::processMessage(const RTMPMessage& msg) - { - MessageType type = (MessageType)msg.header.msg_type_id; - switch (type) - { - case MessageType::SET_CHUNK_SIZE: - if (msg.payload.size() >= 4) - { - chunk_size = readUint32BE(msg.payload.data()) & 0x7FFFFFFF; - LOG_DEBUG("Chunk size set to: " + std::to_string(chunk_size)); - } - break; - case MessageType::WINDOW_ACK_SIZE: - if (msg.payload.size() >= 4) - { - window_ack_size = readUint32BE(msg.payload.data()); - LOG_DEBUG("Window ACK size set to: " + std::to_string(window_ack_size)); - } - break; - case MessageType::SET_PEER_BANDWIDTH: - if (msg.payload.size() >= 5) - { - peer_bandwidth = readUint32BE(msg.payload.data()); - LOG_DEBUG("Peer bandwidth set to: " + std::to_string(peer_bandwidth)); - } - break; - case MessageType::COMMAND_AMF0: - return handleCommand(msg); - case MessageType::AUDIO: - return handleAudioMessage(msg); - case MessageType::VIDEO: - return handleVideoMessage(msg); - case MessageType::DATA_AMF0: - return handleDataMessage(msg); - case MessageType::USER_CONTROL: - return handleUserControl(msg); - case MessageType::ACKNOWLEDGEMENT: - return handleAcknowledgement(msg); - default: - break; - } - return true; - } - - bool RTMPSession::handleUserControl(const RTMPMessage& msg) - { - if (msg.payload.size() < 2) return true; - uint16_t event_type = readUint16BE(msg.payload.data()); - UserControlType type = (UserControlType)event_type; - switch (type) - { - case UserControlType::PING_REQUEST: - if (msg.payload.size() >= 6) - { - uint32_t timestamp = readUint32BE(msg.payload.data() + 2); - sendPong(timestamp); - LOG_DEBUG("Received PING, sent PONG"); - } - break; - case UserControlType::PING_RESPONSE: - LOG_DEBUG("Received PONG response"); - break; - default: - break; - } - return true; - } - - bool RTMPSession::handleAcknowledgement(const RTMPMessage& msg) - { - if (msg.payload.size() >= 4) - { - uint32_t ack_value = readUint32BE(msg.payload.data()); - LOG_DEBUG("Received ACK: " + std::to_string(ack_value)); - } - return true; - } - - bool RTMPSession::handleAudioMessage(const RTMPMessage& msg) - { - stats.audio_frames++; - return true; - } - - bool RTMPSession::handleVideoMessage(const RTMPMessage& msg) - { - stats.video_frames++; - return true; - } - - bool RTMPSession::handleDataMessage(const RTMPMessage& msg) - { - size_t offset = 0; - auto command = decodeAMF0(msg.payload.data(), msg.payload.size(), offset); - if (!command || command->type != AMF0Type::STRING) return true; - if (command->string == "@setDataFrame" || command->string == "onMetaData") - { - if (command->string == "@setDataFrame") - { - auto metadata_name = decodeAMF0(msg.payload.data(), msg.payload.size(), offset); - if (!metadata_name || metadata_name->type != AMF0Type::STRING) return true; - } - auto metadata_obj = decodeAMF0(msg.payload.data(), msg.payload.size(), offset); - if (metadata_obj && (metadata_obj->type == AMF0Type::OBJECT || - metadata_obj->type == AMF0Type::ECMA_ARRAY)) - { - LOG_INFO("Received metadata"); - } - } - return true; - } - - bool RTMPSession::handleCommand(const RTMPMessage& msg) - { - size_t offset = 0; - std::vector> args; - while (offset < msg.payload.size()) - { - auto arg = decodeAMF0(msg.payload.data(), msg.payload.size(), offset); - if (!arg) break; - args.push_back(arg); - } - if (args.empty() || args[0]->type != AMF0Type::STRING) return false; - std::string command = args[0]->string; - LOG_DEBUG("Received command: " + command); - if (command == "connect") - { - return handleConnect(args); - } - else if (command == "releaseStream") - { - return handleReleaseStream(args); - } - else if (command == "FCPublish") - { - return handleFCPublish(args); - } - else if (command == "createStream") - { - return handleCreateStream(args); - } - else if (command == "publish") - { - return handlePublish(args); - } - else if (command == "play") - { - return handlePlay(args); - } - else if (command == "deleteStream") - { - return handleDeleteStream(args); - } - return true; - } - - bool RTMPSession::handleConnect(const std::vector> - &args) - { - if (args.size() < 3) return false; - double transaction_id = args[1]->number; - if (args[2]->type == AMF0Type::OBJECT) - { - auto& props = args[2]->object; - if (props.find("app") != props.end() && props["app"]->type == AMF0Type::STRING) - { - stream_info.app = props["app"]->string; - LOG_INFO("App: " + stream_info.app); - } - } - std::vector ack_size(4); - writeUint32BE(ack_size.data(), 2500000); - sendChunk(2, 0, (uint8_t)MessageType::WINDOW_ACK_SIZE, 0, ack_size); - std::vector bandwidth(5); - writeUint32BE(bandwidth.data(), 2500000); - bandwidth[4] = 2; - sendChunk(2, 0, (uint8_t)MessageType::SET_PEER_BANDWIDTH, 0, bandwidth); - std::vector stream_begin(6); - writeUint16BE(stream_begin.data(), (uint16_t)UserControlType::STREAM_BEGIN); - writeUint32BE(stream_begin.data() + 2, 0); - sendChunk(2, 0, (uint8_t)MessageType::USER_CONTROL, 0, stream_begin); - std::vector chunk_size_msg(4); - writeUint32BE(chunk_size_msg.data(), 4096); - sendChunk(2, 0, (uint8_t)MessageType::SET_CHUNK_SIZE, 0, chunk_size_msg); - chunk_size = 4096; - return sendConnectResponse(transaction_id); - } - - bool RTMPSession::sendConnectResponse(double transaction_id) - { - std::vector response; - auto cmd = encodeAMF0String("_result"); - response.insert(response.end(), cmd.begin(), cmd.end()); - auto tid = encodeAMF0Number(transaction_id); - response.insert(response.end(), tid.begin(), tid.end()); - AMF0Value props; - props.type = AMF0Type::OBJECT; - props.object["fmsVer"] = std::make_shared(); - props.object["fmsVer"]->type = AMF0Type::STRING; - props.object["fmsVer"]->string = "FMS/3,0,1,123"; - props.object["capabilities"] = std::make_shared(); - props.object["capabilities"]->type = AMF0Type::NUMBER; - props.object["capabilities"]->number = 31; - auto props_enc = encodeAMF0(props); - response.insert(response.end(), props_enc.begin(), props_enc.end()); - AMF0Value info; - info.type = AMF0Type::OBJECT; - info.object["level"] = std::make_shared(); - info.object["level"]->type = AMF0Type::STRING; - info.object["level"]->string = "status"; - info.object["code"] = std::make_shared(); - info.object["code"]->type = AMF0Type::STRING; - info.object["code"]->string = "NetConnection.Connect.Success"; - info.object["description"] = std::make_shared(); - info.object["description"]->type = AMF0Type::STRING; - info.object["description"]->string = "Connection succeeded."; - info.object["objectEncoding"] = std::make_shared(); - info.object["objectEncoding"]->type = AMF0Type::NUMBER; - info.object["objectEncoding"]->number = 0; - auto info_enc = encodeAMF0(info); - response.insert(response.end(), info_enc.begin(), info_enc.end()); - return sendChunk(3, 0, (uint8_t)MessageType::COMMAND_AMF0, 0, response); - } - - bool RTMPSession::handleReleaseStream(const - std::vector> &args) - { - return true; - } - - bool RTMPSession::handleFCPublish(const - std::vector> &args) - { - return true; - } - - bool RTMPSession::handleCreateStream(const - std::vector> &args) - { - if (args.size() < 2) return false; - double transaction_id = args[1]->number; - stream_info.stream_id = 1; - return sendCreateStreamResponse(transaction_id, 1); - } - - bool RTMPSession::sendCreateStreamResponse(double transaction_id, - double stream_id) - { - std::vector response; - auto cmd = encodeAMF0String("_result"); - response.insert(response.end(), cmd.begin(), cmd.end()); - auto tid = encodeAMF0Number(transaction_id); - response.insert(response.end(), tid.begin(), tid.end()); - AMF0Value null_val; - null_val.type = AMF0Type::NULL_TYPE; - auto null_enc = encodeAMF0(null_val); - response.insert(response.end(), null_enc.begin(), null_enc.end()); - auto sid = encodeAMF0Number(stream_id); - response.insert(response.end(), sid.begin(), sid.end()); - return sendChunk(3, 0, (uint8_t)MessageType::COMMAND_AMF0, 0, response); - } - - bool RTMPSession::handlePublish(const std::vector> - &args) - { - if (args.size() < 4) return false; - if (args[3]->type == AMF0Type::STRING) - { - stream_info.stream_key = args[3]->string; - stream_info.is_publishing = true; - LOG_INFO("Publishing to: " + stream_info.stream_key); - } - return sendPublishResponse(); - } - - bool RTMPSession::sendPublishResponse() - { - std::vector response; - auto cmd = encodeAMF0String("onStatus"); - response.insert(response.end(), cmd.begin(), cmd.end()); - auto tid = encodeAMF0Number(0); - response.insert(response.end(), tid.begin(), tid.end()); - AMF0Value null_val; - null_val.type = AMF0Type::NULL_TYPE; - auto null_enc = encodeAMF0(null_val); - response.insert(response.end(), null_enc.begin(), null_enc.end()); - AMF0Value info; - info.type = AMF0Type::OBJECT; - info.object["level"] = std::make_shared(); - info.object["level"]->type = AMF0Type::STRING; - info.object["level"]->string = "status"; - info.object["code"] = std::make_shared(); - info.object["code"]->type = AMF0Type::STRING; - info.object["code"]->string = "NetStream.Publish.Start"; - info.object["description"] = std::make_shared(); - info.object["description"]->type = AMF0Type::STRING; - info.object["description"]->string = "Stream is now published."; - auto info_enc = encodeAMF0(info); - response.insert(response.end(), info_enc.begin(), info_enc.end()); - return sendChunk(5, 0, (uint8_t)MessageType::COMMAND_AMF0, - stream_info.stream_id, response); - } - - bool RTMPSession::handlePlay(const std::vector> - &args) - { - if (args.size() < 4) return false; - if (args[3]->type == AMF0Type::STRING) - { - stream_info.stream_key = args[3]->string; - stream_info.is_playing = true; - LOG_INFO("Playing: " + stream_info.stream_key); - } - return sendPlayResponse(); - } - - bool RTMPSession::sendPlayResponse() - { - std::vector response; - auto cmd = encodeAMF0String("onStatus"); - response.insert(response.end(), cmd.begin(), cmd.end()); - auto tid = encodeAMF0Number(0); - response.insert(response.end(), tid.begin(), tid.end()); - AMF0Value null_val; - null_val.type = AMF0Type::NULL_TYPE; - auto null_enc = encodeAMF0(null_val); - response.insert(response.end(), null_enc.begin(), null_enc.end()); - AMF0Value info; - info.type = AMF0Type::OBJECT; - info.object["level"] = std::make_shared(); - info.object["level"]->type = AMF0Type::STRING; - info.object["level"]->string = "status"; - info.object["code"] = std::make_shared(); - info.object["code"]->type = AMF0Type::STRING; - info.object["code"]->string = "NetStream.Play.Start"; - info.object["description"] = std::make_shared(); - info.object["description"]->type = AMF0Type::STRING; - info.object["description"]->string = "Stream is now playing."; - auto info_enc = encodeAMF0(info); - response.insert(response.end(), info_enc.begin(), info_enc.end()); - return sendChunk(5, 0, (uint8_t)MessageType::COMMAND_AMF0, - stream_info.stream_id, response); - } - - bool RTMPSession::handleDeleteStream(const - std::vector> &args) - { - stream_info.is_publishing = false; - stream_info.is_playing = false; - LOG_INFO("Stream deleted"); - return true; - } - - bool RTMPSession::sendErrorResponse(const std::string& command, - double transaction_id, - const std::string& description) - { - std::vector response; - auto cmd = encodeAMF0String("_error"); - response.insert(response.end(), cmd.begin(), cmd.end()); - auto tid = encodeAMF0Number(transaction_id); - response.insert(response.end(), tid.begin(), tid.end()); - AMF0Value null_val; - null_val.type = AMF0Type::NULL_TYPE; - auto null_enc = encodeAMF0(null_val); - response.insert(response.end(), null_enc.begin(), null_enc.end()); - AMF0Value info; - info.type = AMF0Type::OBJECT; - info.object["level"] = std::make_shared(); - info.object["level"]->type = AMF0Type::STRING; - info.object["level"]->string = "error"; - info.object["code"] = std::make_shared(); - info.object["code"]->type = AMF0Type::STRING; - info.object["code"]->string = "NetConnection.Call.Failed"; - info.object["description"] = std::make_shared(); - info.object["description"]->type = AMF0Type::STRING; - info.object["description"]->string = description; - auto info_enc = encodeAMF0(info); - response.insert(response.end(), info_enc.begin(), info_enc.end()); - return sendChunk(3, 0, (uint8_t)MessageType::COMMAND_AMF0, 0, response); - } +bool RTMPSession::sendErrorResponse(const std::string &command, + double transaction_id, + const std::string &description) { + std::vector response; + auto cmd = encodeAMF0String("_error"); + response.insert(response.end(), cmd.begin(), cmd.end()); + auto tid = encodeAMF0Number(transaction_id); + response.insert(response.end(), tid.begin(), tid.end()); + AMF0Value null_val; + null_val.type = AMF0Type::NULL_TYPE; + auto null_enc = encodeAMF0(null_val); + response.insert(response.end(), null_enc.begin(), null_enc.end()); + AMF0Value info; + info.type = AMF0Type::OBJECT; + info.object["level"] = std::make_shared(); + info.object["level"]->type = AMF0Type::STRING; + info.object["level"]->string = "error"; + info.object["code"] = std::make_shared(); + info.object["code"]->type = AMF0Type::STRING; + info.object["code"]->string = "NetConnection.Call.Failed"; + info.object["description"] = std::make_shared(); + info.object["description"]->type = AMF0Type::STRING; + info.object["description"]->string = description; + auto info_enc = encodeAMF0(info); + response.insert(response.end(), info_enc.begin(), info_enc.end()); + return sendChunk(3, 0, (uint8_t)MessageType::COMMAND_AMF0, 0, response); +} // RTMPServer Implementation - RTMPServer::RTMPServer(int port) : port(port), server_fd(-1), running(false) {} +RTMPServer::RTMPServer(int port) : port(port), server_fd(-1), running(false) {} - RTMPServer::~RTMPServer() - { - stop(); +RTMPServer::~RTMPServer() { stop(); } + +bool RTMPServer::start() { + server_fd = socket(AF_INET, SOCK_STREAM, 0); + if (server_fd < 0) { + LOG_ERROR("Failed to create socket"); + return false; + } + int opt = 1; + if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { + close(server_fd); + return false; + } + sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = htons(port); + if (bind(server_fd, (sockaddr *)&addr, sizeof(addr)) < 0) { + LOG_ERROR("Failed to bind socket"); + close(server_fd); + return false; + } + if (listen(server_fd, 10) < 0) { + LOG_ERROR("Failed to listen on socket"); + close(server_fd); + return false; + } + running = true; + accept_thread = std::thread(&RTMPServer::acceptClients, this); + if (ping_enabled) { + ping_thread = std::thread(&RTMPServer::pingClientsRoutine, this); + } + timeout_thread = std::thread(&RTMPServer::timeoutCheckRoutine, this); + LOG_INFO("RTMP Server started on port " + std::to_string(port)); + return true; +} + +void RTMPServer::stop() { + if (!running) + return; + running = false; + if (server_fd >= 0) { + close(server_fd); + server_fd = -1; + } + if (accept_thread.joinable()) { + accept_thread.join(); + } + if (ping_thread.joinable()) { + ping_thread.join(); + } + if (timeout_thread.joinable()) { + timeout_thread.join(); + } + for (auto &thread : client_threads) { + if (thread.joinable()) { + thread.join(); } + } + client_threads.clear(); + std::lock_guard lock(sessions_mutex); + sessions.clear(); + LOG_INFO("RTMP Server stopped"); +} - bool RTMPServer::start() - { - server_fd = socket(AF_INET, SOCK_STREAM, 0); - if (server_fd < 0) - { - LOG_ERROR("Failed to create socket"); - return false; - } - int opt = 1; - if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) - { - close(server_fd); - return false; - } - sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = INADDR_ANY; - addr.sin_port = htons(port); - if (bind(server_fd, (sockaddr *)&addr, sizeof(addr)) < 0) - { - LOG_ERROR("Failed to bind socket"); - close(server_fd); - return false; - } - if (listen(server_fd, 10) < 0) - { - LOG_ERROR("Failed to listen on socket"); - close(server_fd); - return false; - } - running = true; - accept_thread = std::thread(&RTMPServer::acceptClients, this); - if (ping_enabled) - { - ping_thread = std::thread(&RTMPServer::pingClientsRoutine, this); - } - timeout_thread = std::thread(&RTMPServer::timeoutCheckRoutine, this); - LOG_INFO("RTMP Server started on port " + std::to_string(port)); - return true; +void RTMPServer::acceptClients() { + while (running) { + sockaddr_in client_addr; + socklen_t addr_len = sizeof(client_addr); + int client_fd = accept(server_fd, (sockaddr *)&client_addr, &addr_len); + if (client_fd < 0) { + if (running) { + LOG_ERROR("Failed to accept client"); + } + continue; } - - void RTMPServer::stop() + std::string client_ip = inet_ntoa(client_addr.sin_addr); + LOG_INFO("New client connected: " + client_ip); + // Check connection limit { - if (!running) return; - running = false; - if (server_fd >= 0) - { - close(server_fd); - server_fd = -1; - } - if (accept_thread.joinable()) - { - accept_thread.join(); - } - if (ping_thread.joinable()) - { - ping_thread.join(); - } - if (timeout_thread.joinable()) - { - timeout_thread.join(); - } - for (auto& thread : client_threads) - { - if (thread.joinable()) - { - thread.join(); - } - } - client_threads.clear(); - std::lock_guard lock(sessions_mutex); - sessions.clear(); - LOG_INFO("RTMP Server stopped"); + std::lock_guard lock(sessions_mutex); + if ((int)sessions.size() >= max_total_connections) { + LOG_WARN("Max connections reached, rejecting client"); + close(client_fd); + continue; + } } - - void RTMPServer::acceptClients() + auto session = std::make_shared(client_fd, client_ip); { - while (running) - { - sockaddr_in client_addr; - socklen_t addr_len = sizeof(client_addr); - int client_fd = accept(server_fd, (sockaddr*)&client_addr, &addr_len); - if (client_fd < 0) - { - if (running) - { - LOG_ERROR("Failed to accept client"); - } - continue; - } - std::string client_ip = inet_ntoa(client_addr.sin_addr); - LOG_INFO("New client connected: " + client_ip); - // Check connection limit - { - std::lock_guard lock(sessions_mutex); - if ((int)sessions.size() >= max_total_connections) - { - LOG_WARN("Max connections reached, rejecting client"); - close(client_fd); - continue; - } - } - auto session = std::make_shared(client_fd, client_ip); - { - std::lock_guard lock(sessions_mutex); - sessions.push_back(session); - } - client_threads.emplace_back(&RTMPServer::handleClient, this, session); - } + std::lock_guard lock(sessions_mutex); + sessions.push_back(session); } + client_threads.emplace_back(&RTMPServer::handleClient, this, session); + } +} - void RTMPServer::handleClient(std::shared_ptr session) - { - if (!session->handshake()) - { - LOG_ERROR("Handshake failed"); - removeSession(session); - return; - } - LOG_INFO("Handshake completed"); - if (on_connect) - { - on_connect(session); - } - bool connection_active = true; - bool publish_notified = false; - bool play_notified = false; - while (running && connection_active) - { - if (!session->receiveChunk()) - { - connection_active = false; - break; - } - auto& info = session->getStreamInfo(); - // Handle publish callback - if (info.is_publishing && !publish_notified) - { - // Check authentication - if (auth_callback && !auth_callback(info.app, info.stream_key, info.client_ip)) - { - LOG_WARN("Authentication failed for: " + info.app + "/" + info.stream_key); - session->sendErrorResponse("publish", 0, "Authentication failed"); - connection_active = false; - break; - } - // Check connection limits - if (!checkConnectionLimits(info.app, info.stream_key, true)) - { - LOG_WARN("Publisher limit reached for: " + info.app + "/" + info.stream_key); - session->sendErrorResponse("publish", 0, "Publisher limit reached"); - connection_active = false; - break; - } - if (on_publish) - { - on_publish(session, info.app, info.stream_key); - } - publish_notified = true; - // Initialize GOP cache - if (use_gop_cache) - { - std::string key = makeStreamKey(info.app, info.stream_key); - std::lock_guard lock(gop_mutex); - if (gop_caches.find(key) == gop_caches.end()) - { - gop_caches[key] = std::make_shared(); - } - } - } - // Handle play callback - if (info.is_playing && !play_notified) - { - // Check authentication - if (auth_callback && !auth_callback(info.app, info.stream_key, info.client_ip)) - { - LOG_WARN("Authentication failed for: " + info.app + "/" + info.stream_key); - session->sendErrorResponse("play", 0, "Authentication failed"); - connection_active = false; - break; - } - // Check connection limits - if (!checkConnectionLimits(info.app, info.stream_key, false)) - { - LOG_WARN("Player limit reached for: " + info.app + "/" + info.stream_key); - session->sendErrorResponse("play", 0, "Player limit reached"); - connection_active = false; - break; - } - if (on_play) - { - on_play(session, info.app, info.stream_key); - } - play_notified = true; - // Send GOP cache to new player - if (use_gop_cache) - { - std::string key = makeStreamKey(info.app, info.stream_key); - std::lock_guard lock(gop_mutex); - auto it = gop_caches.find(key); - if (it != gop_caches.end() && it->second->hasKeyframe()) - { - it->second->sendToPlayer(session.get()); - LOG_INFO("Sent GOP cache to new player"); - } - } - } - // Process media messages - if (info.is_publishing) - { - processMediaMessages(session); - } - } - LOG_INFO("Client disconnected"); - // Clean up GOP cache if last publisher - auto& info = session->getStreamInfo(); - if (info.is_publishing) - { - std::string key = makeStreamKey(info.app, info.stream_key); - if (countPublishers(info.app, info.stream_key) <= 1) - { - std::lock_guard lock(gop_mutex); - gop_caches.erase(key); - } - } - if (on_disconnect) - { - on_disconnect(session); - } - removeSession(session); +void RTMPServer::handleClient(std::shared_ptr session) { + if (!session->handshake()) { + LOG_ERROR("Handshake failed"); + removeSession(session); + return; + } + LOG_INFO("Handshake completed"); + if (on_connect) { + on_connect(session); + } + bool connection_active = true; + bool publish_notified = false; + bool play_notified = false; + while (running && connection_active) { + if (!session->receiveChunk()) { + connection_active = false; + break; } - - void RTMPServer::processMediaMessages(std::shared_ptr session) - { - std::lock_guard lock(session->getQueueMutex()); - while (!session->getMessageQueue().empty()) - { - RTMPMessage msg = session->getMessageQueue().front(); - session->getMessageQueue().pop(); - MessageType type = (MessageType)msg.header.msg_type_id; - const auto& info = session->getStreamInfo(); - std::string key = makeStreamKey(info.app, info.stream_key); - switch (type) - { - case MessageType::AUDIO: - if (on_audio_data) - { - on_audio_data(session, msg.payload, msg.header.timestamp); - } - // Add to GOP cache - if (use_gop_cache) - { - std::lock_guard lock(gop_mutex); - auto it = gop_caches.find(key); - if (it != gop_caches.end()) - { - it->second->addAudioFrame(msg.payload, msg.header.timestamp); - } - } - // Record if enabled - { - std::lock_guard lock(recorder_mutex); - auto it = recorders.find(key); - if (it != recorders.end() && it->second->isRecording()) - { - it->second->writeAudioFrame(msg.payload, msg.header.timestamp); - } - } - // Relay to players - sendAudioToPlayers(info.app, info.stream_key, - msg.payload, msg.header.timestamp); - break; - case MessageType::VIDEO: - if (on_video_data) - { - on_video_data(session, msg.payload, msg.header.timestamp); - } - // Add to GOP cache - if (use_gop_cache) - { - std::lock_guard lock(gop_mutex); - auto it = gop_caches.find(key); - if (it != gop_caches.end()) - { - it->second->addVideoFrame(msg.payload, msg.header.timestamp); - } - } - // Record if enabled - { - std::lock_guard lock(recorder_mutex); - auto it = recorders.find(key); - if (it != recorders.end() && it->second->isRecording()) - { - it->second->writeVideoFrame(msg.payload, msg.header.timestamp); - } - } - // Relay to players - sendVideoToPlayers(info.app, info.stream_key, - msg.payload, msg.header.timestamp); - break; - case MessageType::DATA_AMF0: - { - size_t offset = 0; - auto command = session->decodeAMF0(msg.payload.data(), - msg.payload.size(), offset); - if (command && command->type == AMF0Type::STRING) - { - if (command->string == "@setDataFrame") - { - auto metadata_name = session->decodeAMF0(msg.payload.data(), - msg.payload.size(), offset); - } - auto metadata_obj = session->decodeAMF0(msg.payload.data(), - msg.payload.size(), offset); - if (metadata_obj && (metadata_obj->type == AMF0Type::OBJECT || - metadata_obj->type == AMF0Type::ECMA_ARRAY)) - { - if (on_metadata) - { - on_metadata(session, metadata_obj->object); - } - // Add to GOP cache - if (use_gop_cache) - { - std::lock_guard lock(gop_mutex); - auto it = gop_caches.find(key); - if (it != gop_caches.end()) - { - it->second->addMetadata(msg.payload); - } - } - // Record if enabled - { - std::lock_guard lock(recorder_mutex); - auto it = recorders.find(key); - if (it != recorders.end() && it->second->isRecording()) - { - it->second->writeMetadata(metadata_obj->object); - } - } - // Relay metadata to players - sendMetadataToPlayers(info.app, info.stream_key, msg.payload); - } - } - break; - } - default: - break; - } + auto &info = session->getStreamInfo(); + // Handle publish callback + if (info.is_publishing && !publish_notified) { + // Check authentication + if (auth_callback && + !auth_callback(info.app, info.stream_key, info.client_ip)) { + LOG_WARN("Authentication failed for: " + info.app + "/" + + info.stream_key); + session->sendErrorResponse("publish", 0, "Authentication failed"); + connection_active = false; + break; + } + // Check connection limits + if (!checkConnectionLimits(info.app, info.stream_key, true)) { + LOG_WARN("Publisher limit reached for: " + info.app + "/" + + info.stream_key); + session->sendErrorResponse("publish", 0, "Publisher limit reached"); + connection_active = false; + break; + } + if (on_publish) { + on_publish(session, info.app, info.stream_key); + } + publish_notified = true; + // Initialize GOP cache + if (use_gop_cache) { + std::string key = makeStreamKey(info.app, info.stream_key); + std::lock_guard lock(gop_mutex); + if (gop_caches.find(key) == gop_caches.end()) { + gop_caches[key] = std::make_shared(); } + } } - - void RTMPServer::removeSession(std::shared_ptr session) - { - std::lock_guard lock(sessions_mutex); - sessions.erase(std::remove(sessions.begin(), sessions.end(), session), - sessions.end()); + // Handle play callback + if (info.is_playing && !play_notified) { + // Check authentication + if (auth_callback && + !auth_callback(info.app, info.stream_key, info.client_ip)) { + LOG_WARN("Authentication failed for: " + info.app + "/" + + info.stream_key); + session->sendErrorResponse("play", 0, "Authentication failed"); + connection_active = false; + break; + } + // Check connection limits + if (!checkConnectionLimits(info.app, info.stream_key, false)) { + LOG_WARN("Player limit reached for: " + info.app + "/" + + info.stream_key); + session->sendErrorResponse("play", 0, "Player limit reached"); + connection_active = false; + break; + } + if (on_play) { + on_play(session, info.app, info.stream_key); + } + play_notified = true; + // Send GOP cache to new player + if (use_gop_cache) { + std::string key = makeStreamKey(info.app, info.stream_key); + std::lock_guard lock(gop_mutex); + auto it = gop_caches.find(key); + if (it != gop_caches.end() && it->second->hasKeyframe()) { + it->second->sendToPlayer(session.get()); + LOG_INFO("Sent GOP cache to new player"); + } + } } - - bool RTMPServer::sendAudioToPlayers(const std::string& app, - const std::string& stream_key, - const std::vector &data, uint32_t timestamp) - { - std::lock_guard lock(sessions_mutex); - for (auto& session : sessions) - { - const auto& info = session->getStreamInfo(); - if (info.is_playing && info.app == app && info.stream_key == stream_key) - { - session->sendChunk(4, timestamp, (uint8_t)MessageType::AUDIO, - info.stream_id, data); - } - } - return true; + // Process media messages + if (info.is_publishing) { + processMediaMessages(session); } - - bool RTMPServer::sendVideoToPlayers(const std::string& app, - const std::string& stream_key, - const std::vector &data, uint32_t timestamp) - { - std::lock_guard lock(sessions_mutex); - for (auto& session : sessions) - { - const auto& info = session->getStreamInfo(); - if (info.is_playing && info.app == app && info.stream_key == stream_key) - { - session->sendChunk(4, timestamp, (uint8_t)MessageType::VIDEO, - info.stream_id, data); - } - } - return true; + } + LOG_INFO("Client disconnected"); + // Clean up GOP cache if last publisher + auto &info = session->getStreamInfo(); + if (info.is_publishing) { + std::string key = makeStreamKey(info.app, info.stream_key); + if (countPublishers(info.app, info.stream_key) <= 1) { + std::lock_guard lock(gop_mutex); + gop_caches.erase(key); } + } + if (on_disconnect) { + on_disconnect(session); + } + removeSession(session); +} - void RTMPServer::sendMetadataToPlayers(const std::string& app, - const std::string& stream_key, - const std::vector &data) - { - std::lock_guard lock(sessions_mutex); - for (auto& session : sessions) - { - const auto& info = session->getStreamInfo(); - if (info.is_playing && info.app == app && info.stream_key == stream_key) - { - session->sendChunk(4, 0, (uint8_t)MessageType::DATA_AMF0, - info.stream_id, data); - } +void RTMPServer::processMediaMessages(std::shared_ptr session) { + std::lock_guard lock(session->getQueueMutex()); + while (!session->getMessageQueue().empty()) { + RTMPMessage msg = session->getMessageQueue().front(); + session->getMessageQueue().pop(); + MessageType type = (MessageType)msg.header.msg_type_id; + const auto &info = session->getStreamInfo(); + std::string key = makeStreamKey(info.app, info.stream_key); + switch (type) { + case MessageType::AUDIO: + if (on_audio_data) { + on_audio_data(session, msg.payload, msg.header.timestamp); + } + // Add to GOP cache + if (use_gop_cache) { + std::lock_guard lock(gop_mutex); + auto it = gop_caches.find(key); + if (it != gop_caches.end()) { + it->second->addAudioFrame(msg.payload, msg.header.timestamp); } - } - - void RTMPServer::pingClientsRoutine() - { - while (running) - { - std::this_thread::sleep_for(std::chrono::seconds(ping_interval)); - if (!running) break; - std::lock_guard lock(sessions_mutex); - uint32_t timestamp = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()).count(); - for (auto& session : sessions) - { - session->sendPing(timestamp); - } - LOG_DEBUG("Sent PING to all clients"); - } - } - - void RTMPServer::timeoutCheckRoutine() - { - while (running) - { - std::this_thread::sleep_for(std::chrono::seconds(5)); - if (!running) break; - auto now = std::chrono::steady_clock::now(); - std::vector> to_remove; - { - std::lock_guard lock(sessions_mutex); - for (auto& session : sessions) - { - auto elapsed = std::chrono::duration_cast( - now - session->getLastActivity()).count(); - if (elapsed > connection_timeout) - { - LOG_WARN("Session timeout: " + - session->getStreamInfo().client_ip); - to_remove.push_back(session); - } - } - } - // Remove timed out sessions - for (auto& session : to_remove) - { - if (on_disconnect) - { - on_disconnect(session); - } - removeSession(session); - } - } - } - - int RTMPServer::countPublishers(const std::string& app, - const std::string& stream_key) const - { - std::lock_guard lock(sessions_mutex); - int count = 0; - for (const auto& session : sessions) - { - const auto& info = session->getStreamInfo(); - if (info.is_publishing && info.app == app && info.stream_key == stream_key) - { - count++; - } - } - return count; - } - - int RTMPServer::countPlayers(const std::string& app, - const std::string& stream_key) const - { - std::lock_guard lock(sessions_mutex); - int count = 0; - for (const auto& session : sessions) - { - const auto& info = session->getStreamInfo(); - if (info.is_playing && info.app == app && info.stream_key == stream_key) - { - count++; - } - } - return count; - } - - bool RTMPServer::checkConnectionLimits(const std::string& app, - const std::string& stream_key, - bool is_publisher) const - { - if (is_publisher) - { - int current = countPublishers(app, stream_key); - return current < max_publishers_per_stream; - } - else - { - int current = countPlayers(app, stream_key); - return current < max_players_per_stream; - } - } - - int RTMPServer::getActivePublishers() const - { - std::lock_guard lock(sessions_mutex); - int count = 0; - for (const auto& session : sessions) - { - if (session->getStreamInfo().is_publishing) - { - count++; - } - } - return count; - } - - int RTMPServer::getActivePlayers() const - { - std::lock_guard lock(sessions_mutex); - int count = 0; - for (const auto& session : sessions) - { - if (session->getStreamInfo().is_playing) - { - count++; - } - } - return count; - } - - int RTMPServer::getTotalConnections() const - { - std::lock_guard lock(sessions_mutex); - return sessions.size(); - } - - StreamStatistics RTMPServer::getStreamStats(const std::string& app, - const std::string& stream_key) const - { - std::lock_guard lock(sessions_mutex); - StreamStatistics combined; - for (const auto& session : sessions) - { - const auto& info = session->getStreamInfo(); - if (info.app == app && info.stream_key == stream_key) - { - const auto& stats = session->getStats(); - combined.bytes_sent += stats.bytes_sent; - combined.bytes_received += stats.bytes_received; - combined.video_frames += stats.video_frames; - combined.audio_frames += stats.audio_frames; - combined.dropped_frames += stats.dropped_frames; - } - } - return combined; - } - - std::vector> - RTMPServer::getAllStreamStats() const - { - std::lock_guard lock(sessions_mutex); - std::map stats_map; - for (const auto& session : sessions) - { - const auto& info = session->getStreamInfo(); - if (info.is_publishing || info.is_playing) - { - std::string key = makeStreamKey(info.app, info.stream_key); - const auto& stats = session->getStats(); - auto& combined = stats_map[key]; - combined.bytes_sent += stats.bytes_sent; - combined.bytes_received += stats.bytes_received; - combined.video_frames += stats.video_frames; - combined.audio_frames += stats.audio_frames; - combined.dropped_frames += stats.dropped_frames; - } - } - std::vector> result; - for (const auto& pair : stats_map) - { - result.push_back(pair); - } - return result; - } - - bool RTMPServer::startRecording(const std::string& app, - const std::string& stream_key, - const std::string& filename) - { - std::string key = makeStreamKey(app, stream_key); - std::lock_guard lock(recorder_mutex); - if (recorders.find(key) != recorders.end() && recorders[key]->isRecording()) - { - LOG_WARN("Already recording stream: " + key); - return false; - } - auto recorder = std::make_shared(filename); - if (!recorder->start()) - { - LOG_ERROR("Failed to start recording: " + filename); - return false; - } - recorders[key] = recorder; - LOG_INFO("Started recording " + key + " to " + filename); - return true; - } - - void RTMPServer::stopRecording(const std::string& app, - const std::string& stream_key) - { - std::string key = makeStreamKey(app, stream_key); + } + // Record if enabled + { std::lock_guard lock(recorder_mutex); auto it = recorders.find(key); - if (it != recorders.end()) - { - it->second->stop(); - recorders.erase(it); - LOG_INFO("Stopped recording: " + key); + if (it != recorders.end() && it->second->isRecording()) { + it->second->writeAudioFrame(msg.payload, msg.header.timestamp); } - } - - bool RTMPServer::isRecording(const std::string& app, - const std::string& stream_key) const - { - std::string key = makeStreamKey(app, stream_key); + } + // Relay to players + sendAudioToPlayers(info.app, info.stream_key, msg.payload, + msg.header.timestamp); + break; + case MessageType::VIDEO: + if (on_video_data) { + on_video_data(session, msg.payload, msg.header.timestamp); + } + // Add to GOP cache + if (use_gop_cache) { + std::lock_guard lock(gop_mutex); + auto it = gop_caches.find(key); + if (it != gop_caches.end()) { + it->second->addVideoFrame(msg.payload, msg.header.timestamp); + } + } + // Record if enabled + { std::lock_guard lock(recorder_mutex); auto it = recorders.find(key); - return (it != recorders.end() && it->second->isRecording()); - } - - void RTMPServer::enablePingPong(bool enable, int interval_seconds) - { - ping_enabled = enable; - ping_interval = interval_seconds; - if (enable && running && !ping_thread.joinable()) - { - ping_thread = std::thread(&RTMPServer::pingClientsRoutine, this); + if (it != recorders.end() && it->second->isRecording()) { + it->second->writeVideoFrame(msg.payload, msg.header.timestamp); } + } + // Relay to players + sendVideoToPlayers(info.app, info.stream_key, msg.payload, + msg.header.timestamp); + break; + case MessageType::DATA_AMF0: { + size_t offset = 0; + auto command = + session->decodeAMF0(msg.payload.data(), msg.payload.size(), offset); + if (command && command->type == AMF0Type::STRING) { + if (command->string == "@setDataFrame") { + auto metadata_name = session->decodeAMF0(msg.payload.data(), + msg.payload.size(), offset); + } + auto metadata_obj = + session->decodeAMF0(msg.payload.data(), msg.payload.size(), offset); + if (metadata_obj && (metadata_obj->type == AMF0Type::OBJECT || + metadata_obj->type == AMF0Type::ECMA_ARRAY)) { + if (on_metadata) { + on_metadata(session, metadata_obj->object); + } + // Add to GOP cache + if (use_gop_cache) { + std::lock_guard lock(gop_mutex); + auto it = gop_caches.find(key); + if (it != gop_caches.end()) { + it->second->addMetadata(msg.payload); + } + } + // Record if enabled + { + std::lock_guard lock(recorder_mutex); + auto it = recorders.find(key); + if (it != recorders.end() && it->second->isRecording()) { + it->second->writeMetadata(metadata_obj->object); + } + } + // Relay metadata to players + sendMetadataToPlayers(info.app, info.stream_key, msg.payload); + } + } + break; } + default: + break; + } + } +} + +void RTMPServer::removeSession(std::shared_ptr session) { + std::lock_guard lock(sessions_mutex); + sessions.erase(std::remove(sessions.begin(), sessions.end(), session), + sessions.end()); +} + +bool RTMPServer::sendAudioToPlayers(const std::string &app, + const std::string &stream_key, + const std::vector &data, + uint32_t timestamp) { + std::lock_guard lock(sessions_mutex); + for (auto &session : sessions) { + const auto &info = session->getStreamInfo(); + if (info.is_playing && info.app == app && info.stream_key == stream_key) { + session->sendChunk(4, timestamp, (uint8_t)MessageType::AUDIO, + info.stream_id, data); + } + } + return true; +} + +bool RTMPServer::sendVideoToPlayers(const std::string &app, + const std::string &stream_key, + const std::vector &data, + uint32_t timestamp) { + std::lock_guard lock(sessions_mutex); + for (auto &session : sessions) { + const auto &info = session->getStreamInfo(); + if (info.is_playing && info.app == app && info.stream_key == stream_key) { + session->sendChunk(4, timestamp, (uint8_t)MessageType::VIDEO, + info.stream_id, data); + } + } + return true; +} + +void RTMPServer::sendMetadataToPlayers(const std::string &app, + const std::string &stream_key, + const std::vector &data) { + std::lock_guard lock(sessions_mutex); + for (auto &session : sessions) { + const auto &info = session->getStreamInfo(); + if (info.is_playing && info.app == app && info.stream_key == stream_key) { + session->sendChunk(4, 0, (uint8_t)MessageType::DATA_AMF0, info.stream_id, + data); + } + } +} + +void RTMPServer::pingClientsRoutine() { + while (running) { + std::this_thread::sleep_for(std::chrono::seconds(ping_interval)); + if (!running) + break; + std::lock_guard lock(sessions_mutex); + uint32_t timestamp = + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + for (auto &session : sessions) { + session->sendPing(timestamp); + } + LOG_DEBUG("Sent PING to all clients"); + } +} + +void RTMPServer::timeoutCheckRoutine() { + while (running) { + std::this_thread::sleep_for(std::chrono::seconds(5)); + if (!running) + break; + auto now = std::chrono::steady_clock::now(); + std::vector> to_remove; + { + std::lock_guard lock(sessions_mutex); + for (auto &session : sessions) { + auto elapsed = std::chrono::duration_cast( + now - session->getLastActivity()) + .count(); + if (elapsed > connection_timeout) { + LOG_WARN("Session timeout: " + session->getStreamInfo().client_ip); + to_remove.push_back(session); + } + } + } + // Remove timed out sessions + for (auto &session : to_remove) { + if (on_disconnect) { + on_disconnect(session); + } + removeSession(session); + } + } +} + +int RTMPServer::countPublishers(const std::string &app, + const std::string &stream_key) const { + std::lock_guard lock(sessions_mutex); + int count = 0; + for (const auto &session : sessions) { + const auto &info = session->getStreamInfo(); + if (info.is_publishing && info.app == app && + info.stream_key == stream_key) { + count++; + } + } + return count; +} + +int RTMPServer::countPlayers(const std::string &app, + const std::string &stream_key) const { + std::lock_guard lock(sessions_mutex); + int count = 0; + for (const auto &session : sessions) { + const auto &info = session->getStreamInfo(); + if (info.is_playing && info.app == app && info.stream_key == stream_key) { + count++; + } + } + return count; +} + +bool RTMPServer::checkConnectionLimits(const std::string &app, + const std::string &stream_key, + bool is_publisher) const { + if (is_publisher) { + int current = countPublishers(app, stream_key); + return current < max_publishers_per_stream; + } else { + int current = countPlayers(app, stream_key); + return current < max_players_per_stream; + } +} + +int RTMPServer::getActivePublishers() const { + std::lock_guard lock(sessions_mutex); + int count = 0; + for (const auto &session : sessions) { + if (session->getStreamInfo().is_publishing) { + count++; + } + } + return count; +} + +int RTMPServer::getActivePlayers() const { + std::lock_guard lock(sessions_mutex); + int count = 0; + for (const auto &session : sessions) { + if (session->getStreamInfo().is_playing) { + count++; + } + } + return count; +} + +int RTMPServer::getTotalConnections() const { + std::lock_guard lock(sessions_mutex); + return sessions.size(); +} + +StreamStatistics +RTMPServer::getStreamStats(const std::string &app, + const std::string &stream_key) const { + std::lock_guard lock(sessions_mutex); + StreamStatistics combined; + for (const auto &session : sessions) { + const auto &info = session->getStreamInfo(); + if (info.app == app && info.stream_key == stream_key) { + const auto &stats = session->getStats(); + combined.bytes_sent += stats.bytes_sent; + combined.bytes_received += stats.bytes_received; + combined.video_frames += stats.video_frames; + combined.audio_frames += stats.audio_frames; + combined.dropped_frames += stats.dropped_frames; + } + } + return combined; +} + +std::vector> +RTMPServer::getAllStreamStats() const { + std::lock_guard lock(sessions_mutex); + std::map stats_map; + for (const auto &session : sessions) { + const auto &info = session->getStreamInfo(); + if (info.is_publishing || info.is_playing) { + std::string key = makeStreamKey(info.app, info.stream_key); + const auto &stats = session->getStats(); + auto &combined = stats_map[key]; + combined.bytes_sent += stats.bytes_sent; + combined.bytes_received += stats.bytes_received; + combined.video_frames += stats.video_frames; + combined.audio_frames += stats.audio_frames; + combined.dropped_frames += stats.dropped_frames; + } + } + std::vector> result; + for (const auto &pair : stats_map) { + result.push_back(pair); + } + return result; +} + +bool RTMPServer::startRecording(const std::string &app, + const std::string &stream_key, + const std::string &filename) { + std::string key = makeStreamKey(app, stream_key); + std::lock_guard lock(recorder_mutex); + if (recorders.find(key) != recorders.end() && recorders[key]->isRecording()) { + LOG_WARN("Already recording stream: " + key); + return false; + } + auto recorder = std::make_shared(filename); + if (!recorder->start()) { + LOG_ERROR("Failed to start recording: " + filename); + return false; + } + recorders[key] = recorder; + LOG_INFO("Started recording " + key + " to " + filename); + return true; +} + +void RTMPServer::stopRecording(const std::string &app, + const std::string &stream_key) { + std::string key = makeStreamKey(app, stream_key); + std::lock_guard lock(recorder_mutex); + auto it = recorders.find(key); + if (it != recorders.end()) { + it->second->stop(); + recorders.erase(it); + LOG_INFO("Stopped recording: " + key); + } +} + +bool RTMPServer::isRecording(const std::string &app, + const std::string &stream_key) const { + std::string key = makeStreamKey(app, stream_key); + std::lock_guard lock(recorder_mutex); + auto it = recorders.find(key); + return (it != recorders.end() && it->second->isRecording()); +} + +void RTMPServer::enablePingPong(bool enable, int interval_seconds) { + ping_enabled = enable; + ping_interval = interval_seconds; + if (enable && running && !ping_thread.joinable()) { + ping_thread = std::thread(&RTMPServer::pingClientsRoutine, this); + } +} } // namespace rtmp \ No newline at end of file