diff --git a/src/rtmp_server.cpp b/src/rtmp_server.cpp index 8786334..378683b 100644 --- a/src/rtmp_server.cpp +++ b/src/rtmp_server.cpp @@ -82,27 +82,73 @@ bool GOPCache::isKeyframe(const std::vector &data) { if (data.empty()) return false; uint8_t frame_type = (data[0] >> 4) & 0x0F; - return frame_type == 1; // 1 = keyframe (IDR) + return frame_type == 1; } void GOPCache::addVideoFrame(const std::vector &data, uint32_t timestamp) { std::lock_guard lock(cache_mutex); + + if (data.size() >= 2) { + uint8_t frame_type = (data[0] >> 4) & 0x0F; + uint8_t codec_id = data[0] & 0x0F; + uint8_t avc_packet_type = data[1]; + + if (codec_id == 7 && avc_packet_type == 0) { + CachedFrame frame; + frame.type = MessageType::VIDEO; + frame.data = data; + frame.timestamp = timestamp; + + if (!frames.empty() && frames[0].data.size() >= 2) { + uint8_t first_codec = frames[0].data[0] & 0x0F; + uint8_t first_packet = frames[0].data[1]; + if (first_codec == 7 && first_packet == 0) { + frames.erase(frames.begin()); + } + } + + frames.insert(frames.begin(), frame); + has_keyframe = true; + LOG_INFO("Cached AVC Sequence Header (SPS/PPS)"); + return; + } + } + if (isKeyframe(data)) { - frames.clear(); + std::vector new_frames; + if (!frames.empty() && frames[0].data.size() >= 2) { + uint8_t first_codec = frames[0].data[0] & 0x0F; + uint8_t first_packet = frames[0].data[1]; + if (first_codec == 7 && first_packet == 0) { + new_frames.push_back(frames[0]); // Keep sequence header + } + } + frames = new_frames; has_keyframe = true; } + if (has_keyframe) { CachedFrame frame; frame.type = MessageType::VIDEO; frame.data = data; frame.timestamp = timestamp; frames.push_back(frame); + if (frames.size() > 300) { for (size_t i = 1; i < frames.size(); i++) { if (frames[i].type == MessageType::VIDEO && isKeyframe(frames[i].data)) { - frames.erase(frames.begin(), frames.begin() + i); + std::vector new_frames; + if (!frames.empty() && frames[0].data.size() >= 2) { + uint8_t first_codec = frames[0].data[0] & 0x0F; + uint8_t first_packet = frames[0].data[1]; + if (first_codec == 7 && first_packet == 0) { + new_frames.push_back(frames[0]); + } + } + new_frames.insert(new_frames.end(), frames.begin() + i, frames.end()); + frames = new_frames; break; } } @@ -125,19 +171,33 @@ void GOPCache::addAudioFrame(const std::vector &data, void GOPCache::addMetadata(const std::vector &data) { std::lock_guard lock(cache_mutex); metadata = data; + LOG_DEBUG("Cached metadata"); } void GOPCache::sendToPlayer(RTMPSession *session) { std::lock_guard lock(cache_mutex); const auto &info = session->getStreamInfo(); + if (!metadata.empty()) { session->sendChunk(4, 0, (uint8_t)MessageType::DATA_AMF0, info.stream_id, metadata); + LOG_DEBUG("Sent metadata to new player"); } + for (const auto &frame : frames) { - session->sendChunk(4, frame.timestamp, (uint8_t)frame.type, info.stream_id, - frame.data); + uint32_t csid = 4; + if (frame.type == MessageType::VIDEO) { + csid = 6; + } else if (frame.type == MessageType::AUDIO) { + csid = 4; + } + + session->sendChunk(csid, frame.timestamp, (uint8_t)frame.type, + info.stream_id, frame.data); } + + LOG_INFO("Sent " + std::to_string(frames.size()) + + " cached frames to player"); } void GOPCache::clear() { @@ -936,34 +996,108 @@ bool RTMPSession::handlePlay( stream_info.is_playing = true; LOG_INFO("Playing: " + stream_info.stream_key); } + + std::vector stream_begin(6); + writeUint16BE(stream_begin.data(), (uint16_t)UserControlType::STREAM_BEGIN); + writeUint32BE(stream_begin.data() + 2, stream_info.stream_id); + sendChunk(2, 0, (uint8_t)MessageType::USER_CONTROL, 0, stream_begin); + + std::vector is_recorded(6); + writeUint16BE(is_recorded.data(), + (uint16_t)UserControlType::STREAM_IS_RECORDED); + writeUint32BE(is_recorded.data() + 2, stream_info.stream_id); + sendChunk(2, 0, (uint8_t)MessageType::USER_CONTROL, 0, is_recorded); + return sendPlayResponse(); } bool RTMPSession::sendPlayResponse() { - std::vector response; - auto cmd = encodeAMF0String("onStatus"); - response.insert(response.end(), cmd.begin(), cmd.end()); - auto tid = encodeAMF0Number(0); - response.insert(response.end(), tid.begin(), tid.end()); - AMF0Value null_val; - null_val.type = AMF0Type::NULL_TYPE; - auto null_enc = encodeAMF0(null_val); - response.insert(response.end(), null_enc.begin(), null_enc.end()); - AMF0Value info; - info.type = AMF0Type::OBJECT; - info.object["level"] = std::make_shared(); - info.object["level"]->type = AMF0Type::STRING; - info.object["level"]->string = "status"; - info.object["code"] = std::make_shared(); - info.object["code"]->type = AMF0Type::STRING; - info.object["code"]->string = "NetStream.Play.Start"; - info.object["description"] = std::make_shared(); - info.object["description"]->type = AMF0Type::STRING; - info.object["description"]->string = "Stream is now playing."; - auto info_enc = encodeAMF0(info); - response.insert(response.end(), info_enc.begin(), info_enc.end()); - return sendChunk(5, 0, (uint8_t)MessageType::COMMAND_AMF0, - stream_info.stream_id, response); + { + std::vector response; + auto cmd = encodeAMF0String("onStatus"); + response.insert(response.end(), cmd.begin(), cmd.end()); + auto tid = encodeAMF0Number(0); + response.insert(response.end(), tid.begin(), tid.end()); + AMF0Value null_val; + null_val.type = AMF0Type::NULL_TYPE; + auto null_enc = encodeAMF0(null_val); + response.insert(response.end(), null_enc.begin(), null_enc.end()); + + AMF0Value info; + info.type = AMF0Type::OBJECT; + info.object["level"] = std::make_shared(); + info.object["level"]->type = AMF0Type::STRING; + info.object["level"]->string = "status"; + info.object["code"] = std::make_shared(); + info.object["code"]->type = AMF0Type::STRING; + info.object["code"]->string = "NetStream.Play.Reset"; + info.object["description"] = std::make_shared(); + info.object["description"]->type = AMF0Type::STRING; + info.object["description"]->string = "Playing and resetting stream."; + auto info_enc = encodeAMF0(info); + response.insert(response.end(), info_enc.begin(), info_enc.end()); + + sendChunk(5, 0, (uint8_t)MessageType::COMMAND_AMF0, stream_info.stream_id, + response); + } + + { + std::vector response; + auto cmd = encodeAMF0String("onStatus"); + response.insert(response.end(), cmd.begin(), cmd.end()); + auto tid = encodeAMF0Number(0); + response.insert(response.end(), tid.begin(), tid.end()); + AMF0Value null_val; + null_val.type = AMF0Type::NULL_TYPE; + auto null_enc = encodeAMF0(null_val); + response.insert(response.end(), null_enc.begin(), null_enc.end()); + + AMF0Value info; + info.type = AMF0Type::OBJECT; + info.object["level"] = std::make_shared(); + info.object["level"]->type = AMF0Type::STRING; + info.object["level"]->string = "status"; + info.object["code"] = std::make_shared(); + info.object["code"]->type = AMF0Type::STRING; + info.object["code"]->string = "NetStream.Play.Start"; + info.object["description"] = std::make_shared(); + info.object["description"]->type = AMF0Type::STRING; + info.object["description"]->string = "Started playing stream."; + info.object["details"] = std::make_shared(); + info.object["details"]->type = AMF0Type::STRING; + info.object["details"]->string = stream_info.stream_key; + info.object["clientid"] = std::make_shared(); + info.object["clientid"]->type = AMF0Type::NUMBER; + info.object["clientid"]->number = stream_info.stream_id; + auto info_enc = encodeAMF0(info); + response.insert(response.end(), info_enc.begin(), info_enc.end()); + + sendChunk(5, 0, (uint8_t)MessageType::COMMAND_AMF0, stream_info.stream_id, + response); + } + + { + std::vector response; + auto cmd = encodeAMF0String("|RtmpSampleAccess"); + response.insert(response.end(), cmd.begin(), cmd.end()); + + AMF0Value audio_access; + audio_access.type = AMF0Type::BOOLEAN; + audio_access.boolean = true; + auto audio_enc = encodeAMF0(audio_access); + response.insert(response.end(), audio_enc.begin(), audio_enc.end()); + + AMF0Value video_access; + video_access.type = AMF0Type::BOOLEAN; + video_access.boolean = true; + auto video_enc = encodeAMF0(video_access); + response.insert(response.end(), video_enc.begin(), video_enc.end()); + + sendChunk(5, 0, (uint8_t)MessageType::DATA_AMF0, stream_info.stream_id, + response); + } + + return true; } bool RTMPSession::handleDeleteStream( @@ -1349,7 +1483,7 @@ bool RTMPServer::sendVideoToPlayers(const std::string &app, for (auto &session : sessions) { const auto &info = session->getStreamInfo(); if (info.is_playing && info.app == app && info.stream_key == stream_key) { - session->sendChunk(4, timestamp, (uint8_t)MessageType::VIDEO, + session->sendChunk(6, timestamp, (uint8_t)MessageType::VIDEO, info.stream_id, data); } }