commit 8c0cf3789f072e2bbecd2d7e3b9fbaadd1a4dd41 Author: Exil Productions Date: Fri Dec 26 21:58:16 2025 +0100 Initial Commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9a24e44 --- /dev/null +++ b/.gitignore @@ -0,0 +1,135 @@ +# File created using '.gitignore Generator' for Visual Studio Code: https://bit.ly/vscode-gig +# Created by https://www.toptal.com/developers/gitignore/api/visualstudiocode,linux,c,c++,cmake +# Edit at https://www.toptal.com/developers/gitignore?templates=visualstudiocode,linux,c,c++,cmake + +### C ### +# Prerequisites +*.d + +# Object files +*.o +*.ko +*.obj +*.elf + +# Linker output +*.ilk +*.map +*.exp + +# Precompiled Headers +*.gch +*.pch + +# Libraries +*.lib +*.a +*.la +*.lo + +# Shared objects (inc. Windows DLLs) +*.dll +*.so +*.so.* +*.dylib + +# Executables +*.exe +*.out +*.app +*.i*86 +*.x86_64 +*.hex + +# Debug files +*.dSYM/ +*.su +*.idb +*.pdb + +# Kernel Module Compile Results +*.mod* +*.cmd +.tmp_versions/ +modules.order +Module.symvers +Mkfile.old +dkms.conf + +### C++ ### +# Prerequisites + +# Compiled Object files +*.slo + +# Precompiled Headers + +# Compiled Dynamic libraries + +# Fortran module files +*.mod +*.smod + +# Compiled Static libraries +*.lai + +# Executables + +### CMake ### +CMakeLists.txt.user +CMakeCache.txt +CMakeFiles +CMakeScripts +Testing +Makefile +cmake_install.cmake +install_manifest.txt +compile_commands.json +CTestTestfile.cmake +_deps +build + +### CMake Patch ### +CMakeUserPresets.json + +# External projects +*-prefix/ + +### Linux ### +*~ + +# temporary files which can be created if a process still has a handle open of a deleted file +.fuse_hidden* + +# KDE directory preferences +.directory + +# Linux trash folder which might appear on any partition or disk +.Trash-* + +# .nfs files are created when an open file is removed but is still being accessed +.nfs* + +### VisualStudioCode ### +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +!.vscode/*.code-snippets + +# Local History for Visual Studio Code +.history/ + +# Built Visual Studio Code Extensions +*.vsix + +### VisualStudioCode Patch ### +# Ignore all local history of files +.history +.ionide + +# End of https://www.toptal.com/developers/gitignore/api/visualstudiocode,linux,c,c++,cmake + +# Custom rules (everything added below won't be overriden by 'Generate .gitignore File' if you use 'Update' option) + diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..7fb3c3d --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,33 @@ +cmake_minimum_required(VERSION 3.10) +project(rtmp-cpp) + +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + +include_directories(include) + +add_library(rtmp SHARED + src/rtmp_server.cpp + src/rtmp_capi.cpp +) + +target_include_directories(rtmp PUBLIC + <BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> + <INSTALL_INTERFACE:include> +) + +set_target_properties(rtmp PROPERTIES + PUBLIC_HEADER include/rtmp_capi.h +) + +# Example executable +add_executable(rtmp_example example/main.c) +target_link_libraries(rtmp_example rtmp) + +install(TARGETS rtmp + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib + PUBLIC_HEADER DESTINATION include +) + +install(TARGETS rtmp_example DESTINATION bin) diff --git a/README.md b/README.md new file mode 100644 index 0000000..9f66c81 --- /dev/null +++ b/README.md @@ -0,0 +1,42 @@ +# rtmp-cpp +[![Build Status](https://img.shields.io/badge/build-passing-brightgreen.svg)](https://github.com/user/rtmp-cpp) + +A lightweight C++ RTMP server library with C-compatible API. + +## Features +- Full RTMP protocol implementation (handshake, chunking, AMF0) +- Supports publish and play streams +- Callbacks for connect, publish, play, audio/video data, disconnect +- GOP cache for low-latency playback +- FLV file recording +- Authentication callback +- Stream statistics (bitrate, frames, uptime) +- Connection limits, timeouts, ping/pong + +## Quick Start + +### Build +```bash +./build.sh +``` + +This builds `librtmp.so` and example binary `rtmp_example`. + +### Run Example +```bash +./build/rtmp_example +``` + +Server listens on `rtmp://localhost:1935/live/stream` + +Test with OBS: +- Server: `rtmp://127.0.0.1/live` +- Stream key: `stream` + +Or FFmpeg: +```bash +ffmpeg -re -i input.mp4 -c copy -f flv rtmp://127.0.0.1/live/stream +``` + +## License +MIT \ No newline at end of file diff --git a/build.sh b/build.sh new file mode 100644 index 0000000..6272d22 --- /dev/null +++ b/build.sh @@ -0,0 +1,4 @@ +rm -rf build +mkdir build && cd build +cmake -DCMAKE_BUILD_TYPE=Release .. +make -j$(nproc) \ No newline at end of file diff --git a/example/main.c b/example/main.c new file mode 100644 index 0000000..66dcd76 --- /dev/null +++ b/example/main.c @@ -0,0 +1,43 @@ +#include "../include/rtmp_capi.h" +#include +#include +#include + +static void on_connect_cb(const char* ip, void* data) +{ + printf("Client connected: %s\n", ip); +} + +static void on_publish_cb(const char* ip, const char* app, const char* key, + void* data) +{ + printf("Publish from %s: %s/%s\n", ip, app, key); +} + +static void on_audio_cb(const char* app, const char* key, const uint8_t* data, + uint32_t len, uint32_t ts, void* ud) +{ + printf("Audio data for %s/%s, len=%u, ts=%u\n", app, key, len, ts); +} + +int main() +{ + rtmp_logger_set_level(RTMP_LOG_INFO); + RtmpServerHandle server = rtmp_server_create(1935); + rtmp_server_set_on_connect(server, on_connect_cb, NULL); + rtmp_server_set_on_publish(server, on_publish_cb, NULL); + rtmp_server_set_on_audio_data(server, on_audio_cb, NULL); + rtmp_server_enable_gop_cache(server, true); + if (rtmp_server_start(server)) + { + printf("RTMP Server started on port 1935. Press Ctrl+C to stop.\n"); + sleep(300); // 5 min + } + else + { + printf("Failed to start server\n"); + } + rtmp_server_stop(server); + rtmp_server_destroy(server); + return 0; +} \ No newline at end of file diff --git a/include/rtmp_capi.h b/include/rtmp_capi.h new file mode 100644 index 0000000..7bebb51 --- /dev/null +++ b/include/rtmp_capi.h @@ -0,0 +1,112 @@ +#ifndef RTMP_CAPI_H +#define RTMP_CAPI_H + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void *RtmpServerHandle; + +enum RtmpLogLevel +{ + RTMP_LOG_ERROR = 0, + RTMP_LOG_WARN = 1, + RTMP_LOG_INFO = 2, + RTMP_LOG_DEBUG = 3 +}; + +struct RtmpStreamStats +{ + uint64_t bytes_sent; + uint64_t bytes_received; + uint32_t video_frames; + uint32_t audio_frames; + uint32_t dropped_frames; + double bitrate_kbps; + double uptime_seconds; +}; + +typedef void (*RtmpOnConnectCallback)(const char* client_ip, void* user_data); +typedef void (*RtmpOnPublishCallback)(const char* client_ip, const char* app, + const char* stream_key, void* user_data); +typedef void (*RtmpOnPlayCallback)(const char* client_ip, const char* app, + const char* stream_key, void* user_data); +typedef void (*RtmpOnAudioDataCallback)(const char* app, const char* stream_key, + const uint8_t* data, uint32_t length, uint32_t timestamp, void* user_data); +typedef void (*RtmpOnVideoDataCallback)(const char* app, const char* stream_key, + const uint8_t* data, uint32_t length, uint32_t timestamp, void* user_data); +typedef void (*RtmpOnDisconnectCallback)(const char* client_ip, const char* app, + const char* stream_key, bool was_publishing, bool was_playing, void* user_data); +typedef bool (*RtmpAuthCallback)(const char* app, const char* stream_key, + const char* client_ip, void* user_data); + +// Create and destroy +RtmpServerHandle rtmp_server_create(int port); +void rtmp_server_destroy(RtmpServerHandle handle); +bool rtmp_server_start(RtmpServerHandle handle); +void rtmp_server_stop(RtmpServerHandle handle); +bool rtmp_server_is_running(RtmpServerHandle handle); + +// Callbacks +void rtmp_server_set_on_connect(RtmpServerHandle handle, + RtmpOnConnectCallback cb, void* user_data); +void rtmp_server_set_on_publish(RtmpServerHandle handle, + RtmpOnPublishCallback cb, void* user_data); +void rtmp_server_set_on_play(RtmpServerHandle handle, RtmpOnPlayCallback cb, + void* user_data); +void rtmp_server_set_on_audio_data(RtmpServerHandle handle, + RtmpOnAudioDataCallback cb, void* user_data); +void rtmp_server_set_on_video_data(RtmpServerHandle handle, + RtmpOnVideoDataCallback cb, void* user_data); +void rtmp_server_set_on_disconnect(RtmpServerHandle handle, + RtmpOnDisconnectCallback cb, void* user_data); +void rtmp_server_set_auth_callback(RtmpServerHandle handle, RtmpAuthCallback cb, + void* user_data); + +// Configuration +void rtmp_server_enable_gop_cache(RtmpServerHandle handle, bool enable); +void rtmp_server_set_max_publishers_per_stream(RtmpServerHandle handle, + int max); +void rtmp_server_set_max_players_per_stream(RtmpServerHandle handle, int max); +void rtmp_server_set_max_total_connections(RtmpServerHandle handle, int max); +void rtmp_server_set_connection_timeout(RtmpServerHandle handle, int seconds); +void rtmp_server_enable_ping_pong(RtmpServerHandle handle, bool enable, + int interval_seconds); + +// Stats +int rtmp_server_get_active_publishers(RtmpServerHandle handle); +int rtmp_server_get_active_players(RtmpServerHandle handle); +int rtmp_server_get_total_connections(RtmpServerHandle handle); +struct RtmpStreamStats rtmp_server_get_stream_stats(RtmpServerHandle handle, + const char* app, const char* stream_key); + +// Recording +bool rtmp_server_start_recording(RtmpServerHandle handle, const char* app, + const char* stream_key, const char* filename); +void rtmp_server_stop_recording(RtmpServerHandle handle, const char* app, + const char* stream_key); +bool rtmp_server_is_recording(RtmpServerHandle handle, const char* app, + const char* stream_key); + +// Broadcasting +bool rtmp_server_broadcast_audio(RtmpServerHandle handle, const char* app, + const char* stream_key, const uint8_t* data, uint32_t length, + uint32_t timestamp); +bool rtmp_server_broadcast_video(RtmpServerHandle handle, const char* app, + const char* stream_key, const uint8_t* data, uint32_t length, + uint32_t timestamp); +// FIXED: Added missing declaration +bool rtmp_server_broadcast_metadata(RtmpServerHandle handle, const char* app, + const char* stream_key, const uint8_t* data, uint32_t length); + +// Logger +void rtmp_logger_set_level(enum RtmpLogLevel level); + +#ifdef __cplusplus +} +#endif + +#endif // RTMP_CAPI_H \ No newline at end of file diff --git a/include/rtmp_server.h b/include/rtmp_server.h new file mode 100644 index 0000000..fe09c97 --- /dev/null +++ b/include/rtmp_server.h @@ -0,0 +1,593 @@ +#ifndef RTMP_SERVER_H +#define RTMP_SERVER_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace rtmp +{ + +// RTMP Message Types + enum class MessageType : uint8_t + { + SET_CHUNK_SIZE = 1, + ABORT_MESSAGE = 2, + ACKNOWLEDGEMENT = 3, + USER_CONTROL = 4, + WINDOW_ACK_SIZE = 5, + SET_PEER_BANDWIDTH = 6, + AUDIO = 8, + VIDEO = 9, + DATA_AMF3 = 15, + SHARED_OBJECT_AMF3 = 16, + COMMAND_AMF3 = 17, + DATA_AMF0 = 18, + SHARED_OBJECT_AMF0 = 19, + COMMAND_AMF0 = 20, + AGGREGATE = 22 + }; + +// User Control Message Types + enum class UserControlType : uint16_t + { + STREAM_BEGIN = 0, + STREAM_EOF = 1, + STREAM_DRY = 2, + SET_BUFFER_LENGTH = 3, + STREAM_IS_RECORDED = 4, + PING_REQUEST = 6, + PING_RESPONSE = 7 + }; + +// AMF0 Data Types + enum class AMF0Type : uint8_t + { + NUMBER = 0x00, + BOOLEAN = 0x01, + STRING = 0x02, + OBJECT = 0x03, + NULL_TYPE = 0x05, + UNDEFINED = 0x06, + ECMA_ARRAY = 0x08, + OBJECT_END = 0x09 + }; + +// Log Levels + enum class LogLevel + { + ERROR = 0, + WARN = 1, + INFO = 2, + DEBUG = 3 + }; + +// AMF0 Value + class AMF0Value + { + public: + AMF0Type type; + double number; + bool boolean; + std::string string; + std::map> object; + + AMF0Value() : type(AMF0Type::NULL_TYPE), number(0), boolean(false) {} + }; + +// RTMP Chunk Header + struct ChunkHeader + { + uint8_t fmt; + uint32_t csid; + uint32_t timestamp; + uint32_t msg_length; + uint8_t msg_type_id; + uint32_t msg_stream_id; + bool has_extended_timestamp; + }; + +// RTMP Message + struct RTMPMessage + { + ChunkHeader header; + std::vector payload; + }; + +// Stream Information + struct StreamInfo + { + std::string app; + std::string stream_key; + bool is_publishing; + bool is_playing; + int client_fd; + uint32_t stream_id; + std::string client_ip; + }; + +// Stream Statistics + struct StreamStatistics + { + uint64_t bytes_sent = 0; + uint64_t bytes_received = 0; + uint32_t video_frames = 0; + uint32_t audio_frames = 0; + uint32_t dropped_frames = 0; + std::chrono::steady_clock::time_point start_time; + + StreamStatistics() : start_time(std::chrono::steady_clock::now()) {} + + double getBitrate() const + { + auto now = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast( + now - start_time).count(); + if (duration == 0) return 0; + return (bytes_sent * 8.0) / duration / 1000.0; // kbps + } + + double getUptime() const + { + auto now = std::chrono::steady_clock::now(); + return std::chrono::duration_cast( + now - start_time).count(); + } + }; + +// GOP Cache for instant playback + class GOPCache + { + public: + void addVideoFrame(const std::vector &data, uint32_t timestamp); + void addAudioFrame(const std::vector &data, uint32_t timestamp); + void addMetadata(const std::vector &data); + void sendToPlayer(class RTMPSession* session); + void clear(); + bool hasKeyframe() const + { + return has_keyframe; + } + + private: + struct CachedFrame + { + MessageType type; + std::vector data; + uint32_t timestamp; + }; + + std::vector frames; + std::vector metadata; + bool has_keyframe = false; + std::mutex cache_mutex; + + bool isKeyframe(const std::vector &data); + }; + +// FLV File Recorder + class FLVRecorder + { + public: + FLVRecorder(const std::string& filename); + ~FLVRecorder(); + + bool start(); + void stop(); + bool isRecording() const + { + return recording; + } + + void writeAudioFrame(const std::vector &data, uint32_t timestamp); + void writeVideoFrame(const std::vector &data, uint32_t timestamp); + void writeMetadata(const std::map> + &metadata); + + private: + std::string filename; + std::ofstream file; + bool recording = false; + uint32_t last_timestamp = 0; + std::mutex file_mutex; + + void writeFLVHeader(); + void writeFLVTag(uint8_t tag_type, const std::vector &data, + uint32_t timestamp); + std::vector encodeMetadata(const + std::map> &metadata); + }; + +// RTMP Client Session + class RTMPSession + { + public: + RTMPSession(int fd, const std::string& client_ip); + ~RTMPSession(); + + bool handshake(); + bool receiveChunk(); + bool sendMessage(const RTMPMessage& msg); + bool sendChunk(uint32_t csid, uint32_t timestamp, uint8_t msg_type, + uint32_t stream_id, const std::vector &data); + + int getFd() const + { + return client_fd; + } + const StreamInfo &getStreamInfo() const + { + return stream_info; + } + StreamInfo &getStreamInfo() + { + return stream_info; + } + + void setChunkSize(uint32_t size) + { + chunk_size = size; + } + uint32_t getChunkSize() const + { + return chunk_size; + } + + // Acknowledgement handling + void onBytesReceived(size_t bytes); + bool shouldSendAck() const; + void sendAcknowledgement(); + + // Ping/Pong + void sendPing(uint32_t timestamp); + void sendPong(uint32_t timestamp); + + // Statistics + StreamStatistics &getStats() + { + return stats; + } + const StreamStatistics &getStats() const + { + return stats; + } + + // Message queue access for server + std::queue &getMessageQueue() + { + return message_queue; + } + std::mutex &getQueueMutex() + { + return queue_mutex; + } + + // Last activity tracking + std::chrono::steady_clock::time_point getLastActivity() const + { + return last_activity; + } + void updateActivity() + { + last_activity = std::chrono::steady_clock::now(); + } + + // AMF0 public access for server + std::shared_ptr decodeAMF0(const uint8_t* data, size_t len, + size_t &offset); + std::vector encodeAMF0(const AMF0Value& value); + + // FIXED: Moved to public section for RTMPServer access + bool sendErrorResponse(const std::string& command, double transaction_id, + const std::string& description); + + private: + int client_fd; + uint32_t chunk_size; + uint32_t window_ack_size; + uint32_t peer_bandwidth; + uint32_t bytes_received; + uint32_t last_ack_sent; + std::map prev_headers; + std::map> incomplete_chunks; + StreamInfo stream_info; + std::queue message_queue; + std::mutex queue_mutex; + StreamStatistics stats; + std::chrono::steady_clock::time_point last_activity; + + bool readExactly(uint8_t* buf, size_t len); + bool writeExactly(const uint8_t* buf, size_t len); + bool parseChunkHeader(ChunkHeader& header); + bool processMessage(const RTMPMessage& msg); + bool handleCommand(const RTMPMessage& msg); + bool handleAudioMessage(const RTMPMessage& msg); + bool handleVideoMessage(const RTMPMessage& msg); + bool handleDataMessage(const RTMPMessage& msg); + bool handleUserControl(const RTMPMessage& msg); + bool handleAcknowledgement(const RTMPMessage& msg); + + // AMF0 Encoding/Decoding + std::vector encodeAMF0String(const std::string& str); + std::vector encodeAMF0Number(double num); + std::vector encodeAMF0Object(const + std::map> &obj); + + // Command handlers + bool handleConnect(const std::vector> &args); + bool handleReleaseStream(const std::vector> &args); + bool handleFCPublish(const std::vector> &args); + bool handleCreateStream(const std::vector> &args); + bool handlePublish(const std::vector> &args); + bool handlePlay(const std::vector> &args); + bool handleDeleteStream(const std::vector> &args); + + // Response helpers + bool sendConnectResponse(double transaction_id); + bool sendCreateStreamResponse(double transaction_id, double stream_id); + bool sendPublishResponse(); + bool sendPlayResponse(); + }; + +// Callback types + using OnConnectCallback = std::function)>; + using OnPublishCallback = + std::function, const std::string& app, const std::string& stream_key)>; + using OnPlayCallback = + std::function, const std::string& app, const std::string& stream_key)>; + using OnAudioDataCallback = + std::function, const std::vector& data, uint32_t timestamp)>; + using OnVideoDataCallback = + std::function, const std::vector& data, uint32_t timestamp)>; + using OnMetaDataCallback = + std::function, const std::map>& metadata)>; + using OnDisconnectCallback = std::function)>; + using AuthCallback = + std::function; + +// Logger + class Logger + { + public: + static Logger &getInstance() + { + static Logger instance; + return instance; + } + + void setLevel(LogLevel level) + { + current_level = level; + } + LogLevel getLevel() const + { + return current_level; + } + + void error(const std::string& msg); + void warn(const std::string& msg); + void info(const std::string& msg); + void debug(const std::string& msg); + + private: + Logger() : current_level(LogLevel::INFO) {} + LogLevel current_level; + std::mutex log_mutex; + + void log(LogLevel level, const std::string& msg); + }; + +// RTMP Server + class RTMPServer + { + public: + RTMPServer(int port = 1935); + ~RTMPServer(); + + bool start(); + void stop(); + bool isRunning() const + { + return running; + } + + // Callbacks + void setOnConnect(OnConnectCallback cb) + { + on_connect = cb; + } + void setOnPublish(OnPublishCallback cb) + { + on_publish = cb; + } + void setOnPlay(OnPlayCallback cb) + { + on_play = cb; + } + void setOnAudioData(OnAudioDataCallback cb) + { + on_audio_data = cb; + } + void setOnVideoData(OnVideoDataCallback cb) + { + on_video_data = cb; + } + void setOnMetaData(OnMetaDataCallback cb) + { + on_metadata = cb; + } + void setOnDisconnect(OnDisconnectCallback cb) + { + on_disconnect = cb; + } + void setAuthCallback(AuthCallback cb) + { + auth_callback = cb; + } + + // GOP Cache + void enableGOPCache(bool enable) + { + use_gop_cache = enable; + } + bool isGOPCacheEnabled() const + { + return use_gop_cache; + } + + // Recording + bool startRecording(const std::string& app, const std::string& stream_key, + const std::string& filename); + void stopRecording(const std::string& app, const std::string& stream_key); + bool isRecording(const std::string& app, const std::string& stream_key) const; + + // Statistics + StreamStatistics getStreamStats(const std::string& app, + const std::string& stream_key) const; + std::vector> getAllStreamStats() const; + int getActivePublishers() const; + int getActivePlayers() const; + int getTotalConnections() const; + + // Connection limits + void setMaxPublishersPerStream(int max) + { + max_publishers_per_stream = max; + } + void setMaxPlayersPerStream(int max) + { + max_players_per_stream = max; + } + void setMaxTotalConnections(int max) + { + max_total_connections = max; + } + int getMaxPublishersPerStream() const + { + return max_publishers_per_stream; + } + int getMaxPlayersPerStream() const + { + return max_players_per_stream; + } + int getMaxTotalConnections() const + { + return max_total_connections; + } + + // Ping/Pong + void enablePingPong(bool enable, int interval_seconds = 30); + bool isPingPongEnabled() const + { + return ping_enabled; + } + + // Timeout handling + void setConnectionTimeout(int seconds) + { + connection_timeout = seconds; + } + int getConnectionTimeout() const + { + return connection_timeout; + } + + // Broadcasting + bool sendAudioToPlayers(const std::string& app, const std::string& stream_key, + const std::vector &data, uint32_t timestamp); + bool sendVideoToPlayers(const std::string& app, const std::string& stream_key, + const std::vector &data, uint32_t timestamp); + void sendMetadataToPlayers(const std::string& app, + const std::string& stream_key, + const std::vector &data); + + private: + int port; + int server_fd; + bool running; + std::thread accept_thread; + std::thread ping_thread; + std::thread timeout_thread; + std::vector client_threads; + std::vector> sessions; + mutable std::mutex sessions_mutex; // FIXED: Added mutable + + // Callbacks + OnConnectCallback on_connect; + OnPublishCallback on_publish; + OnPlayCallback on_play; + OnAudioDataCallback on_audio_data; + OnVideoDataCallback on_video_data; + OnMetaDataCallback on_metadata; + OnDisconnectCallback on_disconnect; + AuthCallback auth_callback; + + // GOP Caches + bool use_gop_cache = true; + std::map> gop_caches; + std::mutex gop_mutex; + + // Recorders + std::map> recorders; + mutable std::mutex recorder_mutex; // FIXED: Added mutable + + // Statistics + std::map stream_stats; + mutable std::mutex stats_mutex; + + // Connection limits + int max_publishers_per_stream = 1; + int max_players_per_stream = 1000; + int max_total_connections = 1000; + + // Ping/Pong + bool ping_enabled = false; + int ping_interval = 30; + + // Timeout + int connection_timeout = 60; + + void acceptClients(); + void handleClient(std::shared_ptr session); + void removeSession(std::shared_ptr session); + void processMediaMessages(std::shared_ptr session); + void pingClientsRoutine(); + void timeoutCheckRoutine(); + + std::string makeStreamKey(const std::string& app, + const std::string& stream) const + { + return app + "/" + stream; + } + + int countPublishers(const std::string& app, + const std::string& stream_key) const; + int countPlayers(const std::string& app, const std::string& stream_key) const; + bool checkConnectionLimits(const std::string& app, + const std::string& stream_key, bool is_publisher) const; + }; + +// Utility macros +#define LOG_ERROR(msg) rtmp::Logger::getInstance().error(msg) +#define LOG_WARN(msg) rtmp::Logger::getInstance().warn(msg) +#define LOG_INFO(msg) rtmp::Logger::getInstance().info(msg) +#define LOG_DEBUG(msg) rtmp::Logger::getInstance().debug(msg) + +} // namespace rtmp + +#endif // RTMP_SERVER_H \ No newline at end of file diff --git a/src/rtmp_capi.cpp b/src/rtmp_capi.cpp new file mode 100644 index 0000000..1c6385d --- /dev/null +++ b/src/rtmp_capi.cpp @@ -0,0 +1,294 @@ +#include "rtmp_capi.h" +#include "rtmp_server.h" +#include +#include +#include + +namespace rtmp_capi_internal +{ + struct RtmpServerImpl + { + rtmp::RTMPServer *server; + RtmpOnConnectCallback on_connect_cb; + void *on_connect_userdata; + RtmpOnPublishCallback on_publish_cb; + void *on_publish_userdata; + RtmpOnPlayCallback on_play_cb; + void *on_play_userdata; + RtmpOnAudioDataCallback on_audio_cb; + void *on_audio_userdata; + RtmpOnVideoDataCallback on_video_cb; + void *on_video_userdata; + RtmpOnDisconnectCallback on_disconnect_cb; + void *on_disconnect_userdata; + RtmpAuthCallback auth_cb; + void *auth_userdata; + RtmpServerImpl() : server(nullptr), on_connect_cb(nullptr), + on_connect_userdata(nullptr), + on_publish_cb(nullptr), on_publish_userdata(nullptr), on_play_cb(nullptr), + on_play_userdata(nullptr), + on_audio_cb(nullptr), on_audio_userdata(nullptr), on_video_cb(nullptr), + on_video_userdata(nullptr), + on_disconnect_cb(nullptr), on_disconnect_userdata(nullptr), auth_cb(nullptr), + auth_userdata(nullptr) {} + }; +} + +using Impl = rtmp_capi_internal::RtmpServerImpl; + +extern "C" { + RtmpServerHandle rtmp_server_create(int port) + { + Impl* impl = new Impl(); + impl->server = new rtmp::RTMPServer(port); + impl->server->setOnConnect([impl](std::shared_ptr session) + { + if (!impl || !impl->on_connect_cb) return; + const auto& info = session->getStreamInfo(); + impl->on_connect_cb(info.client_ip.c_str(), impl->on_connect_userdata); + }); + impl->server->setOnPublish([impl](std::shared_ptr session, + const std::string & app, const std::string & stream_key) + { + if (!impl || !impl->on_publish_cb) return; + const auto& info = session->getStreamInfo(); + impl->on_publish_cb(info.client_ip.c_str(), app.c_str(), stream_key.c_str(), + impl->on_publish_userdata); + }); + impl->server->setOnPlay([impl](std::shared_ptr session, + const std::string & app, const std::string & stream_key) + { + if (!impl || !impl->on_play_cb) return; + const auto& info = session->getStreamInfo(); + impl->on_play_cb(info.client_ip.c_str(), app.c_str(), stream_key.c_str(), + impl->on_play_userdata); + }); + impl->server->setOnAudioData([impl](std::shared_ptr session, + const std::vector &data, uint32_t timestamp) + { + if (!impl || !impl->on_audio_cb) return; + const auto& info = session->getStreamInfo(); + impl->on_audio_cb(info.app.c_str(), info.stream_key.c_str(), data.data(), + static_cast(data.size()), timestamp, impl->on_audio_userdata); + }); + impl->server->setOnVideoData([impl](std::shared_ptr session, + const std::vector &data, uint32_t timestamp) + { + if (!impl || !impl->on_video_cb) return; + const auto& info = session->getStreamInfo(); + impl->on_video_cb(info.app.c_str(), info.stream_key.c_str(), data.data(), + static_cast(data.size()), timestamp, impl->on_video_userdata); + }); + impl->server->setOnDisconnect([impl](std::shared_ptr + session) + { + if (!impl || !impl->on_disconnect_cb) return; + const auto& info = session->getStreamInfo(); + impl->on_disconnect_cb(info.client_ip.c_str(), info.app.c_str(), + info.stream_key.c_str(), info.is_publishing, info.is_playing, + impl->on_disconnect_userdata); + }); + impl->server->setAuthCallback([impl](const std::string & app, + const std::string & stream_key, const std::string & client_ip) -> bool + { + if (!impl || !impl->auth_cb) return true; + return impl->auth_cb(app.c_str(), stream_key.c_str(), client_ip.c_str(), impl->auth_userdata); + }); + return impl; + } +// add all other functions as above + void rtmp_server_destroy(RtmpServerHandle handle) + { + if (!handle) return; + Impl* impl = static_cast(handle); + delete impl->server; + delete impl; + } + bool rtmp_server_start(RtmpServerHandle handle) + { + if (!handle) return false; + Impl* impl = static_cast(handle); + return impl->server->start(); + } + void rtmp_server_stop(RtmpServerHandle handle) + { + if (!handle) return; + Impl* impl = static_cast(handle); + impl->server->stop(); + } + bool rtmp_server_is_running(RtmpServerHandle handle) + { + if (!handle) return false; + Impl* impl = static_cast(handle); + return impl->server->isRunning(); + } + void rtmp_server_set_on_connect(RtmpServerHandle handle, + RtmpOnConnectCallback cb, void* user_data) + { + if (!handle) return; + Impl* impl = static_cast(handle); + impl->on_connect_cb = cb; + impl->on_connect_userdata = user_data; + } + void rtmp_server_set_on_publish(RtmpServerHandle handle, + RtmpOnPublishCallback cb, void* user_data) + { + if (!handle) return; + Impl* impl = static_cast(handle); + impl->on_publish_cb = cb; + impl->on_publish_userdata = user_data; + } + void rtmp_server_set_on_play(RtmpServerHandle handle, RtmpOnPlayCallback cb, + void* user_data) + { + if (!handle) return; + Impl* impl = static_cast(handle); + impl->on_play_cb = cb; + impl->on_play_userdata = user_data; + } + void rtmp_server_set_on_audio_data(RtmpServerHandle handle, + RtmpOnAudioDataCallback cb, void* user_data) + { + if (!handle) return; + Impl* impl = static_cast(handle); + impl->on_audio_cb = cb; + impl->on_audio_userdata = user_data; + } + void rtmp_server_set_on_video_data(RtmpServerHandle handle, + RtmpOnVideoDataCallback cb, void* user_data) + { + if (!handle) return; + Impl* impl = static_cast(handle); + impl->on_video_cb = cb; + impl->on_video_userdata = user_data; + } + void rtmp_server_set_on_disconnect(RtmpServerHandle handle, + RtmpOnDisconnectCallback cb, void* user_data) + { + if (!handle) return; + Impl* impl = static_cast(handle); + impl->on_disconnect_cb = cb; + impl->on_disconnect_userdata = user_data; + } + void rtmp_server_set_auth_callback(RtmpServerHandle handle, RtmpAuthCallback cb, + void* user_data) + { + if (!handle) return; + Impl* impl = static_cast(handle); + impl->auth_cb = cb; + impl->auth_userdata = user_data; + } + void rtmp_server_enable_gop_cache(RtmpServerHandle handle, bool enable) + { + if (!handle) return; + static_cast(handle)->server->enableGOPCache(enable); + } + void rtmp_server_set_max_publishers_per_stream(RtmpServerHandle handle, + int max) + { + if (!handle) return; + static_cast(handle)->server->setMaxPublishersPerStream(max); + } + void rtmp_server_set_max_players_per_stream(RtmpServerHandle handle, int max) + { + if (!handle) return; + static_cast(handle)->server->setMaxPlayersPerStream(max); + } + void rtmp_server_set_max_total_connections(RtmpServerHandle handle, int max) + { + if (!handle) return; + static_cast(handle)->server->setMaxTotalConnections(max); + } + void rtmp_server_set_connection_timeout(RtmpServerHandle handle, int seconds) + { + if (!handle) return; + static_cast(handle)->server->setConnectionTimeout(seconds); + } + void rtmp_server_enable_ping_pong(RtmpServerHandle handle, bool enable, + int interval_seconds) + { + if (!handle) return; + static_cast(handle)->server->enablePingPong(enable, interval_seconds); + } + int rtmp_server_get_active_publishers(RtmpServerHandle handle) + { + if (!handle) return 0; + return static_cast(handle)->server->getActivePublishers(); + } + int rtmp_server_get_active_players(RtmpServerHandle handle) + { + if (!handle) return 0; + return static_cast(handle)->server->getActivePlayers(); + } + int rtmp_server_get_total_connections(RtmpServerHandle handle) + { + if (!handle) return 0; + return static_cast(handle)->server->getTotalConnections(); + } + struct RtmpStreamStats rtmp_server_get_stream_stats(RtmpServerHandle handle, + const char* app, const char* stream_key) + { + struct RtmpStreamStats stats = {0}; + if (!handle || !app || !stream_key) return stats; + Impl* impl = static_cast(handle); + auto cstats = impl->server->getStreamStats(app, stream_key); + stats.bytes_sent = cstats.bytes_sent; + stats.bytes_received = cstats.bytes_received; + stats.video_frames = cstats.video_frames; + stats.audio_frames = cstats.audio_frames; + stats.dropped_frames = cstats.dropped_frames; + stats.bitrate_kbps = cstats.getBitrate(); + stats.uptime_seconds = cstats.getUptime(); + return stats; + } + bool rtmp_server_start_recording(RtmpServerHandle handle, const char* app, + const char* stream_key, const char* filename) + { + if (!handle || !app || !stream_key || !filename) return false; + return static_cast(handle)->server->startRecording(app, stream_key, + filename); + } + void rtmp_server_stop_recording(RtmpServerHandle handle, const char* app, + const char* stream_key) + { + if (!handle || !app || !stream_key) return; + static_cast(handle)->server->stopRecording(app, stream_key); + } + bool rtmp_server_is_recording(RtmpServerHandle handle, const char* app, + const char* stream_key) + { + if (!handle || !app || !stream_key) return false; + return static_cast(handle)->server->isRecording(app, stream_key); + } + bool rtmp_server_broadcast_audio(RtmpServerHandle handle, const char* app, + const char* stream_key, const uint8_t* data, uint32_t length, + uint32_t timestamp) + { + if (!handle || !app || !stream_key || !data || length == 0) return false; + Impl* impl = static_cast(handle); + std::vector vec(data, data + length); + return impl->server->sendAudioToPlayers(app, stream_key, vec, timestamp); + } + bool rtmp_server_broadcast_video(RtmpServerHandle handle, const char* app, + const char* stream_key, const uint8_t* data, uint32_t length, + uint32_t timestamp) + { + if (!handle || !app || !stream_key || !data || length == 0) return false; + Impl* impl = static_cast(handle); + std::vector vec(data, data + length); + return impl->server->sendVideoToPlayers(app, stream_key, vec, timestamp); + } + bool rtmp_server_broadcast_metadata(RtmpServerHandle handle, const char* app, + const char* stream_key, const uint8_t* data, uint32_t length) + { + if (!handle || !app || !stream_key || !data || length == 0) return false; + Impl* impl = static_cast(handle); + std::vector vec(data, data + length); + impl->server->sendMetadataToPlayers(app, stream_key, vec); + return true; + } + void rtmp_logger_set_level(RtmpLogLevel level) + { + rtmp::Logger::getInstance().setLevel((rtmp::LogLevel)level); + } +} \ No newline at end of file diff --git a/src/rtmp_server.cpp b/src/rtmp_server.cpp new file mode 100644 index 0000000..7cbfdf5 --- /dev/null +++ b/src/rtmp_server.cpp @@ -0,0 +1,1765 @@ +#include "rtmp_server.h" +#include +#include +#include +#include +#include +#include + +namespace rtmp +{ + +// Utility functions + static uint16_t readUint16BE(const uint8_t* data) + { + return (data[0] << 8) | data[1]; + } + + static uint32_t readUint24BE(const uint8_t* data) + { + return (data[0] << 16) | (data[1] << 8) | data[2]; + } + + static uint32_t readUint32BE(const uint8_t* data) + { + return (data[0] << 24) | (data[1] << 16) | (data[2] << 8) | data[3]; + } + + static void writeUint16BE(uint8_t* data, uint16_t val) + { + data[0] = (val >> 8) & 0xFF; + data[1] = val & 0xFF; + } + + static void writeUint24BE(uint8_t* data, uint32_t val) + { + data[0] = (val >> 16) & 0xFF; + data[1] = (val >> 8) & 0xFF; + data[2] = val & 0xFF; + } + + static void writeUint32BE(uint8_t* data, uint32_t val) + { + data[0] = (val >> 24) & 0xFF; + data[1] = (val >> 16) & 0xFF; + data[2] = (val >> 8) & 0xFF; + data[3] = val & 0xFF; + } + + static double readDouble(const uint8_t* data) + { + uint64_t val = ((uint64_t)data[0] << 56) | ((uint64_t)data[1] << 48) | + ((uint64_t)data[2] << 40) | ((uint64_t)data[3] << 32) | + ((uint64_t)data[4] << 24) | ((uint64_t)data[5] << 16) | + ((uint64_t)data[6] << 8) | (uint64_t)data[7]; + double result; + memcpy(&result, &val, 8); + return result; + } + + static void writeDouble(uint8_t* data, double val) + { + uint64_t bits; + memcpy(&bits, &val, 8); + data[0] = (bits >> 56) & 0xFF; + data[1] = (bits >> 48) & 0xFF; + data[2] = (bits >> 40) & 0xFF; + data[3] = (bits >> 32) & 0xFF; + data[4] = (bits >> 24) & 0xFF; + data[5] = (bits >> 16) & 0xFF; + data[6] = (bits >> 8) & 0xFF; + data[7] = bits & 0xFF; + } + +// Logger Implementation + void Logger::log(LogLevel level, const std::string& msg) + { + if (level > current_level) return; + std::lock_guard lock(log_mutex); + auto now = std::chrono::system_clock::now(); + auto time = std::chrono::system_clock::to_time_t(now); + const char *level_str[] = {"ERROR", "WARN", "INFO", "DEBUG"}; + std::cout << "[" << std::put_time(std::localtime(&time), "%Y-%m-%d %H:%M:%S") + << "] [" << level_str[(int)level] << "] " << msg << std::endl; + } + + void Logger::error(const std::string& msg) + { + log(LogLevel::ERROR, msg); + } + void Logger::warn(const std::string& msg) + { + log(LogLevel::WARN, msg); + } + void Logger::info(const std::string& msg) + { + log(LogLevel::INFO, msg); + } + void Logger::debug(const std::string& msg) + { + log(LogLevel::DEBUG, msg); + } + +// GOP Cache Implementation + bool GOPCache::isKeyframe(const std::vector &data) + { + if (data.empty()) return false; + uint8_t frame_type = (data[0] >> 4) & 0x0F; + return frame_type == 1; // 1 = keyframe (IDR) + } + + void GOPCache::addVideoFrame(const std::vector &data, + uint32_t timestamp) + { + std::lock_guard lock(cache_mutex); + if (isKeyframe(data)) + { + frames.clear(); + has_keyframe = true; + } + if (has_keyframe) + { + CachedFrame frame; + frame.type = MessageType::VIDEO; + frame.data = data; + frame.timestamp = timestamp; + frames.push_back(frame); + if (frames.size() > 300) + { + for (size_t i = 1; i < frames.size(); i++) + { + if (frames[i].type == MessageType::VIDEO && isKeyframe(frames[i].data)) + { + frames.erase(frames.begin(), frames.begin() + i); + break; + } + } + } + } + } + + void GOPCache::addAudioFrame(const std::vector &data, + uint32_t timestamp) + { + std::lock_guard lock(cache_mutex); + if (has_keyframe) + { + CachedFrame frame; + frame.type = MessageType::AUDIO; + frame.data = data; + frame.timestamp = timestamp; + frames.push_back(frame); + } + } + + void GOPCache::addMetadata(const std::vector &data) + { + std::lock_guard lock(cache_mutex); + metadata = data; + } + + void GOPCache::sendToPlayer(RTMPSession* session) + { + std::lock_guard lock(cache_mutex); + const auto& info = session->getStreamInfo(); + if (!metadata.empty()) + { + session->sendChunk(4, 0, (uint8_t)MessageType::DATA_AMF0, info.stream_id, + metadata); + } + for (const auto& frame : frames) + { + session->sendChunk(4, frame.timestamp, (uint8_t)frame.type, + info.stream_id, frame.data); + } + } + + void GOPCache::clear() + { + std::lock_guard lock(cache_mutex); + frames.clear(); + metadata.clear(); + has_keyframe = false; + } + +// FLV Recorder Implementation + FLVRecorder::FLVRecorder(const std::string& filename) : filename(filename) {} + + FLVRecorder::~FLVRecorder() + { + stop(); + } + + bool FLVRecorder::start() + { + std::lock_guard lock(file_mutex); + file.open(filename, std::ios::binary); + if (!file.is_open()) return false; + writeFLVHeader(); + recording = true; + return true; + } + + void FLVRecorder::stop() + { + std::lock_guard lock(file_mutex); + if (file.is_open()) + { + file.close(); + } + recording = false; + } + + void FLVRecorder::writeFLVHeader() + { + uint8_t header[] = + { + 'F', 'L', 'V', 0x01, 0x05, + 0x00, 0x00, 0x00, 0x09 + }; + file.write(reinterpret_cast(header), sizeof(header)); + uint32_t prev_size = 0; + file.write(reinterpret_cast(&prev_size), 4); + } + + void FLVRecorder::writeFLVTag(uint8_t tag_type, + const std::vector &data, + uint32_t timestamp) + { + if (!recording || !file.is_open()) return; + std::lock_guard lock(file_mutex); + uint32_t data_size = data.size(); + file.put(tag_type); + file.put((data_size >> 16) & 0xFF); + file.put((data_size >> 8) & 0xFF); + file.put(data_size & 0xFF); + file.put((timestamp >> 16) & 0xFF); + file.put((timestamp >> 8) & 0xFF); + file.put(timestamp & 0xFF); + file.put((timestamp >> 24) & 0xFF); + file.put(0); file.put(0); file.put(0); + file.write(reinterpret_cast(data.data()), data.size()); + uint32_t tag_size = 11 + data_size; + file.put((tag_size >> 24) & 0xFF); + file.put((tag_size >> 16) & 0xFF); + file.put((tag_size >> 8) & 0xFF); + file.put(tag_size & 0xFF); + last_timestamp = timestamp; + } + + void FLVRecorder::writeVideoFrame(const std::vector &data, + uint32_t timestamp) + { + writeFLVTag(0x09, data, timestamp); + } + + void FLVRecorder::writeAudioFrame(const std::vector &data, + uint32_t timestamp) + { + writeFLVTag(0x08, data, timestamp); + } + + void FLVRecorder::writeMetadata(const + std::map> &metadata) + { + auto encoded = encodeMetadata(metadata); + writeFLVTag(0x12, encoded, 0); + } + + std::vector FLVRecorder::encodeMetadata( + const std::map> &metadata) + { + std::vector result; + result.push_back(0x02); + uint16_t len = 10; + result.push_back((len >> 8) & 0xFF); + result.push_back(len & 0xFF); + std::string meta_str = "onMetaData"; + result.insert(result.end(), meta_str.begin(), meta_str.end()); + result.push_back(0x08); + uint32_t count = metadata.size(); + result.push_back((count >> 24) & 0xFF); + result.push_back((count >> 16) & 0xFF); + result.push_back((count >> 8) & 0xFF); + result.push_back(count & 0xFF); + for (const auto& pair : metadata) + { + uint16_t key_len = pair.first.size(); + result.push_back((key_len >> 8) & 0xFF); + result.push_back(key_len & 0xFF); + result.insert(result.end(), pair.first.begin(), pair.first.end()); + if (pair.second->type == AMF0Type::NUMBER) + { + result.push_back(0x00); + uint64_t bits; + memcpy(&bits, &pair.second->number, 8); + for (int i = 7; i >= 0; i--) + { + result.push_back((bits >> (i * 8)) & 0xFF); + } + } + else if (pair.second->type == AMF0Type::STRING) + { + result.push_back(0x02); + uint16_t str_len = pair.second->string.size(); + result.push_back((str_len >> 8) & 0xFF); + result.push_back(str_len & 0xFF); + result.insert(result.end(), pair.second->string.begin(), + pair.second->string.end()); + } + } + result.push_back(0x00); result.push_back(0x00); result.push_back(0x09); + return result; + } + +// RTMPSession Implementation + RTMPSession::RTMPSession(int fd, const std::string& client_ip) + : client_fd(fd), chunk_size(128), window_ack_size(2500000), + peer_bandwidth(2500000), bytes_received(0), last_ack_sent(0) + { + stream_info.is_publishing = false; + stream_info.is_playing = false; + stream_info.client_fd = fd; + stream_info.stream_id = 0; + stream_info.client_ip = client_ip; + last_activity = std::chrono::steady_clock::now(); + } + + RTMPSession::~RTMPSession() + { + if (client_fd >= 0) + { + close(client_fd); + } + } + + bool RTMPSession::readExactly(uint8_t* buf, size_t len) + { + size_t total = 0; + while (total < len) + { + ssize_t n = recv(client_fd, buf + total, len - total, 0); + if (n <= 0) return false; + total += n; + onBytesReceived(n); + } + updateActivity(); + return true; + } + + bool RTMPSession::writeExactly(const uint8_t* buf, size_t len) + { + size_t total = 0; + while (total < len) + { + ssize_t n = send(client_fd, buf + total, len - total, MSG_NOSIGNAL); + if (n <= 0) return false; + total += n; + } + stats.bytes_sent += len; + updateActivity(); + return true; + } + + void RTMPSession::onBytesReceived(size_t bytes) + { + bytes_received += bytes; + stats.bytes_received += bytes; + } + + bool RTMPSession::shouldSendAck() const + { + return (bytes_received - last_ack_sent) >= window_ack_size; + } + + void RTMPSession::sendAcknowledgement() + { + std::vector ack(4); + writeUint32BE(ack.data(), bytes_received); + sendChunk(2, 0, (uint8_t)MessageType::ACKNOWLEDGEMENT, 0, ack); + last_ack_sent = bytes_received; + LOG_DEBUG("Sent ACK: " + std::to_string(bytes_received)); + } + + void RTMPSession::sendPing(uint32_t timestamp) + { + std::vector ping_msg(6); + writeUint16BE(ping_msg.data(), (uint16_t)UserControlType::PING_REQUEST); + writeUint32BE(ping_msg.data() + 2, timestamp); + sendChunk(2, 0, (uint8_t)MessageType::USER_CONTROL, 0, ping_msg); + } + + void RTMPSession::sendPong(uint32_t timestamp) + { + std::vector pong_msg(6); + writeUint16BE(pong_msg.data(), (uint16_t)UserControlType::PING_RESPONSE); + writeUint32BE(pong_msg.data() + 2, timestamp); + sendChunk(2, 0, (uint8_t)MessageType::USER_CONTROL, 0, pong_msg); + } + + bool RTMPSession::handshake() + { + uint8_t c0c1[1537]; + if (!readExactly(c0c1, 1537)) return false; + if (c0c1[0] != 3) return false; + uint8_t s0s1[1537]; + s0s1[0] = 3; + memset(s0s1 + 1, 0, 8); + for (int i = 9; i < 1537; i++) + { + s0s1[i] = rand() % 256; + } + if (!writeExactly(s0s1, 1537)) return false; + if (!writeExactly(c0c1 + 1, 1536)) return false; + uint8_t c2[1536]; + if (!readExactly(c2, 1536)) return false; + return true; + } + + bool RTMPSession::parseChunkHeader(ChunkHeader& header) + { + uint8_t basic_header; + if (!readExactly(&basic_header, 1)) return false; + header.fmt = (basic_header >> 6) & 0x03; + header.csid = basic_header & 0x3F; + if (header.csid == 0) + { + uint8_t csid_byte; + if (!readExactly(&csid_byte, 1)) return false; + header.csid = 64 + csid_byte; + } + else if (header.csid == 1) + { + uint8_t csid_bytes[2]; + if (!readExactly(csid_bytes, 2)) return false; + header.csid = 64 + csid_bytes[0] + (csid_bytes[1] * 256); + } + ChunkHeader prev = prev_headers[header.csid]; + header.has_extended_timestamp = false; + if (header.fmt == 0) + { + uint8_t buf[11]; + if (!readExactly(buf, 11)) return false; + header.timestamp = readUint24BE(buf); + header.msg_length = readUint24BE(buf + 3); + header.msg_type_id = buf[6]; + header.msg_stream_id = buf[7] | (buf[8] << 8) | (buf[9] << 16) | + (buf[10] << 24); + if (header.timestamp == 0xFFFFFF) + { + uint8_t ext_ts[4]; + if (!readExactly(ext_ts, 4)) return false; + header.timestamp = readUint32BE(ext_ts); + header.has_extended_timestamp = true; + } + } + else if (header.fmt == 1) + { + uint8_t buf[7]; + if (!readExactly(buf, 7)) return false; + uint32_t timestamp_delta = readUint24BE(buf); + header.msg_length = readUint24BE(buf + 3); + header.msg_type_id = buf[6]; + header.msg_stream_id = prev.msg_stream_id; + if (timestamp_delta == 0xFFFFFF) + { + uint8_t ext_ts[4]; + if (!readExactly(ext_ts, 4)) return false; + timestamp_delta = readUint32BE(ext_ts); + header.has_extended_timestamp = true; + } + header.timestamp = prev.timestamp + timestamp_delta; + } + else if (header.fmt == 2) + { + uint8_t buf[3]; + if (!readExactly(buf, 3)) return false; + uint32_t timestamp_delta = readUint24BE(buf); + header.msg_length = prev.msg_length; + header.msg_type_id = prev.msg_type_id; + header.msg_stream_id = prev.msg_stream_id; + if (timestamp_delta == 0xFFFFFF) + { + uint8_t ext_ts[4]; + if (!readExactly(ext_ts, 4)) return false; + timestamp_delta = readUint32BE(ext_ts); + header.has_extended_timestamp = true; + } + header.timestamp = prev.timestamp + timestamp_delta; + } + else + { + header.timestamp = prev.timestamp; + header.msg_length = prev.msg_length; + header.msg_type_id = prev.msg_type_id; + header.msg_stream_id = prev.msg_stream_id; + } + prev_headers[header.csid] = header; + return true; + } + + bool RTMPSession::receiveChunk() + { + ChunkHeader header; + if (!parseChunkHeader(header)) return false; + auto& incomplete = incomplete_chunks[header.csid]; + size_t to_read = std::min((size_t)chunk_size, + (size_t)(header.msg_length - incomplete.size())); + std::vector chunk_data(to_read); + if (!readExactly(chunk_data.data(), to_read)) return false; + incomplete.insert(incomplete.end(), chunk_data.begin(), chunk_data.end()); + if (incomplete.size() >= header.msg_length) + { + RTMPMessage msg; + msg.header = header; + msg.payload = incomplete; + incomplete.clear(); + incomplete_chunks.erase(header.csid); + { + std::lock_guard lock(queue_mutex); + message_queue.push(msg); + } + // Send ACK if needed + if (shouldSendAck()) + { + sendAcknowledgement(); + } + return processMessage(msg); + } + return true; + } + + bool RTMPSession::sendChunk(uint32_t csid, uint32_t timestamp, uint8_t msg_type, + uint32_t stream_id, const std::vector &data) + { + size_t sent = 0; + bool first = true; + while (sent < data.size()) + { + std::vector chunk; + uint8_t fmt = first ? 0 : 3; + if (csid < 64) + { + chunk.push_back((fmt << 6) | csid); + } + else if (csid < 320) + { + chunk.push_back(fmt << 6); + chunk.push_back((csid - 64) & 0xFF); + } + else + { + chunk.push_back((fmt << 6) | 1); + chunk.push_back((csid - 64) & 0xFF); + chunk.push_back(((csid - 64) >> 8) & 0xFF); + } + if (first) + { + uint8_t msg_header[11]; + writeUint24BE(msg_header, timestamp >= 0xFFFFFF ? 0xFFFFFF : timestamp); + writeUint24BE(msg_header + 3, data.size()); + msg_header[6] = msg_type; + msg_header[7] = stream_id & 0xFF; + msg_header[8] = (stream_id >> 8) & 0xFF; + msg_header[9] = (stream_id >> 16) & 0xFF; + msg_header[10] = (stream_id >> 24) & 0xFF; + chunk.insert(chunk.end(), msg_header, msg_header + 11); + if (timestamp >= 0xFFFFFF) + { + uint8_t ext_ts[4]; + writeUint32BE(ext_ts, timestamp); + chunk.insert(chunk.end(), ext_ts, ext_ts + 4); + } + } + size_t to_send = std::min(chunk_size, (uint32_t)(data.size() - sent)); + chunk.insert(chunk.end(), data.begin() + sent, data.begin() + sent + to_send); + if (!writeExactly(chunk.data(), chunk.size())) return false; + sent += to_send; + first = false; + } + return true; + } + + std::shared_ptr RTMPSession::decodeAMF0(const uint8_t* data, + size_t len, size_t &offset) + { + if (offset >= len) return nullptr; + auto val = std::make_shared(); + val->type = (AMF0Type)data[offset++]; + switch (val->type) + { + case AMF0Type::NUMBER: + if (offset + 8 > len) return nullptr; + val->number = readDouble(data + offset); + offset += 8; + break; + case AMF0Type::BOOLEAN: + if (offset >= len) return nullptr; + val->boolean = data[offset++] != 0; + break; + case AMF0Type::STRING: + { + if (offset + 2 > len) return nullptr; + uint16_t str_len = readUint16BE(data + offset); + offset += 2; + if (offset + str_len > len) return nullptr; + val->string = std::string((char*)(data + offset), str_len); + offset += str_len; + break; + } + case AMF0Type::OBJECT: + while (offset < len) + { + if (offset + 2 > len) return nullptr; + uint16_t key_len = readUint16BE(data + offset); + offset += 2; + if (key_len == 0 && offset < len && + data[offset] == (uint8_t)AMF0Type::OBJECT_END) + { + offset++; + break; + } + if (offset + key_len > len) return nullptr; + std::string key((char*)(data + offset), key_len); + offset += key_len; + auto value = decodeAMF0(data, len, offset); + if (!value) return nullptr; + val->object[key] = value; + } + break; + case AMF0Type::NULL_TYPE: + case AMF0Type::UNDEFINED: + break; + case AMF0Type::ECMA_ARRAY: + if (offset + 4 > len) return nullptr; + offset += 4; + while (offset < len) + { + if (offset + 2 > len) return nullptr; + uint16_t key_len = readUint16BE(data + offset); + offset += 2; + if (key_len == 0 && offset < len && + data[offset] == (uint8_t)AMF0Type::OBJECT_END) + { + offset++; + break; + } + if (offset + key_len > len) return nullptr; + std::string key((char*)(data + offset), key_len); + offset += key_len; + auto value = decodeAMF0(data, len, offset); + if (!value) return nullptr; + val->object[key] = value; + } + break; + } + return val; + } + + std::vector RTMPSession::encodeAMF0String(const std::string& str) + { + std::vector result; + result.push_back((uint8_t)AMF0Type::STRING); + uint8_t len_buf[2]; + writeUint16BE(len_buf, str.size()); + result.insert(result.end(), len_buf, len_buf + 2); + result.insert(result.end(), str.begin(), str.end()); + return result; + } + + std::vector RTMPSession::encodeAMF0Number(double num) + { + std::vector result; + result.push_back((uint8_t)AMF0Type::NUMBER); + uint8_t num_buf[8]; + writeDouble(num_buf, num); + result.insert(result.end(), num_buf, num_buf + 8); + return result; + } + + std::vector RTMPSession::encodeAMF0Object(const + std::map> &obj) + { + std::vector result; + result.push_back((uint8_t)AMF0Type::OBJECT); + for (const auto& pair : obj) + { + uint8_t key_len[2]; + writeUint16BE(key_len, pair.first.size()); + result.insert(result.end(), key_len, key_len + 2); + result.insert(result.end(), pair.first.begin(), pair.first.end()); + auto encoded = encodeAMF0(*pair.second); + result.insert(result.end(), encoded.begin(), encoded.end()); + } + result.push_back(0); + result.push_back(0); + result.push_back((uint8_t)AMF0Type::OBJECT_END); + return result; + } + + std::vector RTMPSession::encodeAMF0(const AMF0Value& value) + { + switch (value.type) + { + case AMF0Type::NUMBER: + return encodeAMF0Number(value.number); + case AMF0Type::STRING: + return encodeAMF0String(value.string); + case AMF0Type::OBJECT: + return encodeAMF0Object(value.object); + case AMF0Type::NULL_TYPE: + { + std::vector result; + result.push_back((uint8_t)AMF0Type::NULL_TYPE); + return result; + } + case AMF0Type::BOOLEAN: + { + std::vector result; + result.push_back((uint8_t)AMF0Type::BOOLEAN); + result.push_back(value.boolean ? 1 : 0); + return result; + } + default: + return std::vector(); + } + } + + bool RTMPSession::processMessage(const RTMPMessage& msg) + { + MessageType type = (MessageType)msg.header.msg_type_id; + switch (type) + { + case MessageType::SET_CHUNK_SIZE: + if (msg.payload.size() >= 4) + { + chunk_size = readUint32BE(msg.payload.data()) & 0x7FFFFFFF; + LOG_DEBUG("Chunk size set to: " + std::to_string(chunk_size)); + } + break; + case MessageType::WINDOW_ACK_SIZE: + if (msg.payload.size() >= 4) + { + window_ack_size = readUint32BE(msg.payload.data()); + LOG_DEBUG("Window ACK size set to: " + std::to_string(window_ack_size)); + } + break; + case MessageType::SET_PEER_BANDWIDTH: + if (msg.payload.size() >= 5) + { + peer_bandwidth = readUint32BE(msg.payload.data()); + LOG_DEBUG("Peer bandwidth set to: " + std::to_string(peer_bandwidth)); + } + break; + case MessageType::COMMAND_AMF0: + return handleCommand(msg); + case MessageType::AUDIO: + return handleAudioMessage(msg); + case MessageType::VIDEO: + return handleVideoMessage(msg); + case MessageType::DATA_AMF0: + return handleDataMessage(msg); + case MessageType::USER_CONTROL: + return handleUserControl(msg); + case MessageType::ACKNOWLEDGEMENT: + return handleAcknowledgement(msg); + default: + break; + } + return true; + } + + bool RTMPSession::handleUserControl(const RTMPMessage& msg) + { + if (msg.payload.size() < 2) return true; + uint16_t event_type = readUint16BE(msg.payload.data()); + UserControlType type = (UserControlType)event_type; + switch (type) + { + case UserControlType::PING_REQUEST: + if (msg.payload.size() >= 6) + { + uint32_t timestamp = readUint32BE(msg.payload.data() + 2); + sendPong(timestamp); + LOG_DEBUG("Received PING, sent PONG"); + } + break; + case UserControlType::PING_RESPONSE: + LOG_DEBUG("Received PONG response"); + break; + default: + break; + } + return true; + } + + bool RTMPSession::handleAcknowledgement(const RTMPMessage& msg) + { + if (msg.payload.size() >= 4) + { + uint32_t ack_value = readUint32BE(msg.payload.data()); + LOG_DEBUG("Received ACK: " + std::to_string(ack_value)); + } + return true; + } + + bool RTMPSession::handleAudioMessage(const RTMPMessage& msg) + { + stats.audio_frames++; + return true; + } + + bool RTMPSession::handleVideoMessage(const RTMPMessage& msg) + { + stats.video_frames++; + return true; + } + + bool RTMPSession::handleDataMessage(const RTMPMessage& msg) + { + size_t offset = 0; + auto command = decodeAMF0(msg.payload.data(), msg.payload.size(), offset); + if (!command || command->type != AMF0Type::STRING) return true; + if (command->string == "@setDataFrame" || command->string == "onMetaData") + { + if (command->string == "@setDataFrame") + { + auto metadata_name = decodeAMF0(msg.payload.data(), msg.payload.size(), offset); + if (!metadata_name || metadata_name->type != AMF0Type::STRING) return true; + } + auto metadata_obj = decodeAMF0(msg.payload.data(), msg.payload.size(), offset); + if (metadata_obj && (metadata_obj->type == AMF0Type::OBJECT || + metadata_obj->type == AMF0Type::ECMA_ARRAY)) + { + LOG_INFO("Received metadata"); + } + } + return true; + } + + bool RTMPSession::handleCommand(const RTMPMessage& msg) + { + size_t offset = 0; + std::vector> args; + while (offset < msg.payload.size()) + { + auto arg = decodeAMF0(msg.payload.data(), msg.payload.size(), offset); + if (!arg) break; + args.push_back(arg); + } + if (args.empty() || args[0]->type != AMF0Type::STRING) return false; + std::string command = args[0]->string; + LOG_DEBUG("Received command: " + command); + if (command == "connect") + { + return handleConnect(args); + } + else if (command == "releaseStream") + { + return handleReleaseStream(args); + } + else if (command == "FCPublish") + { + return handleFCPublish(args); + } + else if (command == "createStream") + { + return handleCreateStream(args); + } + else if (command == "publish") + { + return handlePublish(args); + } + else if (command == "play") + { + return handlePlay(args); + } + else if (command == "deleteStream") + { + return handleDeleteStream(args); + } + return true; + } + + bool RTMPSession::handleConnect(const std::vector> + &args) + { + if (args.size() < 3) return false; + double transaction_id = args[1]->number; + if (args[2]->type == AMF0Type::OBJECT) + { + auto& props = args[2]->object; + if (props.find("app") != props.end() && props["app"]->type == AMF0Type::STRING) + { + stream_info.app = props["app"]->string; + LOG_INFO("App: " + stream_info.app); + } + } + std::vector ack_size(4); + writeUint32BE(ack_size.data(), 2500000); + sendChunk(2, 0, (uint8_t)MessageType::WINDOW_ACK_SIZE, 0, ack_size); + std::vector bandwidth(5); + writeUint32BE(bandwidth.data(), 2500000); + bandwidth[4] = 2; + sendChunk(2, 0, (uint8_t)MessageType::SET_PEER_BANDWIDTH, 0, bandwidth); + std::vector stream_begin(6); + writeUint16BE(stream_begin.data(), (uint16_t)UserControlType::STREAM_BEGIN); + writeUint32BE(stream_begin.data() + 2, 0); + sendChunk(2, 0, (uint8_t)MessageType::USER_CONTROL, 0, stream_begin); + std::vector chunk_size_msg(4); + writeUint32BE(chunk_size_msg.data(), 4096); + sendChunk(2, 0, (uint8_t)MessageType::SET_CHUNK_SIZE, 0, chunk_size_msg); + chunk_size = 4096; + return sendConnectResponse(transaction_id); + } + + bool RTMPSession::sendConnectResponse(double transaction_id) + { + std::vector response; + auto cmd = encodeAMF0String("_result"); + response.insert(response.end(), cmd.begin(), cmd.end()); + auto tid = encodeAMF0Number(transaction_id); + response.insert(response.end(), tid.begin(), tid.end()); + AMF0Value props; + props.type = AMF0Type::OBJECT; + props.object["fmsVer"] = std::make_shared(); + props.object["fmsVer"]->type = AMF0Type::STRING; + props.object["fmsVer"]->string = "FMS/3,0,1,123"; + props.object["capabilities"] = std::make_shared(); + props.object["capabilities"]->type = AMF0Type::NUMBER; + props.object["capabilities"]->number = 31; + auto props_enc = encodeAMF0(props); + response.insert(response.end(), props_enc.begin(), props_enc.end()); + AMF0Value info; + info.type = AMF0Type::OBJECT; + info.object["level"] = std::make_shared(); + info.object["level"]->type = AMF0Type::STRING; + info.object["level"]->string = "status"; + info.object["code"] = std::make_shared(); + info.object["code"]->type = AMF0Type::STRING; + info.object["code"]->string = "NetConnection.Connect.Success"; + info.object["description"] = std::make_shared(); + info.object["description"]->type = AMF0Type::STRING; + info.object["description"]->string = "Connection succeeded."; + info.object["objectEncoding"] = std::make_shared(); + info.object["objectEncoding"]->type = AMF0Type::NUMBER; + info.object["objectEncoding"]->number = 0; + auto info_enc = encodeAMF0(info); + response.insert(response.end(), info_enc.begin(), info_enc.end()); + return sendChunk(3, 0, (uint8_t)MessageType::COMMAND_AMF0, 0, response); + } + + bool RTMPSession::handleReleaseStream(const + std::vector> &args) + { + return true; + } + + bool RTMPSession::handleFCPublish(const + std::vector> &args) + { + return true; + } + + bool RTMPSession::handleCreateStream(const + std::vector> &args) + { + if (args.size() < 2) return false; + double transaction_id = args[1]->number; + stream_info.stream_id = 1; + return sendCreateStreamResponse(transaction_id, 1); + } + + bool RTMPSession::sendCreateStreamResponse(double transaction_id, + double stream_id) + { + std::vector response; + auto cmd = encodeAMF0String("_result"); + response.insert(response.end(), cmd.begin(), cmd.end()); + auto tid = encodeAMF0Number(transaction_id); + response.insert(response.end(), tid.begin(), tid.end()); + AMF0Value null_val; + null_val.type = AMF0Type::NULL_TYPE; + auto null_enc = encodeAMF0(null_val); + response.insert(response.end(), null_enc.begin(), null_enc.end()); + auto sid = encodeAMF0Number(stream_id); + response.insert(response.end(), sid.begin(), sid.end()); + return sendChunk(3, 0, (uint8_t)MessageType::COMMAND_AMF0, 0, response); + } + + bool RTMPSession::handlePublish(const std::vector> + &args) + { + if (args.size() < 4) return false; + if (args[3]->type == AMF0Type::STRING) + { + stream_info.stream_key = args[3]->string; + stream_info.is_publishing = true; + LOG_INFO("Publishing to: " + stream_info.stream_key); + } + return sendPublishResponse(); + } + + bool RTMPSession::sendPublishResponse() + { + std::vector response; + auto cmd = encodeAMF0String("onStatus"); + response.insert(response.end(), cmd.begin(), cmd.end()); + auto tid = encodeAMF0Number(0); + response.insert(response.end(), tid.begin(), tid.end()); + AMF0Value null_val; + null_val.type = AMF0Type::NULL_TYPE; + auto null_enc = encodeAMF0(null_val); + response.insert(response.end(), null_enc.begin(), null_enc.end()); + AMF0Value info; + info.type = AMF0Type::OBJECT; + info.object["level"] = std::make_shared(); + info.object["level"]->type = AMF0Type::STRING; + info.object["level"]->string = "status"; + info.object["code"] = std::make_shared(); + info.object["code"]->type = AMF0Type::STRING; + info.object["code"]->string = "NetStream.Publish.Start"; + info.object["description"] = std::make_shared(); + info.object["description"]->type = AMF0Type::STRING; + info.object["description"]->string = "Stream is now published."; + auto info_enc = encodeAMF0(info); + response.insert(response.end(), info_enc.begin(), info_enc.end()); + return sendChunk(5, 0, (uint8_t)MessageType::COMMAND_AMF0, + stream_info.stream_id, response); + } + + bool RTMPSession::handlePlay(const std::vector> + &args) + { + if (args.size() < 4) return false; + if (args[3]->type == AMF0Type::STRING) + { + stream_info.stream_key = args[3]->string; + stream_info.is_playing = true; + LOG_INFO("Playing: " + stream_info.stream_key); + } + return sendPlayResponse(); + } + + bool RTMPSession::sendPlayResponse() + { + std::vector response; + auto cmd = encodeAMF0String("onStatus"); + response.insert(response.end(), cmd.begin(), cmd.end()); + auto tid = encodeAMF0Number(0); + response.insert(response.end(), tid.begin(), tid.end()); + AMF0Value null_val; + null_val.type = AMF0Type::NULL_TYPE; + auto null_enc = encodeAMF0(null_val); + response.insert(response.end(), null_enc.begin(), null_enc.end()); + AMF0Value info; + info.type = AMF0Type::OBJECT; + info.object["level"] = std::make_shared(); + info.object["level"]->type = AMF0Type::STRING; + info.object["level"]->string = "status"; + info.object["code"] = std::make_shared(); + info.object["code"]->type = AMF0Type::STRING; + info.object["code"]->string = "NetStream.Play.Start"; + info.object["description"] = std::make_shared(); + info.object["description"]->type = AMF0Type::STRING; + info.object["description"]->string = "Stream is now playing."; + auto info_enc = encodeAMF0(info); + response.insert(response.end(), info_enc.begin(), info_enc.end()); + return sendChunk(5, 0, (uint8_t)MessageType::COMMAND_AMF0, + stream_info.stream_id, response); + } + + bool RTMPSession::handleDeleteStream(const + std::vector> &args) + { + stream_info.is_publishing = false; + stream_info.is_playing = false; + LOG_INFO("Stream deleted"); + return true; + } + + bool RTMPSession::sendErrorResponse(const std::string& command, + double transaction_id, + const std::string& description) + { + std::vector response; + auto cmd = encodeAMF0String("_error"); + response.insert(response.end(), cmd.begin(), cmd.end()); + auto tid = encodeAMF0Number(transaction_id); + response.insert(response.end(), tid.begin(), tid.end()); + AMF0Value null_val; + null_val.type = AMF0Type::NULL_TYPE; + auto null_enc = encodeAMF0(null_val); + response.insert(response.end(), null_enc.begin(), null_enc.end()); + AMF0Value info; + info.type = AMF0Type::OBJECT; + info.object["level"] = std::make_shared(); + info.object["level"]->type = AMF0Type::STRING; + info.object["level"]->string = "error"; + info.object["code"] = std::make_shared(); + info.object["code"]->type = AMF0Type::STRING; + info.object["code"]->string = "NetConnection.Call.Failed"; + info.object["description"] = std::make_shared(); + info.object["description"]->type = AMF0Type::STRING; + info.object["description"]->string = description; + auto info_enc = encodeAMF0(info); + response.insert(response.end(), info_enc.begin(), info_enc.end()); + return sendChunk(3, 0, (uint8_t)MessageType::COMMAND_AMF0, 0, response); + } + +// RTMPServer Implementation + RTMPServer::RTMPServer(int port) : port(port), server_fd(-1), running(false) {} + + RTMPServer::~RTMPServer() + { + stop(); + } + + bool RTMPServer::start() + { + server_fd = socket(AF_INET, SOCK_STREAM, 0); + if (server_fd < 0) + { + LOG_ERROR("Failed to create socket"); + return false; + } + int opt = 1; + if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) + { + close(server_fd); + return false; + } + sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = htons(port); + if (bind(server_fd, (sockaddr *)&addr, sizeof(addr)) < 0) + { + LOG_ERROR("Failed to bind socket"); + close(server_fd); + return false; + } + if (listen(server_fd, 10) < 0) + { + LOG_ERROR("Failed to listen on socket"); + close(server_fd); + return false; + } + running = true; + accept_thread = std::thread(&RTMPServer::acceptClients, this); + if (ping_enabled) + { + ping_thread = std::thread(&RTMPServer::pingClientsRoutine, this); + } + timeout_thread = std::thread(&RTMPServer::timeoutCheckRoutine, this); + LOG_INFO("RTMP Server started on port " + std::to_string(port)); + return true; + } + + void RTMPServer::stop() + { + if (!running) return; + running = false; + if (server_fd >= 0) + { + close(server_fd); + server_fd = -1; + } + if (accept_thread.joinable()) + { + accept_thread.join(); + } + if (ping_thread.joinable()) + { + ping_thread.join(); + } + if (timeout_thread.joinable()) + { + timeout_thread.join(); + } + for (auto& thread : client_threads) + { + if (thread.joinable()) + { + thread.join(); + } + } + client_threads.clear(); + std::lock_guard lock(sessions_mutex); + sessions.clear(); + LOG_INFO("RTMP Server stopped"); + } + + void RTMPServer::acceptClients() + { + while (running) + { + sockaddr_in client_addr; + socklen_t addr_len = sizeof(client_addr); + int client_fd = accept(server_fd, (sockaddr*)&client_addr, &addr_len); + if (client_fd < 0) + { + if (running) + { + LOG_ERROR("Failed to accept client"); + } + continue; + } + std::string client_ip = inet_ntoa(client_addr.sin_addr); + LOG_INFO("New client connected: " + client_ip); + // Check connection limit + { + std::lock_guard lock(sessions_mutex); + if ((int)sessions.size() >= max_total_connections) + { + LOG_WARN("Max connections reached, rejecting client"); + close(client_fd); + continue; + } + } + auto session = std::make_shared(client_fd, client_ip); + { + std::lock_guard lock(sessions_mutex); + sessions.push_back(session); + } + client_threads.emplace_back(&RTMPServer::handleClient, this, session); + } + } + + void RTMPServer::handleClient(std::shared_ptr session) + { + if (!session->handshake()) + { + LOG_ERROR("Handshake failed"); + removeSession(session); + return; + } + LOG_INFO("Handshake completed"); + if (on_connect) + { + on_connect(session); + } + bool connection_active = true; + bool publish_notified = false; + bool play_notified = false; + while (running && connection_active) + { + if (!session->receiveChunk()) + { + connection_active = false; + break; + } + auto& info = session->getStreamInfo(); + // Handle publish callback + if (info.is_publishing && !publish_notified) + { + // Check authentication + if (auth_callback && !auth_callback(info.app, info.stream_key, info.client_ip)) + { + LOG_WARN("Authentication failed for: " + info.app + "/" + info.stream_key); + session->sendErrorResponse("publish", 0, "Authentication failed"); + connection_active = false; + break; + } + // Check connection limits + if (!checkConnectionLimits(info.app, info.stream_key, true)) + { + LOG_WARN("Publisher limit reached for: " + info.app + "/" + info.stream_key); + session->sendErrorResponse("publish", 0, "Publisher limit reached"); + connection_active = false; + break; + } + if (on_publish) + { + on_publish(session, info.app, info.stream_key); + } + publish_notified = true; + // Initialize GOP cache + if (use_gop_cache) + { + std::string key = makeStreamKey(info.app, info.stream_key); + std::lock_guard lock(gop_mutex); + if (gop_caches.find(key) == gop_caches.end()) + { + gop_caches[key] = std::make_shared(); + } + } + } + // Handle play callback + if (info.is_playing && !play_notified) + { + // Check authentication + if (auth_callback && !auth_callback(info.app, info.stream_key, info.client_ip)) + { + LOG_WARN("Authentication failed for: " + info.app + "/" + info.stream_key); + session->sendErrorResponse("play", 0, "Authentication failed"); + connection_active = false; + break; + } + // Check connection limits + if (!checkConnectionLimits(info.app, info.stream_key, false)) + { + LOG_WARN("Player limit reached for: " + info.app + "/" + info.stream_key); + session->sendErrorResponse("play", 0, "Player limit reached"); + connection_active = false; + break; + } + if (on_play) + { + on_play(session, info.app, info.stream_key); + } + play_notified = true; + // Send GOP cache to new player + if (use_gop_cache) + { + std::string key = makeStreamKey(info.app, info.stream_key); + std::lock_guard lock(gop_mutex); + auto it = gop_caches.find(key); + if (it != gop_caches.end() && it->second->hasKeyframe()) + { + it->second->sendToPlayer(session.get()); + LOG_INFO("Sent GOP cache to new player"); + } + } + } + // Process media messages + if (info.is_publishing) + { + processMediaMessages(session); + } + } + LOG_INFO("Client disconnected"); + // Clean up GOP cache if last publisher + auto& info = session->getStreamInfo(); + if (info.is_publishing) + { + std::string key = makeStreamKey(info.app, info.stream_key); + if (countPublishers(info.app, info.stream_key) <= 1) + { + std::lock_guard lock(gop_mutex); + gop_caches.erase(key); + } + } + if (on_disconnect) + { + on_disconnect(session); + } + removeSession(session); + } + + void RTMPServer::processMediaMessages(std::shared_ptr session) + { + std::lock_guard lock(session->getQueueMutex()); + while (!session->getMessageQueue().empty()) + { + RTMPMessage msg = session->getMessageQueue().front(); + session->getMessageQueue().pop(); + MessageType type = (MessageType)msg.header.msg_type_id; + const auto& info = session->getStreamInfo(); + std::string key = makeStreamKey(info.app, info.stream_key); + switch (type) + { + case MessageType::AUDIO: + if (on_audio_data) + { + on_audio_data(session, msg.payload, msg.header.timestamp); + } + // Add to GOP cache + if (use_gop_cache) + { + std::lock_guard lock(gop_mutex); + auto it = gop_caches.find(key); + if (it != gop_caches.end()) + { + it->second->addAudioFrame(msg.payload, msg.header.timestamp); + } + } + // Record if enabled + { + std::lock_guard lock(recorder_mutex); + auto it = recorders.find(key); + if (it != recorders.end() && it->second->isRecording()) + { + it->second->writeAudioFrame(msg.payload, msg.header.timestamp); + } + } + // Relay to players + sendAudioToPlayers(info.app, info.stream_key, + msg.payload, msg.header.timestamp); + break; + case MessageType::VIDEO: + if (on_video_data) + { + on_video_data(session, msg.payload, msg.header.timestamp); + } + // Add to GOP cache + if (use_gop_cache) + { + std::lock_guard lock(gop_mutex); + auto it = gop_caches.find(key); + if (it != gop_caches.end()) + { + it->second->addVideoFrame(msg.payload, msg.header.timestamp); + } + } + // Record if enabled + { + std::lock_guard lock(recorder_mutex); + auto it = recorders.find(key); + if (it != recorders.end() && it->second->isRecording()) + { + it->second->writeVideoFrame(msg.payload, msg.header.timestamp); + } + } + // Relay to players + sendVideoToPlayers(info.app, info.stream_key, + msg.payload, msg.header.timestamp); + break; + case MessageType::DATA_AMF0: + { + size_t offset = 0; + auto command = session->decodeAMF0(msg.payload.data(), + msg.payload.size(), offset); + if (command && command->type == AMF0Type::STRING) + { + if (command->string == "@setDataFrame") + { + auto metadata_name = session->decodeAMF0(msg.payload.data(), + msg.payload.size(), offset); + } + auto metadata_obj = session->decodeAMF0(msg.payload.data(), + msg.payload.size(), offset); + if (metadata_obj && (metadata_obj->type == AMF0Type::OBJECT || + metadata_obj->type == AMF0Type::ECMA_ARRAY)) + { + if (on_metadata) + { + on_metadata(session, metadata_obj->object); + } + // Add to GOP cache + if (use_gop_cache) + { + std::lock_guard lock(gop_mutex); + auto it = gop_caches.find(key); + if (it != gop_caches.end()) + { + it->second->addMetadata(msg.payload); + } + } + // Record if enabled + { + std::lock_guard lock(recorder_mutex); + auto it = recorders.find(key); + if (it != recorders.end() && it->second->isRecording()) + { + it->second->writeMetadata(metadata_obj->object); + } + } + // Relay metadata to players + sendMetadataToPlayers(info.app, info.stream_key, msg.payload); + } + } + break; + } + default: + break; + } + } + } + + void RTMPServer::removeSession(std::shared_ptr session) + { + std::lock_guard lock(sessions_mutex); + sessions.erase(std::remove(sessions.begin(), sessions.end(), session), + sessions.end()); + } + + bool RTMPServer::sendAudioToPlayers(const std::string& app, + const std::string& stream_key, + const std::vector &data, uint32_t timestamp) + { + std::lock_guard lock(sessions_mutex); + for (auto& session : sessions) + { + const auto& info = session->getStreamInfo(); + if (info.is_playing && info.app == app && info.stream_key == stream_key) + { + session->sendChunk(4, timestamp, (uint8_t)MessageType::AUDIO, + info.stream_id, data); + } + } + return true; + } + + bool RTMPServer::sendVideoToPlayers(const std::string& app, + const std::string& stream_key, + const std::vector &data, uint32_t timestamp) + { + std::lock_guard lock(sessions_mutex); + for (auto& session : sessions) + { + const auto& info = session->getStreamInfo(); + if (info.is_playing && info.app == app && info.stream_key == stream_key) + { + session->sendChunk(4, timestamp, (uint8_t)MessageType::VIDEO, + info.stream_id, data); + } + } + return true; + } + + void RTMPServer::sendMetadataToPlayers(const std::string& app, + const std::string& stream_key, + const std::vector &data) + { + std::lock_guard lock(sessions_mutex); + for (auto& session : sessions) + { + const auto& info = session->getStreamInfo(); + if (info.is_playing && info.app == app && info.stream_key == stream_key) + { + session->sendChunk(4, 0, (uint8_t)MessageType::DATA_AMF0, + info.stream_id, data); + } + } + } + + void RTMPServer::pingClientsRoutine() + { + while (running) + { + std::this_thread::sleep_for(std::chrono::seconds(ping_interval)); + if (!running) break; + std::lock_guard lock(sessions_mutex); + uint32_t timestamp = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); + for (auto& session : sessions) + { + session->sendPing(timestamp); + } + LOG_DEBUG("Sent PING to all clients"); + } + } + + void RTMPServer::timeoutCheckRoutine() + { + while (running) + { + std::this_thread::sleep_for(std::chrono::seconds(5)); + if (!running) break; + auto now = std::chrono::steady_clock::now(); + std::vector> to_remove; + { + std::lock_guard lock(sessions_mutex); + for (auto& session : sessions) + { + auto elapsed = std::chrono::duration_cast( + now - session->getLastActivity()).count(); + if (elapsed > connection_timeout) + { + LOG_WARN("Session timeout: " + + session->getStreamInfo().client_ip); + to_remove.push_back(session); + } + } + } + // Remove timed out sessions + for (auto& session : to_remove) + { + if (on_disconnect) + { + on_disconnect(session); + } + removeSession(session); + } + } + } + + int RTMPServer::countPublishers(const std::string& app, + const std::string& stream_key) const + { + std::lock_guard lock(sessions_mutex); + int count = 0; + for (const auto& session : sessions) + { + const auto& info = session->getStreamInfo(); + if (info.is_publishing && info.app == app && info.stream_key == stream_key) + { + count++; + } + } + return count; + } + + int RTMPServer::countPlayers(const std::string& app, + const std::string& stream_key) const + { + std::lock_guard lock(sessions_mutex); + int count = 0; + for (const auto& session : sessions) + { + const auto& info = session->getStreamInfo(); + if (info.is_playing && info.app == app && info.stream_key == stream_key) + { + count++; + } + } + return count; + } + + bool RTMPServer::checkConnectionLimits(const std::string& app, + const std::string& stream_key, + bool is_publisher) const + { + if (is_publisher) + { + int current = countPublishers(app, stream_key); + return current < max_publishers_per_stream; + } + else + { + int current = countPlayers(app, stream_key); + return current < max_players_per_stream; + } + } + + int RTMPServer::getActivePublishers() const + { + std::lock_guard lock(sessions_mutex); + int count = 0; + for (const auto& session : sessions) + { + if (session->getStreamInfo().is_publishing) + { + count++; + } + } + return count; + } + + int RTMPServer::getActivePlayers() const + { + std::lock_guard lock(sessions_mutex); + int count = 0; + for (const auto& session : sessions) + { + if (session->getStreamInfo().is_playing) + { + count++; + } + } + return count; + } + + int RTMPServer::getTotalConnections() const + { + std::lock_guard lock(sessions_mutex); + return sessions.size(); + } + + StreamStatistics RTMPServer::getStreamStats(const std::string& app, + const std::string& stream_key) const + { + std::lock_guard lock(sessions_mutex); + StreamStatistics combined; + for (const auto& session : sessions) + { + const auto& info = session->getStreamInfo(); + if (info.app == app && info.stream_key == stream_key) + { + const auto& stats = session->getStats(); + combined.bytes_sent += stats.bytes_sent; + combined.bytes_received += stats.bytes_received; + combined.video_frames += stats.video_frames; + combined.audio_frames += stats.audio_frames; + combined.dropped_frames += stats.dropped_frames; + } + } + return combined; + } + + std::vector> + RTMPServer::getAllStreamStats() const + { + std::lock_guard lock(sessions_mutex); + std::map stats_map; + for (const auto& session : sessions) + { + const auto& info = session->getStreamInfo(); + if (info.is_publishing || info.is_playing) + { + std::string key = makeStreamKey(info.app, info.stream_key); + const auto& stats = session->getStats(); + auto& combined = stats_map[key]; + combined.bytes_sent += stats.bytes_sent; + combined.bytes_received += stats.bytes_received; + combined.video_frames += stats.video_frames; + combined.audio_frames += stats.audio_frames; + combined.dropped_frames += stats.dropped_frames; + } + } + std::vector> result; + for (const auto& pair : stats_map) + { + result.push_back(pair); + } + return result; + } + + bool RTMPServer::startRecording(const std::string& app, + const std::string& stream_key, + const std::string& filename) + { + std::string key = makeStreamKey(app, stream_key); + std::lock_guard lock(recorder_mutex); + if (recorders.find(key) != recorders.end() && recorders[key]->isRecording()) + { + LOG_WARN("Already recording stream: " + key); + return false; + } + auto recorder = std::make_shared(filename); + if (!recorder->start()) + { + LOG_ERROR("Failed to start recording: " + filename); + return false; + } + recorders[key] = recorder; + LOG_INFO("Started recording " + key + " to " + filename); + return true; + } + + void RTMPServer::stopRecording(const std::string& app, + const std::string& stream_key) + { + std::string key = makeStreamKey(app, stream_key); + std::lock_guard lock(recorder_mutex); + auto it = recorders.find(key); + if (it != recorders.end()) + { + it->second->stop(); + recorders.erase(it); + LOG_INFO("Stopped recording: " + key); + } + } + + bool RTMPServer::isRecording(const std::string& app, + const std::string& stream_key) const + { + std::string key = makeStreamKey(app, stream_key); + std::lock_guard lock(recorder_mutex); + auto it = recorders.find(key); + return (it != recorders.end() && it->second->isRecording()); + } + + void RTMPServer::enablePingPong(bool enable, int interval_seconds) + { + ping_enabled = enable; + ping_interval = interval_seconds; + if (enable && running && !ping_thread.joinable()) + { + ping_thread = std::thread(&RTMPServer::pingClientsRoutine, this); + } + } + +} // namespace rtmp \ No newline at end of file