From 6b16f72f286f54ab42ec60604d7e3c9d0b759cf2 Mon Sep 17 00:00:00 2001 From: Matt Date: Thu, 9 Jun 2022 22:26:36 +1200 Subject: [PATCH 1/2] feat: added noAutoSubscribeDataChannels configuration which prevents automatic subscription to datachannels --- pkg/sfu/peer.go | 5 +++++ pkg/sfu/session.go | 11 ++++++++--- pkg/sfu/subscriber.go | 16 +++++++++------- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/pkg/sfu/peer.go b/pkg/sfu/peer.go index 7fa2432a1..0c18cef2c 100644 --- a/pkg/sfu/peer.go +++ b/pkg/sfu/peer.go @@ -44,6 +44,10 @@ type JoinConfig struct { // to customize the subscrbe stream combination as needed. // this parameter depends on NoSubscribe=false. NoAutoSubscribe bool + // If true the peer will not automatically subscribe all data channels, + // The peer can subscribe data channels by creating locally. + // this parameter depends on NoSubscribe=false. + NoAutoSubscribeDataChannels bool } // SessionProvider provides the SessionLocal to the sfu.Peer @@ -111,6 +115,7 @@ func (p *PeerLocal) Join(sid, uid string, config ...JoinConfig) error { } p.subscriber.noAutoSubscribe = conf.NoAutoSubscribe + p.subscriber.noAutoSubscribeDataChannels = conf.NoAutoSubscribeDataChannels p.subscriber.OnNegotiationNeeded(func() { p.Lock() diff --git a/pkg/sfu/session.go b/pkg/sfu/session.go index 9d7895e73..80f9dc902 100644 --- a/pkg/sfu/session.go +++ b/pkg/sfu/session.go @@ -159,6 +159,9 @@ func (s *SessionLocal) AddDatachannel(owner string, dc *webrtc.DataChannel) { label := dc.Label() s.mu.Lock() + peerOwner := s.peers[owner] + peerOwner.Subscriber().RegisterDatachannel(label, dc) + for _, lbl := range s.fanOutDCs { if label == lbl { dc.OnMessage(func(msg webrtc.DataChannelMessage) { @@ -169,10 +172,8 @@ func (s *SessionLocal) AddDatachannel(owner string, dc *webrtc.DataChannel) { } } s.fanOutDCs = append(s.fanOutDCs, label) - peerOwner := s.peers[owner] s.mu.Unlock() peers := s.Peers() - peerOwner.Subscriber().RegisterDatachannel(label, dc) dc.OnMessage(func(msg webrtc.DataChannelMessage) { s.FanOutMessage(owner, label, msg) @@ -180,7 +181,7 @@ func (s *SessionLocal) AddDatachannel(owner string, dc *webrtc.DataChannel) { for _, p := range peers { peer := p - if peer.ID() == owner || peer.Subscriber() == nil { + if peer.ID() == owner || peer.Subscriber() == nil || peer.Subscriber().noAutoSubscribeDataChannels { continue } ndc, err := peer.Subscriber().AddDataChannel(label) @@ -251,6 +252,10 @@ func (s *SessionLocal) Subscribe(peer Peer) { // Subscribe to fan out data channels for _, label := range fdc { + if peer.Subscriber().noAutoSubscribeDataChannels { + continue + } + dc, err := peer.Subscriber().AddDataChannel(label) if err != nil { Logger.Error(err, "error adding datachannel") diff --git a/pkg/sfu/subscriber.go b/pkg/sfu/subscriber.go index 63ea9e0c2..320640f65 100644 --- a/pkg/sfu/subscriber.go +++ b/pkg/sfu/subscriber.go @@ -27,7 +27,8 @@ type Subscriber struct { negotiate func() closeOnce sync.Once - noAutoSubscribe bool + noAutoSubscribe bool + noAutoSubscribeDataChannels bool } // NewSubscriber creates a new Subscriber @@ -46,12 +47,13 @@ func NewSubscriber(id string, cfg WebRTCTransportConfig) (*Subscriber, error) { } s := &Subscriber{ - id: id, - me: me, - pc: pc, - tracks: make(map[string][]*DownTrack), - channels: make(map[string]*webrtc.DataChannel), - noAutoSubscribe: false, + id: id, + me: me, + pc: pc, + tracks: make(map[string][]*DownTrack), + channels: make(map[string]*webrtc.DataChannel), + noAutoSubscribe: false, + noAutoSubscribeDataChannels: false, } pc.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { From ef75ad19f85fabd3930fca7256d1b6f7a4e06342 Mon Sep 17 00:00:00 2001 From: Matt Date: Thu, 9 Jun 2022 23:31:50 +1200 Subject: [PATCH 2/2] chore: added configuration remap in grpc server.go --- cmd/signal/grpc/server/server.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/cmd/signal/grpc/server/server.go b/cmd/signal/grpc/server/server.go index 561d1bb51..0054c9bc2 100644 --- a/cmd/signal/grpc/server/server.go +++ b/cmd/signal/grpc/server/server.go @@ -185,10 +185,16 @@ func (s *SFUServer) Signal(sig rtc.RTC_SignalServer) error { noautosub = val == "true" } + noautosubdc := false + if val, found := payload.Join.Config["NoAutoSubscribeDataChannels"]; found { + noautosubdc = val == "true" + } + cfg := sfu.JoinConfig{ - NoPublish: nopub, - NoSubscribe: nosub, - NoAutoSubscribe: noautosub, + NoPublish: nopub, + NoSubscribe: nosub, + NoAutoSubscribe: noautosub, + NoAutoSubscribeDataChannels: noautosubdc, } err = peer.Join(sid, uid, cfg)