diff --git a/.changeset/send_publisher_offer_with_join_request_to_accelerate_connect.md b/.changeset/send_publisher_offer_with_join_request_to_accelerate_connect.md new file mode 100644 index 000000000..d44910b93 --- /dev/null +++ b/.changeset/send_publisher_offer_with_join_request_to_accelerate_connect.md @@ -0,0 +1,7 @@ +--- +livekit: patch +livekit-api: patch +livekit-ffi: patch +--- + +Send publisher offer with join request to accelerate connection - #996 (@cnderrauber) diff --git a/Cargo.lock b/Cargo.lock index dc50e81f0..28ac98b45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4018,6 +4018,7 @@ version = "0.4.18" dependencies = [ "async-tungstenite", "base64 0.21.7", + "flate2", "futures-util", "http 1.4.0", "isahc", diff --git a/livekit-api/Cargo.toml b/livekit-api/Cargo.toml index 45eebf8e3..5b42a0266 100644 --- a/livekit-api/Cargo.toml +++ b/livekit-api/Cargo.toml @@ -17,7 +17,8 @@ signal-client-tokio = [ "dep:reqwest", "dep:livekit-runtime", "livekit-runtime/tokio", - "dep:base64" + "dep:base64", + "dep:flate2" ] signal-client-async = [ @@ -36,7 +37,8 @@ __signal-client-async-compatible = [ "dep:futures-util", "dep:isahc", "dep:livekit-runtime", - "dep:base64" + "dep:base64", + "dep:flate2" ] @@ -101,6 +103,7 @@ http = "1.1" reqwest = { version = "0.12", default-features = false, features = [ "json" ], optional = true } isahc = { version = "1.7.2", default-features = false, features = [ "json", "text-decoding" ], optional = true } +flate2 = { version = "1", optional = true } scopeguard = "1.2.0" rand = { workspace = true } os_info = "3.14.0" diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index 2a7b8bada..9d0bd4034 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -15,6 +15,7 @@ use std::{ borrow::Cow, fmt::Debug, + io::Write, sync::{ atomic::{AtomicBool, AtomicU32, Ordering}, Arc, @@ -22,7 +23,8 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; -use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, Engine}; +use base64::{engine::general_purpose::URL_SAFE as BASE64_URL_SAFE, Engine}; +use flate2::{write::GzEncoder, Compression}; use http::StatusCode; use livekit_protocol as proto; use livekit_runtime::{interval, sleep, Instant, JoinHandle}; @@ -157,6 +159,7 @@ impl SignalClient { url: &str, token: &str, options: SignalOptions, + publisher_offer: Option, ) -> SignalResult<(Self, proto::JoinResponse, SignalEvents)> { let handle_success = |inner: Arc, join_response, stream_events| { let (emitter, events) = mpsc::unbounded_channel(); @@ -166,7 +169,7 @@ impl SignalClient { (Self { inner, emitter, handle: Mutex::new(Some(signal_task)) }, join_response, events) }; - match SignalInner::connect(url, token, options.clone()).await { + match SignalInner::connect(url, token, options.clone(), publisher_offer.clone()).await { Ok((inner, join_response, stream_events)) => { Ok(handle_success(inner, join_response, stream_events)) } @@ -180,7 +183,9 @@ impl SignalClient { for url in urls.iter() { log::info!("fallback connection to: {}", url); - match SignalInner::connect(url, token, options.clone()).await { + match SignalInner::connect(url, token, options.clone(), publisher_offer.clone()) + .await + { Ok((inner, join_response, stream_events)) => { return Ok(handle_success(inner, join_response, stream_events)) } @@ -263,6 +268,7 @@ impl SignalInner { url: &str, token: &str, options: SignalOptions, + publisher_offer: Option, ) -> SignalResult<( Arc, proto::JoinResponse, @@ -271,7 +277,8 @@ impl SignalInner { // Try v1 path first if single_peer_connection is enabled let use_v1_path = options.single_peer_connection; // For initial connection: reconnect=false, reconnect_reason=None, participant_sid="" - let lk_url = get_livekit_url(url, &options, use_v1_path, false, None, "")?; + let lk_url = + get_livekit_url(url, &options, use_v1_path, false, None, "", publisher_offer.as_ref())?; // Try to connect to the SignalClient let (stream, mut events, single_pc_mode_active) = match SignalStream::connect(lk_url.clone(), token, options.connect_timeout).await { @@ -301,7 +308,8 @@ impl SignalInner { matches!(&err, SignalError::WsError(WsError::Http(e)) if e.status() == 404); if use_v1_path && is_not_found { - let lk_url_v0 = get_livekit_url(url, &options, false, false, None, "")?; + let lk_url_v0 = + get_livekit_url(url, &options, false, false, None, "", None)?; log::warn!("v1 path not found (404), falling back to v0 path"); match SignalStream::connect( lk_url_v0.clone(), @@ -397,9 +405,17 @@ impl SignalInner { // For reconnects: reconnect=true, participant_sid=sid // For v1 path: reconnect and sid are encoded in the join_request protobuf // For v0 path: reconnect and sid are added as separate query parameters - let lk_url = - get_livekit_url(&self.url, &self.options, self.single_pc_mode_active, true, None, sid) - .unwrap(); + // No publisher offer for reconnections + let lk_url = get_livekit_url( + &self.url, + &self.options, + self.single_pc_mode_active, + true, + None, + sid, + None, + ) + .unwrap(); let (new_stream, mut events) = SignalStream::connect(lk_url, &token, self.options.connect_timeout).await?; @@ -556,6 +572,7 @@ fn create_join_request_param( participant_sid: &str, os: String, os_version: String, + publisher_offer: Option<&proto::SessionDescription>, ) -> String { let connection_settings = proto::ConnectionSettings { auto_subscribe: options.auto_subscribe, @@ -576,6 +593,7 @@ fn create_join_request_param( client_info: Some(client_info), connection_settings: Some(connection_settings), reconnect, + publisher_offer: publisher_offer.cloned(), ..Default::default() }; @@ -592,13 +610,29 @@ fn create_join_request_param( // Serialize JoinRequest to bytes let join_request_bytes = join_request.encode_to_vec(); - // Create WrappedJoinRequest (JS doesn't explicitly set compression, so default is NONE) + // Use gzip compression when publisher offer is included (SDP makes payload large) + let (compressed_bytes, compression) = if publisher_offer.is_some() { + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + if encoder.write_all(&join_request_bytes).is_ok() { + if let Ok(compressed) = encoder.finish() { + (compressed, proto::wrapped_join_request::Compression::Gzip as i32) + } else { + (join_request_bytes, proto::wrapped_join_request::Compression::None as i32) + } + } else { + (join_request_bytes, proto::wrapped_join_request::Compression::None as i32) + } + } else { + (join_request_bytes, proto::wrapped_join_request::Compression::None as i32) + }; + let wrapped_join_request = - proto::WrappedJoinRequest { join_request: join_request_bytes, ..Default::default() }; + proto::WrappedJoinRequest { join_request: compressed_bytes, compression }; - // Serialize WrappedJoinRequest to bytes and base64 encode + // Serialize WrappedJoinRequest to bytes and base64url encode + // (URL-safe base64 avoids percent-encoding issues in query parameters) let wrapped_bytes = wrapped_join_request.encode_to_vec(); - BASE64_STANDARD.encode(&wrapped_bytes) + BASE64_URL_SAFE.encode(&wrapped_bytes) } /// Build the LiveKit WebSocket URL for connection @@ -617,6 +651,7 @@ fn get_livekit_url( reconnect: bool, reconnect_reason: Option, participant_sid: &str, + publisher_offer: Option<&proto::SessionDescription>, ) -> SignalResult { let mut lk_url = url::Url::parse(url).map_err(|err| SignalError::UrlParse(err.to_string()))?; @@ -651,6 +686,7 @@ fn get_livekit_url( participant_sid, os_info.os_type().to_string(), os_info.version().to_string(), + publisher_offer, ); lk_url.query_pairs_mut().append_pair("join_request", &join_request_param); } else { @@ -754,32 +790,39 @@ mod tests { fn livekit_url_test() { let io = SignalOptions::default(); - assert!(get_livekit_url("localhost:7880", &io, false, false, None, "").is_err()); + assert!(get_livekit_url("localhost:7880", &io, false, false, None, "", None).is_err()); assert_eq!( - get_livekit_url("https://localhost:7880", &io, false, false, None, "") + get_livekit_url("https://localhost:7880", &io, false, false, None, "", None) .unwrap() .scheme(), "wss" ); assert_eq!( - get_livekit_url("http://localhost:7880", &io, false, false, None, "").unwrap().scheme(), + get_livekit_url("http://localhost:7880", &io, false, false, None, "", None) + .unwrap() + .scheme(), "ws" ); assert_eq!( - get_livekit_url("wss://localhost:7880", &io, false, false, None, "").unwrap().scheme(), + get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None) + .unwrap() + .scheme(), "wss" ); assert_eq!( - get_livekit_url("ws://localhost:7880", &io, false, false, None, "").unwrap().scheme(), + get_livekit_url("ws://localhost:7880", &io, false, false, None, "", None) + .unwrap() + .scheme(), "ws" ); - assert!(get_livekit_url("ftp://localhost:7880", &io, false, false, None, "").is_err()); + assert!(get_livekit_url("ftp://localhost:7880", &io, false, false, None, "", None).is_err()); } #[test] fn validate_url_test() { let io = SignalOptions::default(); - let lk_url = get_livekit_url("wss://localhost:7880", &io, false, false, None, "").unwrap(); + let lk_url = + get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None).unwrap(); let validate_url = get_validate_url(lk_url); // Should be /rtc/validate, not /rtc/rtc/validate diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 0035237b6..89ae9c205 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -1171,7 +1171,8 @@ impl RoomSession { let stream_id = stream.id(); let lk_stream_id = unpack_stream_id(&stream_id); if lk_stream_id.is_none() { - log::error!("received track with an invalid track_id: {:?}", &stream_id); + // server could require extra media sections to accelerate subscription. + log::debug!("received track with an invalid track_id: {:?}", &stream_id); return; } diff --git a/livekit/src/rtc_engine/peer_transport.rs b/livekit/src/rtc_engine/peer_transport.rs index da1ac2821..62c15f47f 100644 --- a/livekit/src/rtc_engine/peer_transport.rs +++ b/livekit/src/rtc_engine/peer_transport.rs @@ -33,6 +33,7 @@ struct TransportInner { single_pc_mode: bool, // Publish-side target bitrate (bps) for offer munging max_send_bitrate_bps: Option, + pending_initial_offer: Option, } pub struct PeerTransport { @@ -64,6 +65,7 @@ impl PeerTransport { restarting_ice: false, single_pc_mode, max_send_bitrate_bps: None, + pending_initial_offer: None, })), } } @@ -108,6 +110,10 @@ impl PeerTransport { ) -> EngineResult<()> { let mut inner = self.inner.lock().await; + if let Some(pending_offer) = inner.pending_initial_offer.take() { + self.peer_connection.set_local_description(pending_offer).await?; + } + self.peer_connection.set_remote_description(remote_description).await?; for ic in inner.pending_candidates.drain(..) { @@ -136,6 +142,34 @@ impl PeerTransport { Ok(answer) } + /// Create an initial offer without setting it as local description. + /// The offer is stored as pending and will be applied when the server's answer arrives. + pub async fn create_initial_offer(&self) -> EngineResult> { + let mut inner = self.inner.lock().await; + if !inner.single_pc_mode { + return Ok(None); + } + + let offer = self.peer_connection.create_offer(OfferOptions::default()).await?; + let sdp = offer.to_string(); + + let recvonly_munged = Self::munge_inactive_to_recvonly_for_media(&sdp); + if recvonly_munged != sdp { + if let Ok(parsed) = SessionDescription::parse(&recvonly_munged, offer.sdp_type()) { + inner.pending_initial_offer = Some(parsed.clone()); + return Ok(Some(parsed)); + } + } + + inner.pending_initial_offer = Some(offer.clone()); + Ok(Some(offer)) + } + + pub async fn clear_pending_initial_offer(&self) { + let mut inner = self.inner.lock().await; + inner.pending_initial_offer = None; + } + pub async fn set_max_send_bitrate_bps(&self, bps: Option) { let mut inner = self.inner.lock().await; inner.max_send_bitrate_bps = bps; @@ -333,6 +367,11 @@ impl PeerTransport { inner.restarting_ice = true; } + if inner.pending_initial_offer.is_some() { + inner.renegotiate = true; + return Ok(()); + } + if self.peer_connection.signaling_state() == SignalingState::HaveLocalOffer { let remote_sdp = self.peer_connection.current_remote_description(); if options.ice_restart && remote_sdp.is_some() { diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 3b181fae5..4d3faab03 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -391,6 +391,7 @@ struct SessionInner { e2ee_manager: Option, subscriber_primary: bool, + pc_state_notify: Notify, } /// Information about the local participant needed for outgoing @@ -443,8 +444,47 @@ impl RtcSession { ) -> EngineResult<(Self, proto::JoinResponse, SessionEvents)> { let (emitter, session_events) = mpsc::unbounded_channel(); - let (signal_client, mut join_response, signal_events) = - SignalClient::connect(url, token, options.signal_options.clone()).await?; + let lk_runtime = LkRuntime::instance(); + let use_single_pc = options.signal_options.single_peer_connection; + + let mut publisher_offer = None; + let early_publisher_pc = if use_single_pc { + let publisher_pc = PeerTransport::new( + lk_runtime.pc_factory().create_peer_connection(options.rtc_config.clone())?, + proto::SignalTarget::Publisher, + true, + ); + + let dcs = Self::create_data_channels(&publisher_pc, &emitter)?; + Self::add_recv_media_sections(&publisher_pc.peer_connection(), 3, 3)?; + + match publisher_pc.create_initial_offer().await { + Ok(Some(offer)) => { + publisher_offer = Some(proto::SessionDescription { + r#type: "offer".to_string(), + sdp: offer.to_string(), + id: 0, + mid_to_track_id: Default::default(), + }); + } + Ok(None) => {} + Err(err) => { + log::warn!("failed to create initial publisher offer: {:?}", err); + } + } + + Some((publisher_pc, dcs)) + } else { + None + }; + + let (signal_client, mut join_response, signal_events) = SignalClient::connect( + url, + token, + options.signal_options.clone(), + publisher_offer.clone(), + ) + .await?; let signal_client = Arc::new(signal_client); log::debug!("received JoinResponse: {:?}", join_response); let subscriber_primary = join_response.subscriber_primary; @@ -464,12 +504,28 @@ impl RtcSession { let (dc_emitter, dc_events) = mpsc::unbounded_channel(); - let lk_runtime = LkRuntime::instance(); - let mut publisher_pc = PeerTransport::new( - lk_runtime.pc_factory().create_peer_connection(rtc_config.clone())?, - proto::SignalTarget::Publisher, - single_pc_mode, - ); + let sent_publisher_offer; + let (mut publisher_pc, mut reliable_dc, mut lossy_dc, data_track_dc) = + if let Some((pub_pc, dcs)) = early_publisher_pc { + if single_pc_mode { + pub_pc.peer_connection().set_configuration(rtc_config.clone())?; + sent_publisher_offer = publisher_offer.is_some(); + } else { + pub_pc.clear_pending_initial_offer().await; + pub_pc.peer_connection().set_configuration(rtc_config.clone())?; + sent_publisher_offer = false; + } + (pub_pc, dcs.0, dcs.1, dcs.2) + } else { + sent_publisher_offer = false; + let publisher_pc = PeerTransport::new( + lk_runtime.pc_factory().create_peer_connection(rtc_config.clone())?, + proto::SignalTarget::Publisher, + single_pc_mode, + ); + let dcs = Self::create_data_channels(&publisher_pc, &emitter)?; + (publisher_pc, dcs.0, dcs.1, dcs.2) + }; // In single PC mode, subscriber_pc is None let mut subscriber_pc = if single_pc_mode { @@ -482,24 +538,6 @@ impl RtcSession { )) }; - let mut reliable_dc = publisher_pc.peer_connection().create_data_channel( - RELIABLE_DC_LABEL, - // Use ordered: true for reliable delivery with ordering guarantees. - DataChannelInit { ordered: true, ..Default::default() }, - )?; - - let lossy_options = - DataChannelInit { ordered: false, max_retransmits: Some(0), ..Default::default() }; - - let mut lossy_dc = publisher_pc - .peer_connection() - .create_data_channel(LOSSY_DC_LABEL, lossy_options.clone())?; - - let data_track_dc = publisher_pc - .peer_connection() - .create_data_channel(DATA_TRACK_DC_LABEL, lossy_options)?; - handle_remote_dt_packets(&data_track_dc, emitter.downgrade()); - // Forward events received inside the signaling thread to our rtc channel rtc_events::forward_pc_events(&mut publisher_pc, rtc_emitter.clone()); if let Some(ref mut sub_pc) = subscriber_pc { @@ -551,6 +589,7 @@ impl RtcSession { pending_requests: Default::default(), e2ee_manager, subscriber_primary, + pc_state_notify: Notify::new(), }); // Start session tasks @@ -570,15 +609,62 @@ impl RtcSession { dt_sender_task, })); - // In single PC mode (or with fast_publish), trigger initial negotiation - // This matches JS SDK behavior: if (!this.subscriberPrimary || joinResponse.fastPublish) { this.negotiate(); } - if single_pc_mode || join_response.fast_publish || !subscriber_primary { + // If we already sent the publisher offer with the JoinRequest, skip initial + // negotiation - the server will respond with an answer via the signal channel. + // Otherwise, trigger negotiation as before. + if sent_publisher_offer { + inner.has_published.store(true, Ordering::Release); + } else if single_pc_mode || join_response.fast_publish || !subscriber_primary { inner.publisher_negotiation_needed(); } Ok((Self { inner, handle }, join_response, session_events)) } + fn create_data_channels( + publisher_pc: &PeerTransport, + emitter: &mpsc::UnboundedSender, + ) -> EngineResult<(DataChannel, DataChannel, DataChannel)> { + let reliable_dc = publisher_pc.peer_connection().create_data_channel( + RELIABLE_DC_LABEL, + DataChannelInit { ordered: true, ..Default::default() }, + )?; + + let lossy_options = + DataChannelInit { ordered: false, max_retransmits: Some(0), ..Default::default() }; + + let lossy_dc = publisher_pc + .peer_connection() + .create_data_channel(LOSSY_DC_LABEL, lossy_options.clone())?; + + let data_track_dc = publisher_pc + .peer_connection() + .create_data_channel(DATA_TRACK_DC_LABEL, lossy_options)?; + handle_remote_dt_packets(&data_track_dc, emitter.downgrade()); + + Ok((reliable_dc, lossy_dc, data_track_dc)) + } + + fn add_recv_media_sections( + pc: &PeerConnection, + audio_count: u32, + video_count: u32, + ) -> EngineResult<()> { + let recvonly_init = RtpTransceiverInit { + direction: RtpTransceiverDirection::RecvOnly, + stream_ids: Vec::new(), + send_encodings: Vec::new(), + }; + + for _ in 0..audio_count { + pc.add_transceiver_for_media(MediaType::Audio, recvonly_init.clone())?; + } + for _ in 0..video_count { + pc.add_transceiver_for_media(MediaType::Video, recvonly_init.clone())?; + } + Ok(()) + } + pub fn has_published(&self) -> bool { self.inner.has_published.load(Ordering::Acquire) } @@ -1227,25 +1313,11 @@ impl SessionInner { req.num_videos ); - let recvonly_init = RtpTransceiverInit { - direction: RtpTransceiverDirection::RecvOnly, - stream_ids: Vec::new(), - send_encodings: Vec::new(), - }; - - // Add audio transceivers - for _ in 0..req.num_audios { - self.publisher_pc - .peer_connection() - .add_transceiver_for_media(MediaType::Audio, recvonly_init.clone())?; - } - - // Add video transceivers - for _ in 0..req.num_videos { - self.publisher_pc - .peer_connection() - .add_transceiver_for_media(MediaType::Video, recvonly_init.clone())?; - } + RtcSession::add_recv_media_sections( + &self.publisher_pc.peer_connection(), + req.num_audios, + req.num_videos, + )?; // Trigger renegotiation self.publisher_negotiation_needed(); @@ -1273,6 +1345,8 @@ impl SessionInner { RtcEvent::ConnectionChange { state, target } => { log::debug!("connection change, {:?} {:?}", state, target); + self.pc_state_notify.notify_waiters(); + if state == PeerConnectionState::Failed { log::error!("{:?} pc state failed", target); self.on_session_disconnected( @@ -1655,6 +1729,7 @@ impl SessionInner { async fn close(&self, reason: DisconnectReason) { self.closed.store(true, Ordering::Release); + self.pc_state_notify.notify_waiters(); self.signal_client .send(proto::signal_request::Message::Leave(proto::LeaveRequest { @@ -1869,6 +1944,8 @@ impl SessionInner { async fn wait_pc_connection(&self) -> EngineResult<()> { let wait_connected = async move { loop { + let notified = self.pc_state_notify.notified(); + if self.closed.load(Ordering::Acquire) { return Err(EngineError::Connection("closed".into())); } @@ -1886,7 +1963,7 @@ impl SessionInner { break; } - livekit_runtime::sleep(Duration::from_millis(50)).await; + let _ = tokio::time::timeout(Duration::from_millis(50), notified).await; } Ok(())