#[cfg(test)]
mod tests;
use crate::data_structures::{AppData, ListenInfo};
use crate::messages::{WebRtcServerCloseRequest, WebRtcServerDumpRequest};
use crate::transport::TransportId;
use crate::uuid_based_wrapper_type;
use crate::webrtc_transport::WebRtcTransport;
use crate::worker::{Channel, RequestError, Worker};
use async_executor::Executor;
use event_listener_primitives::{BagOnce, HandlerId};
use hash_hasher::HashedSet;
use log::{debug, error};
use mediasoup_sys::fbs::transport;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::net::IpAddr;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use thiserror::Error;
uuid_based_wrapper_type!(
WebRtcServerId
);
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Deserialize, Serialize)]
#[doc(hidden)]
pub struct WebRtcServerIpPort {
pub ip: IpAddr,
pub port: u16,
}
#[derive(Debug, Clone, Eq, PartialEq, Hash, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
pub struct WebRtcServerIceUsernameFragment {
pub local_ice_username_fragment: String,
#[serde(rename = "webRtcTransportId")]
pub webrtc_transport_id: TransportId,
}
#[derive(Debug, Clone, Eq, PartialEq, Hash, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
pub struct WebRtcServerTupleHash {
pub tuple_hash: u64,
#[serde(rename = "webRtcTransportId")]
pub webrtc_transport_id: TransportId,
}
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
#[non_exhaustive]
pub struct WebRtcServerDump {
pub id: WebRtcServerId,
pub udp_sockets: Vec<WebRtcServerIpPort>,
pub tcp_servers: Vec<WebRtcServerIpPort>,
#[serde(rename = "webRtcTransportIds")]
pub webrtc_transport_ids: HashedSet<TransportId>,
pub local_ice_username_fragments: Vec<WebRtcServerIceUsernameFragment>,
pub tuple_hashes: Vec<WebRtcServerTupleHash>,
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
pub struct WebRtcServerListenInfos(Vec<ListenInfo>);
impl WebRtcServerListenInfos {
#[must_use]
pub fn new(listen_info: ListenInfo) -> Self {
Self(vec![listen_info])
}
#[must_use]
pub fn insert(mut self, listen_info: ListenInfo) -> Self {
self.0.push(listen_info);
self
}
pub(crate) fn to_fbs(&self) -> Vec<transport::ListenInfo> {
self.0
.iter()
.map(|listen_info| listen_info.to_fbs())
.collect()
}
}
impl Deref for WebRtcServerListenInfos {
type Target = Vec<ListenInfo>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[derive(Error, Debug, Eq, PartialEq)]
#[error("Empty list of listen infos provided, should have at least one element")]
pub struct EmptyListError;
impl TryFrom<Vec<ListenInfo>> for WebRtcServerListenInfos {
type Error = EmptyListError;
fn try_from(listen_infos: Vec<ListenInfo>) -> Result<Self, Self::Error> {
if listen_infos.is_empty() {
Err(EmptyListError)
} else {
Ok(Self(listen_infos))
}
}
}
#[derive(Debug)]
#[non_exhaustive]
pub struct WebRtcServerOptions {
pub listen_infos: WebRtcServerListenInfos,
pub app_data: AppData,
}
impl WebRtcServerOptions {
pub fn new(listen_infos: WebRtcServerListenInfos) -> Self {
Self {
listen_infos,
app_data: AppData::default(),
}
}
}
#[derive(Default)]
#[allow(clippy::type_complexity)]
struct Handlers {
new_webrtc_transport: BagOnce<Box<dyn Fn(&WebRtcTransport) + Send>>,
worker_close: BagOnce<Box<dyn FnOnce() + Send>>,
close: BagOnce<Box<dyn FnOnce() + Send>>,
}
struct Inner {
id: WebRtcServerId,
executor: Arc<Executor<'static>>,
channel: Channel,
handlers: Arc<Handlers>,
app_data: AppData,
worker: Worker,
closed: AtomicBool,
_on_worker_close_handler: Mutex<HandlerId>,
}
impl Drop for Inner {
fn drop(&mut self) {
debug!("drop()");
self.close();
}
}
impl Inner {
fn close(&self) {
if !self.closed.swap(true, Ordering::SeqCst) {
self.handlers.close.call_simple();
{
let channel = self.channel.clone();
let request = WebRtcServerCloseRequest {
webrtc_server_id: self.id,
};
self.executor
.spawn(async move {
if let Err(error) = channel.request("", request).await {
error!("WebRTC server closing failed on drop: {}", error);
}
})
.detach();
}
}
}
}
#[derive(Clone)]
pub struct WebRtcServer {
inner: Arc<Inner>,
}
impl fmt::Debug for WebRtcServer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WebRtcServer")
.field("id", &self.inner.id)
.field("worker", &self.inner.worker)
.field("closed", &self.inner.closed)
.finish()
}
}
impl WebRtcServer {
pub(crate) fn new(
id: WebRtcServerId,
executor: Arc<Executor<'static>>,
channel: Channel,
app_data: AppData,
worker: Worker,
) -> Self {
let handlers = Arc::<Handlers>::default();
let inner_weak = Arc::<Mutex<Option<Weak<Inner>>>>::default();
let on_worker_close_handler = worker.on_close({
let inner_weak = Arc::clone(&inner_weak);
move || {
let maybe_inner = inner_weak.lock().as_ref().and_then(Weak::upgrade);
if let Some(inner) = maybe_inner {
inner.handlers.worker_close.call_simple();
if !inner.closed.swap(true, Ordering::SeqCst) {
inner.handlers.close.call_simple();
}
}
}
});
let inner = Arc::new(Inner {
id,
executor,
channel,
handlers,
app_data,
worker,
closed: AtomicBool::new(false),
_on_worker_close_handler: Mutex::new(on_worker_close_handler),
});
inner_weak.lock().replace(Arc::downgrade(&inner));
Self { inner }
}
#[must_use]
pub fn id(&self) -> WebRtcServerId {
self.inner.id
}
pub fn worker(&self) -> &Worker {
&self.inner.worker
}
#[must_use]
pub fn app_data(&self) -> &AppData {
&self.inner.app_data
}
#[must_use]
pub fn closed(&self) -> bool {
self.inner.closed.load(Ordering::SeqCst)
}
#[doc(hidden)]
pub async fn dump(&self) -> Result<WebRtcServerDump, RequestError> {
debug!("dump()");
self.inner
.channel
.request(self.id(), WebRtcServerDumpRequest {})
.await
}
pub fn on_worker_close<F: FnOnce() + Send + 'static>(&self, callback: F) -> HandlerId {
self.inner.handlers.worker_close.add(Box::new(callback))
}
pub fn on_new_webrtc_transport<F>(&self, callback: F) -> HandlerId
where
F: Fn(&WebRtcTransport) + Send + 'static,
{
self.inner
.handlers
.new_webrtc_transport
.add(Box::new(callback))
}
pub fn on_close<F: FnOnce() + Send + 'static>(&self, callback: F) -> HandlerId {
let handler_id = self.inner.handlers.close.add(Box::new(callback));
if self.inner.closed.load(Ordering::Relaxed) {
self.inner.handlers.close.call_simple();
}
handler_id
}
pub(crate) fn notify_new_webrtc_transport(&self, webrtc_transport: &WebRtcTransport) {
self.inner
.handlers
.new_webrtc_transport
.call(|callback| callback(webrtc_transport));
}
#[cfg(test)]
pub(crate) fn close(&self) {
self.inner.close();
}
}