diff --git a/go.mod b/go.mod index 544bdf3d..0d873aa7 100644 --- a/go.mod +++ b/go.mod @@ -1,14 +1,15 @@ module github.com/pion/interceptor -go 1.21.0 +go 1.24.0 require ( + github.com/pion/bwe v0.0.0-20260204132927-94cf8ba60868 github.com/pion/logging v0.2.4 github.com/pion/rtcp v1.2.16 github.com/pion/rtp v1.10.1 github.com/pion/transport/v3 v3.1.1 github.com/stretchr/testify v1.11.1 - golang.org/x/time v0.10.0 + golang.org/x/time v0.14.0 ) require ( diff --git a/go.sum b/go.sum index 69ed7558..a4f3e17e 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pion/bwe v0.0.0-20260204132927-94cf8ba60868 h1:XbdptYjCt5Im8ilmIK3srO0VF7tm5NYKqwkdyShgplY= +github.com/pion/bwe v0.0.0-20260204132927-94cf8ba60868/go.mod h1:k5nSfHOk0AeVIKKqtsAykZDaUwk7iAwbMkeQqbHb7Ag= github.com/pion/logging v0.2.4 h1:tTew+7cmQ+Mc1pTBLKH2puKsOvhm32dROumOZ655zB8= github.com/pion/logging v0.2.4/go.mod h1:DffhXTKYdNZU+KtJ5pyQDjvOAh/GsNSyv1lbkFbe3so= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= @@ -14,10 +16,10 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= -golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= -golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= -golang.org/x/time v0.10.0 h1:3usCWA8tQn0L8+hFJQNgzpWbd89begxN66o1Ojdn5L4= -golang.org/x/time v0.10.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= +golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= +golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= +golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/pkg/cc/interceptor.go b/pkg/cc/interceptor.go index 4373d282..79182a32 100644 --- a/pkg/cc/interceptor.go +++ b/pkg/cc/interceptor.go @@ -3,23 +3,35 @@ // Package cc implements an interceptor for bandwidth estimation that can be // used with different BandwidthEstimators. +// +// Deprecated: Directly use the bwe implementation in +// https://github.com/pion/bwe instead. package cc import ( + "fmt" + "github.com/pion/interceptor" - "github.com/pion/interceptor/pkg/gcc" + "github.com/pion/interceptor/pkg/gcc" //nolint + "github.com/pion/interceptor/pkg/rtpfb" "github.com/pion/rtcp" ) // Option can be used to set initial options on CC interceptors. +// +// Deprecated: See package comment. type Option func(*Interceptor) error // BandwidthEstimatorFactory creates new BandwidthEstimators. +// +// Deprecated: See package comment. type BandwidthEstimatorFactory func() (BandwidthEstimator, error) // BandwidthEstimator is the interface that will be returned by a // NewPeerConnectionCallback and can be used to query current bandwidth // metrics and add feedback manually. +// +// Deprecated: See package comment. type BandwidthEstimator interface { AddStream(*interceptor.StreamInfo, interceptor.RTPWriter) interceptor.RTPWriter WriteRTCP([]rtcp.Packet, interceptor.Attributes) error @@ -31,37 +43,53 @@ type BandwidthEstimator interface { // NewPeerConnectionCallback returns the BandwidthEstimator for the // PeerConnection with id. +// +// Deprecated: See package comment. type NewPeerConnectionCallback func(id string, estimator BandwidthEstimator) // InterceptorFactory is a factory for CC interceptors. +// +// Deprecated: See package comment. type InterceptorFactory struct { opts []Option bweFactory func() (BandwidthEstimator, error) addPeerConnection NewPeerConnectionCallback + rtpfbFactory *rtpfb.InterceptorFactory } // NewInterceptor returns a new CC interceptor factory. +// +// Deprecated: See package comment. func NewInterceptor(factory BandwidthEstimatorFactory, opts ...Option) (*InterceptorFactory, error) { if factory == nil { factory = func() (BandwidthEstimator, error) { return gcc.NewSendSideBWE() } } + fbi, err := rtpfb.NewInterceptor() + if err != nil { + return nil, fmt.Errorf("failed to create rtp feedback interceptor factory: %w", err) + } return &InterceptorFactory{ opts: opts, bweFactory: factory, addPeerConnection: nil, + rtpfbFactory: fbi, }, nil } // OnNewPeerConnection sets a callback that is called when a new CC interceptor // is created. +// +// Deprecated: See package comment. func (f *InterceptorFactory) OnNewPeerConnection(cb NewPeerConnectionCallback) { f.addPeerConnection = cb } // NewInterceptor returns a new CC interceptor. +// +// Deprecated: See package comment. func (f *InterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) { bwe, err := f.bweFactory() if err != nil { @@ -75,7 +103,7 @@ func (f *InterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, } for _, opt := range f.opts { - if err := opt(interceptorInstance); err != nil { + if err = opt(interceptorInstance); err != nil { return nil, err } } @@ -83,11 +111,17 @@ func (f *InterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, if f.addPeerConnection != nil { f.addPeerConnection(id, interceptorInstance.estimator) } + fbi, err := f.rtpfbFactory.NewInterceptor(id) + if err != nil { + return nil, err + } - return interceptorInstance, nil + return interceptor.NewChain([]interceptor.Interceptor{fbi, interceptorInstance}), nil } // Interceptor implements Google Congestion Control. +// +// Deprecated: See package comment. type Interceptor struct { interceptor.NoOp estimator BandwidthEstimator @@ -98,6 +132,8 @@ type Interceptor struct { // BindRTCPReader lets you modify any incoming RTCP packets. It is called once // per sender/receiver, however this might change in the future. The returned // method will be called once per packet batch. +// +// Deprecated: See package comment. func (c *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader { return interceptor.RTCPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { i, attr, err := reader.Read(b, a) @@ -126,6 +162,8 @@ func (c *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor. // BindLocalStream lets you modify any outgoing RTP packets. It is called once // for per LocalStream. The returned method will be called once per rtp packet. +// +// Deprecated: See package comment. func (c *Interceptor) BindLocalStream( info *interceptor.StreamInfo, writer interceptor.RTPWriter, ) interceptor.RTPWriter { @@ -133,6 +171,8 @@ func (c *Interceptor) BindLocalStream( } // Close closes the interceptor and the associated bandwidth estimator. +// +// Deprecated: See package comment. func (c *Interceptor) Close() error { return c.estimator.Close() } diff --git a/pkg/gcc/adaptive_threshold.go b/pkg/gcc/adaptive_threshold.go deleted file mode 100644 index 7e20ca89..00000000 --- a/pkg/gcc/adaptive_threshold.go +++ /dev/null @@ -1,106 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import ( - "math" - "time" -) - -const ( - maxDeltas = 60 -) - -type adaptiveThresholdOption func(*adaptiveThreshold) - -func setInitialThreshold(t time.Duration) adaptiveThresholdOption { - return func(at *adaptiveThreshold) { - at.thresh = t - } -} - -// adaptiveThreshold implements a threshold that continuously adapts depending on -// the current measurements/estimates. This is necessary to avoid starving GCC -// in the presence of concurrent TCP flows by allowing larger Queueing delays, -// when measurements/estimates increase. overuseCoefficientU and -// overuseCoefficientD define by how much the threshold adapts. We basically -// want the threshold to increase fast, if the measurement is outside [-thresh, -// thresh] and decrease slowly if it is within. -// -// See https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02#section-5.4 -// or [Analysis and Design of the Google Congestion Control for Web Real-time -// Communication (WebRTC)](https://c3lab.poliba.it/images/6/65/Gcc-analysis.pdf) -// for a more detailed description. -type adaptiveThreshold struct { - thresh time.Duration - overuseCoefficientUp float64 - overuseCoefficientDown float64 - min time.Duration - max time.Duration - lastUpdate time.Time - numDeltas int -} - -// newAdaptiveThreshold initializes a new adaptiveThreshold with default -// values taken from draft-ietf-rmcat-gcc-02. -func newAdaptiveThreshold(opts ...adaptiveThresholdOption) *adaptiveThreshold { - at := &adaptiveThreshold{ - thresh: time.Duration(12500 * float64(time.Microsecond)), - overuseCoefficientUp: 0.01, - overuseCoefficientDown: 0.00018, - min: 6 * time.Millisecond, - max: 600 * time.Millisecond, - lastUpdate: time.Time{}, - numDeltas: 0, - } - for _, opt := range opts { - opt(at) - } - - return at -} - -func (a *adaptiveThreshold) compare(estimate, _ time.Duration) (usage, time.Duration, time.Duration) { - a.numDeltas++ - if a.numDeltas < 2 { - return usageNormal, estimate, a.max - } - t := time.Duration(min(a.numDeltas, maxDeltas)) * estimate - use := usageNormal - if t > a.thresh { - use = usageOver - } else if t < -a.thresh { - use = usageUnder - } - thresh := a.thresh - a.update(t) - - return use, t, thresh -} - -func (a *adaptiveThreshold) update(estimate time.Duration) { - now := time.Now() - if a.lastUpdate.IsZero() { - a.lastUpdate = now - } - absEstimate := time.Duration(math.Abs(float64(estimate.Microseconds()))) * time.Microsecond - if absEstimate > a.thresh+15*time.Millisecond { - a.lastUpdate = now - - return - } - k := a.overuseCoefficientUp - if absEstimate < a.thresh { - k = a.overuseCoefficientDown - } - maxTimeDelta := 100 * time.Millisecond - timeDelta := time.Duration( - min(int(now.Sub(a.lastUpdate).Milliseconds()), int(maxTimeDelta.Milliseconds())), - ) * time.Millisecond - d := absEstimate - a.thresh - add := k * float64(d.Milliseconds()) * float64(timeDelta.Milliseconds()) - a.thresh += time.Duration(add*1000) * time.Microsecond - a.thresh = clampDuration(a.thresh, a.min, a.max) - a.lastUpdate = now -} diff --git a/pkg/gcc/adaptive_threshold_test.go b/pkg/gcc/adaptive_threshold_test.go deleted file mode 100644 index c88d4e1a..00000000 --- a/pkg/gcc/adaptive_threshold_test.go +++ /dev/null @@ -1,145 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestAdaptiveThreshold(t *testing.T) { - type input struct { - estimate, delta time.Duration - } - cases := []struct { - name string - in []input - expected []usage - options []adaptiveThresholdOption - }{ - { - name: "empty", - in: []input{}, - expected: []usage{}, - options: []adaptiveThresholdOption{}, - }, - { - name: "firstInputIsAlwaysNormal", - in: []input{{ - estimate: 1 * time.Second, - delta: 0, - }}, - expected: []usage{usageNormal}, - options: []adaptiveThresholdOption{}, - }, - { - name: "singleOver", - in: []input{ - { - estimate: 0, - delta: 0, - }, - { - estimate: 20 * time.Millisecond, - delta: 0, - }, - }, - expected: []usage{usageNormal, usageOver}, - options: []adaptiveThresholdOption{ - setInitialThreshold(10 * time.Millisecond), - }, - }, - { - name: "singleNormal", - in: []input{ - { - estimate: 0, - delta: 0, - }, - { - estimate: 5 * time.Millisecond, - delta: 0, - }, - }, - expected: []usage{usageNormal, usageNormal}, - options: []adaptiveThresholdOption{ - setInitialThreshold(10 * time.Millisecond), - }, - }, - { - name: "singleUnder", - in: []input{ - { - estimate: 0, - delta: 0, - }, - { - estimate: -20 * time.Millisecond, - delta: 0, - }, - }, - expected: []usage{usageNormal, usageUnder}, - options: []adaptiveThresholdOption{ - setInitialThreshold(10 * time.Millisecond), - }, - }, - { - name: "increaseThresholdOnOveruse", - in: []input{ - { - estimate: 0, - delta: 0, - }, - { - estimate: 25 * time.Millisecond, - delta: 30 * time.Millisecond, - }, - { - estimate: 13 * time.Millisecond, - delta: 30 * time.Millisecond, - }, - }, - expected: []usage{usageNormal, usageOver, usageNormal}, - options: []adaptiveThresholdOption{ - setInitialThreshold(40 * time.Millisecond), - }, - }, - { - name: "overuseAfterOveruse", - in: []input{ - { - estimate: 0, - delta: 0, - }, - { - estimate: 20 * time.Millisecond, - delta: 30 * time.Millisecond, - }, - { - estimate: 30 * time.Millisecond, - delta: 30 * time.Millisecond, - }, - }, - expected: []usage{usageNormal, usageOver, usageOver}, - options: []adaptiveThresholdOption{ - setInitialThreshold(10 * time.Millisecond), - }, - }, - } - - for _, tc := range cases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - threshold := newAdaptiveThreshold(tc.options...) - usages := []usage{} - for _, in := range tc.in { - use, _, _ := threshold.compare(in.estimate, in.delta) - usages = append(usages, use) - } - assert.Equal(t, tc.expected, usages, "%v != %v", tc.expected, usages) - }) - } -} diff --git a/pkg/gcc/arrival_group.go b/pkg/gcc/arrival_group.go deleted file mode 100644 index bf70307b..00000000 --- a/pkg/gcc/arrival_group.go +++ /dev/null @@ -1,39 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import ( - "fmt" - "time" - - "github.com/pion/interceptor/internal/cc" -) - -type arrivalGroup struct { - packets []cc.Acknowledgment - departure time.Time - arrival time.Time -} - -func newArrivalGroup(a cc.Acknowledgment) arrivalGroup { - return arrivalGroup{ - packets: []cc.Acknowledgment{a}, - departure: a.Departure, - arrival: a.Arrival, - } -} - -func (g *arrivalGroup) add(a cc.Acknowledgment) { - g.packets = append(g.packets, a) - g.arrival = a.Arrival -} - -func (g arrivalGroup) String() string { - s := "ARRIVALGROUP:\n" - s += fmt.Sprintf("\tARRIVAL:\t%v\n", int64(float64(g.arrival.UnixNano())/1e+6)) - s += fmt.Sprintf("\tDEPARTURE:\t%v\n", int64(float64(g.departure.UnixNano())/1e+6)) - s += fmt.Sprintf("\tPACKETS:\n%v\n", g.packets) - - return s -} diff --git a/pkg/gcc/arrival_group_accumulator.go b/pkg/gcc/arrival_group_accumulator.go deleted file mode 100644 index c2fb7dcb..00000000 --- a/pkg/gcc/arrival_group_accumulator.go +++ /dev/null @@ -1,81 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import ( - "time" - - "github.com/pion/interceptor/internal/cc" -) - -type arrivalGroupAccumulator struct { - interDepartureThreshold time.Duration - interArrivalThreshold time.Duration - interGroupDelayVariationTreshold time.Duration -} - -func newArrivalGroupAccumulator() *arrivalGroupAccumulator { - return &arrivalGroupAccumulator{ - interDepartureThreshold: 5 * time.Millisecond, - interArrivalThreshold: 5 * time.Millisecond, - interGroupDelayVariationTreshold: 0, - } -} - -func (a *arrivalGroupAccumulator) run(in <-chan []cc.Acknowledgment, agWriter func(arrivalGroup)) { - init := false - group := arrivalGroup{} - for acks := range in { - for _, next := range acks { - if !init { - group = newArrivalGroup(next) - init = true - - continue - } - if next.Arrival.Before(group.arrival) { - // ignore out of order arrivals - continue - } - if next.Departure.After(group.departure) { - // A sequence of packets which are sent within a burst_time interval - // constitute a group. - if interDepartureTimePkt(group, next) <= a.interDepartureThreshold { - group.add(next) - - continue - } - - // A Packet which has an inter-arrival time less than burst_time and - // an inter-group delay variation d(i) less than 0 is considered - // being part of the current group of packets. - if interArrivalTimePkt(group, next) <= a.interArrivalThreshold && - interGroupDelayVariationPkt(group, next) < a.interGroupDelayVariationTreshold { - group.add(next) - - continue - } - - agWriter(group) - group = newArrivalGroup(next) - } - } - } -} - -func interArrivalTimePkt(group arrivalGroup, ack cc.Acknowledgment) time.Duration { - return ack.Arrival.Sub(group.arrival) -} - -func interDepartureTimePkt(group arrivalGroup, ack cc.Acknowledgment) time.Duration { - if len(group.packets) == 0 { - return 0 - } - - return ack.Departure.Sub(group.departure) -} - -func interGroupDelayVariationPkt(group arrivalGroup, ack cc.Acknowledgment) time.Duration { - return ack.Arrival.Sub(group.arrival) - ack.Departure.Sub(group.departure) -} diff --git a/pkg/gcc/arrival_group_accumulator_test.go b/pkg/gcc/arrival_group_accumulator_test.go deleted file mode 100644 index abbdc60c..00000000 --- a/pkg/gcc/arrival_group_accumulator_test.go +++ /dev/null @@ -1,244 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import ( - "testing" - "time" - - "github.com/pion/interceptor/internal/cc" - "github.com/stretchr/testify/assert" -) - -func TestArrivalGroupAccumulator(t *testing.T) { - triggerNewGroupElement := cc.Acknowledgment{ - Departure: time.Time{}.Add(time.Second), - Arrival: time.Time{}.Add(time.Second), - } - cases := []struct { - name string - log []cc.Acknowledgment - exp []arrivalGroup - }{ - { - name: "emptyCreatesNoGroups", - log: []cc.Acknowledgment{}, - exp: []arrivalGroup{}, - }, - { - name: "createsSingleElementGroup", - log: []cc.Acknowledgment{ - { - Departure: time.Time{}, - Arrival: time.Time{}.Add(time.Millisecond), - }, - triggerNewGroupElement, - }, - exp: []arrivalGroup{ - { - packets: []cc.Acknowledgment{{ - Departure: time.Time{}, - Arrival: time.Time{}.Add(time.Millisecond), - }}, - arrival: time.Time{}.Add(time.Millisecond), - departure: time.Time{}, - }, - }, - }, - { - name: "createsTwoElementGroup", - log: []cc.Acknowledgment{ - { - Arrival: time.Time{}.Add(15 * time.Millisecond), - }, - { - Departure: time.Time{}.Add(3 * time.Millisecond), - Arrival: time.Time{}.Add(20 * time.Millisecond), - }, - triggerNewGroupElement, - }, - exp: []arrivalGroup{{ - packets: []cc.Acknowledgment{ - { - Departure: time.Time{}, - Arrival: time.Time{}.Add(15 * time.Millisecond), - }, - { - Departure: time.Time{}.Add(3 * time.Millisecond), - Arrival: time.Time{}.Add(20 * time.Millisecond), - }, - }, - arrival: time.Time{}.Add(20 * time.Millisecond), - departure: time.Time{}, - }}, - }, - { - name: "createsTwoArrivalGroups", - log: []cc.Acknowledgment{ - { - Departure: time.Time{}, - Arrival: time.Time{}.Add(15 * time.Millisecond), - }, - { - Departure: time.Time{}.Add(3 * time.Millisecond), - Arrival: time.Time{}.Add(20 * time.Millisecond), - }, - { - Departure: time.Time{}.Add(9 * time.Millisecond), - Arrival: time.Time{}.Add(30 * time.Millisecond), - }, - triggerNewGroupElement, - }, - exp: []arrivalGroup{ - { - packets: []cc.Acknowledgment{ - { - Arrival: time.Time{}.Add(15 * time.Millisecond), - }, - { - Departure: time.Time{}.Add(3 * time.Millisecond), - Arrival: time.Time{}.Add(20 * time.Millisecond), - }, - }, - arrival: time.Time{}.Add(20 * time.Millisecond), - departure: time.Time{}.Add(0 * time.Millisecond), - }, - { - packets: []cc.Acknowledgment{ - { - Departure: time.Time{}.Add(9 * time.Millisecond), - Arrival: time.Time{}.Add(30 * time.Millisecond), - }, - }, - arrival: time.Time{}.Add(30 * time.Millisecond), - departure: time.Time{}.Add(9 * time.Millisecond), - }, - }, - }, - { - name: "ignoresOutOfOrderPackets", - log: []cc.Acknowledgment{ - { - Departure: time.Time{}, - Arrival: time.Time{}.Add(15 * time.Millisecond), - }, - { - Departure: time.Time{}.Add(6 * time.Millisecond), - Arrival: time.Time{}.Add(34 * time.Millisecond), - }, - { - Departure: time.Time{}.Add(8 * time.Millisecond), - Arrival: time.Time{}.Add(30 * time.Millisecond), - }, - triggerNewGroupElement, - }, - exp: []arrivalGroup{ - { - packets: []cc.Acknowledgment{ - { - Departure: time.Time{}, - Arrival: time.Time{}.Add(15 * time.Millisecond), - }, - }, - arrival: time.Time{}.Add(15 * time.Millisecond), - departure: time.Time{}, - }, - { - packets: []cc.Acknowledgment{ - { - Departure: time.Time{}.Add(6 * time.Millisecond), - Arrival: time.Time{}.Add(34 * time.Millisecond), - }, - }, - arrival: time.Time{}.Add(34 * time.Millisecond), - departure: time.Time{}.Add(6 * time.Millisecond), - }, - }, - }, - { - name: "newGroupBecauseOfInterDepartureTime", - log: []cc.Acknowledgment{ - { - SequenceNumber: 0, - Departure: time.Time{}, - Arrival: time.Time{}.Add(4 * time.Millisecond), - }, - { - SequenceNumber: 1, - Departure: time.Time{}.Add(3 * time.Millisecond), - Arrival: time.Time{}.Add(4 * time.Millisecond), - }, - { - SequenceNumber: 2, - Departure: time.Time{}.Add(6 * time.Millisecond), - Arrival: time.Time{}.Add(10 * time.Millisecond), - }, - { - SequenceNumber: 3, - Departure: time.Time{}.Add(9 * time.Millisecond), - Arrival: time.Time{}.Add(10 * time.Millisecond), - }, - triggerNewGroupElement, - }, - exp: []arrivalGroup{ - { - packets: []cc.Acknowledgment{ - { - SequenceNumber: 0, - Departure: time.Time{}, - Arrival: time.Time{}.Add(4 * time.Millisecond), - }, - { - SequenceNumber: 1, - Departure: time.Time{}.Add(3 * time.Millisecond), - Arrival: time.Time{}.Add(4 * time.Millisecond), - }, - }, - departure: time.Time{}, - arrival: time.Time{}.Add(4 * time.Millisecond), - }, - { - packets: []cc.Acknowledgment{ - { - SequenceNumber: 2, - Departure: time.Time{}.Add(6 * time.Millisecond), - Arrival: time.Time{}.Add(10 * time.Millisecond), - }, - { - SequenceNumber: 3, - Departure: time.Time{}.Add(9 * time.Millisecond), - Arrival: time.Time{}.Add(10 * time.Millisecond), - }, - }, - departure: time.Time{}.Add(6 * time.Millisecond), - arrival: time.Time{}.Add(10 * time.Millisecond), - }, - }, - }, - } - - for _, tc := range cases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - aga := newArrivalGroupAccumulator() - in := make(chan []cc.Acknowledgment) - out := make(chan arrivalGroup) - go func() { - defer close(out) - aga.run(in, func(ag arrivalGroup) { - out <- ag - }) - }() - go func() { - in <- tc.log - close(in) - }() - received := []arrivalGroup{} - for g := range out { - received = append(received, g) - } - assert.Equal(t, tc.exp, received) - }) - } -} diff --git a/pkg/gcc/arrival_group_test.go b/pkg/gcc/arrival_group_test.go deleted file mode 100644 index a2a79188..00000000 --- a/pkg/gcc/arrival_group_test.go +++ /dev/null @@ -1,132 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import ( - "testing" - "time" - - "github.com/pion/interceptor/internal/cc" - "github.com/stretchr/testify/assert" -) - -func TestArrivalGroup(t *testing.T) { - cases := []struct { - name string - acks []cc.Acknowledgment - expected arrivalGroup - }{ - { - name: "createsEmptyArrivalGroup", - acks: []cc.Acknowledgment{}, - expected: arrivalGroup{ - packets: nil, - arrival: time.Time{}, - departure: time.Time{}, - }, - }, - { - name: "createsArrivalGroupContainingSingleACK", - acks: []cc.Acknowledgment{{ - SequenceNumber: 0, - Size: 0, - Departure: time.Time{}, - Arrival: time.Time{}, - }}, - expected: arrivalGroup{ - packets: []cc.Acknowledgment{{ - SequenceNumber: 0, - Size: 0, - Departure: time.Time{}, - Arrival: time.Time{}, - }}, - arrival: time.Time{}, - departure: time.Time{}, - }, - }, - { - name: "setsTimesToLastACK", - acks: []cc.Acknowledgment{{ - SequenceNumber: 0, - Size: 0, - Departure: time.Time{}, - Arrival: time.Time{}, - }, { - SequenceNumber: 0, - Size: 0, - Departure: time.Time{}.Add(time.Second), - Arrival: time.Time{}.Add(time.Second), - }}, - expected: arrivalGroup{ - packets: []cc.Acknowledgment{{ - SequenceNumber: 0, - Size: 0, - Departure: time.Time{}, - Arrival: time.Time{}, - }, { - SequenceNumber: 0, - Size: 0, - Departure: time.Time{}.Add(time.Second), - Arrival: time.Time{}.Add(time.Second), - }}, - arrival: time.Time{}.Add(time.Second), - departure: time.Time{}, - }, - }, - { - name: "departure time of group is the departure time of the first packet in the group", - acks: []cc.Acknowledgment{{ - SequenceNumber: 0, - Size: 0, - Departure: time.Time{}.Add(27 * time.Millisecond), - Arrival: time.Time{}, - }, { - SequenceNumber: 1, - Size: 1, - Departure: time.Time{}.Add(32 * time.Millisecond), - Arrival: time.Time{}.Add(37 * time.Millisecond), - }, { - SequenceNumber: 2, - Size: 2, - Departure: time.Time{}.Add(50 * time.Millisecond), - Arrival: time.Time{}.Add(56 * time.Millisecond), - }}, - expected: arrivalGroup{ - packets: []cc.Acknowledgment{{ - SequenceNumber: 0, - Size: 0, - Departure: time.Time{}.Add(27 * time.Millisecond), - Arrival: time.Time{}, - }, { - SequenceNumber: 1, - Size: 1, - Departure: time.Time{}.Add(32 * time.Millisecond), - Arrival: time.Time{}.Add(37 * time.Millisecond), - }, { - SequenceNumber: 2, - Size: 2, - Departure: time.Time{}.Add(50 * time.Millisecond), - Arrival: time.Time{}.Add(56 * time.Millisecond), - }}, - arrival: time.Time{}.Add(56 * time.Millisecond), - departure: time.Time{}.Add(27 * time.Millisecond), - }, - }, - } - - for _, tc := range cases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - ag := arrivalGroup{} - for i, ack := range tc.acks { - if i == 0 { - ag = newArrivalGroup(ack) - } else { - ag.add(ack) - } - } - assert.Equal(t, tc.expected, ag) - }) - } -} diff --git a/pkg/gcc/delay_based_bwe.go b/pkg/gcc/delay_based_bwe.go deleted file mode 100644 index 6de38f51..00000000 --- a/pkg/gcc/delay_based_bwe.go +++ /dev/null @@ -1,109 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import ( - "sync" - "time" - - "github.com/pion/interceptor/internal/cc" - "github.com/pion/logging" -) - -// DelayStats contains some internal statistics of the delay based congestion -// controller. -type DelayStats struct { - Measurement time.Duration - Estimate time.Duration - Threshold time.Duration - LastReceiveDelta time.Duration - - Usage usage - State state - TargetBitrate int -} - -type now func() time.Time - -type delayController struct { - ackPipe chan<- []cc.Acknowledgment - ackRatePipe chan<- []cc.Acknowledgment - - *arrivalGroupAccumulator - *rateController - - onUpdateCallback func(DelayStats) - - wg sync.WaitGroup - - log logging.LeveledLogger -} - -type delayControllerConfig struct { - nowFn now - initialBitrate int - minBitrate int - maxBitrate int -} - -func newDelayController(delayConfig delayControllerConfig, loggerFactory logging.LoggerFactory) *delayController { - ackPipe := make(chan []cc.Acknowledgment) - ackRatePipe := make(chan []cc.Acknowledgment) - - delayController := &delayController{ - ackPipe: ackPipe, - ackRatePipe: ackRatePipe, - arrivalGroupAccumulator: nil, - rateController: nil, - onUpdateCallback: nil, - wg: sync.WaitGroup{}, - log: loggerFactory.NewLogger("gcc_delay_controller"), - } - - rateController := newRateController( - delayConfig.nowFn, delayConfig.initialBitrate, delayConfig.minBitrate, delayConfig.maxBitrate, - func(ds DelayStats) { - delayController.log.Infof("delaystats: %v", ds) - if delayController.onUpdateCallback != nil { - delayController.onUpdateCallback(ds) - } - }, - ) - delayController.rateController = rateController - overuseDetector := newOveruseDetector(newAdaptiveThreshold(), 10*time.Millisecond, rateController.onDelayStats) - slopeEstimator := newSlopeEstimator(newKalman(), overuseDetector.onDelayStats) - arrivalGroupAccumulator := newArrivalGroupAccumulator() - - rc := newRateCalculator(500 * time.Millisecond) - - delayController.wg.Add(2) - go func() { - defer delayController.wg.Done() - arrivalGroupAccumulator.run(ackPipe, slopeEstimator.onArrivalGroup) - }() - go func() { - defer delayController.wg.Done() - rc.run(ackRatePipe, rateController.onReceivedRate) - }() - - return delayController -} - -func (d *delayController) onUpdate(f func(DelayStats)) { - d.onUpdateCallback = f -} - -func (d *delayController) updateDelayEstimate(acks []cc.Acknowledgment) { - d.ackPipe <- acks - d.ackRatePipe <- acks -} - -func (d *delayController) Close() error { - defer d.wg.Wait() - - close(d.ackPipe) - close(d.ackRatePipe) - - return nil -} diff --git a/pkg/gcc/gcc.go b/pkg/gcc/gcc.go index 348031bc..d28c4ff9 100644 --- a/pkg/gcc/gcc.go +++ b/pkg/gcc/gcc.go @@ -2,14 +2,6 @@ // SPDX-License-Identifier: MIT // Package gcc implements Google Congestion Control for bandwidth estimation +// +// Deprecated: Use https://github.com/pion/bwe instead. package gcc - -import "time" - -func clampInt(b, minVal, maxVal int) int { - return max(minVal, min(maxVal, b)) -} - -func clampDuration(d, minVal, maxVal time.Duration) time.Duration { - return time.Duration(clampInt(int(d), int(minVal), int(maxVal))) -} diff --git a/pkg/gcc/gcc_test.go b/pkg/gcc/gcc_test.go deleted file mode 100644 index 81d30860..00000000 --- a/pkg/gcc/gcc_test.go +++ /dev/null @@ -1,65 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import ( - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestClamp(t *testing.T) { - tests := []struct { - expected int - x int - min int - max int - }{ - { - expected: 50, - x: 50, - min: 0, - max: 100, - }, - { - expected: 50, - x: 50, - min: 50, - max: 100, - }, - { - expected: 100, - x: 100, - min: 0, - max: 100, - }, - { - expected: 50, - x: 3, - min: 50, - max: 100, - }, - { - expected: 100, - x: 150, - min: 0, - max: 100, - }, - } - for i, tt := range tests { - tt := tt - t.Run(fmt.Sprintf("int/%v", i), func(t *testing.T) { - assert.Equal(t, tt.expected, clampInt(tt.x, tt.min, tt.max)) - }) - t.Run(fmt.Sprintf("duration/%v", i), func(t *testing.T) { - x := time.Duration(tt.x) - minVal := time.Duration(tt.min) - maxVal := time.Duration(tt.max) - expected := time.Duration(tt.expected) - assert.Equal(t, expected, clampDuration(x, minVal, maxVal)) - }) - } -} diff --git a/pkg/gcc/kalman.go b/pkg/gcc/kalman.go deleted file mode 100644 index c11996f8..00000000 --- a/pkg/gcc/kalman.go +++ /dev/null @@ -1,97 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import ( - "math" - "time" -) - -const ( - chi = 0.001 -) - -type kalmanOption func(*kalman) - -type kalman struct { - gain float64 - estimate time.Duration - processUncertainty float64 // Q_i - estimateError float64 - measurementUncertainty float64 - - disableMeasurementUncertaintyUpdates bool -} - -func initEstimate(e time.Duration) kalmanOption { - return func(k *kalman) { - k.estimate = e - } -} - -func initProcessUncertainty(p float64) kalmanOption { - return func(k *kalman) { - k.processUncertainty = p - } -} - -func initEstimateError(e float64) kalmanOption { - return func(k *kalman) { - k.estimateError = e * e // Only need variance from now on - } -} - -func initMeasurementUncertainty(u float64) kalmanOption { - return func(k *kalman) { - k.measurementUncertainty = u - } -} - -func setDisableMeasurementUncertaintyUpdates(b bool) kalmanOption { - return func(k *kalman) { - k.disableMeasurementUncertaintyUpdates = b - } -} - -func newKalman(opts ...kalmanOption) *kalman { - k := &kalman{ - gain: 0, - estimate: 0, - processUncertainty: 1e-3, - estimateError: 0.1, - measurementUncertainty: 0, - disableMeasurementUncertaintyUpdates: false, - } - for _, opt := range opts { - opt(k) - } - - return k -} - -func (k *kalman) updateEstimate(measurement time.Duration) time.Duration { - z := measurement - k.estimate - - zms := float64(z.Microseconds()) / 1000.0 - - if !k.disableMeasurementUncertaintyUpdates { - alpha := math.Pow((1 - chi), 30.0/(1000.0*5*float64(time.Millisecond))) - root := math.Sqrt(k.measurementUncertainty) - root3 := 3 * root - if zms > root3 { - k.measurementUncertainty = math.Max(alpha*k.measurementUncertainty+(1-alpha)*root3*root3, 1) - } else { - k.measurementUncertainty = math.Max(alpha*k.measurementUncertainty+(1-alpha)*zms*zms, 1) - } - } - - estimateUncertainty := k.estimateError + k.processUncertainty - k.gain = estimateUncertainty / (estimateUncertainty + k.measurementUncertainty) - - k.estimate += time.Duration(k.gain * zms * float64(time.Millisecond)) - - k.estimateError = (1 - k.gain) * estimateUncertainty - - return k.estimate -} diff --git a/pkg/gcc/kalman_test.go b/pkg/gcc/kalman_test.go deleted file mode 100644 index 2686cff1..00000000 --- a/pkg/gcc/kalman_test.go +++ /dev/null @@ -1,71 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestKalman(t *testing.T) { - cases := []struct { - name string - opts []kalmanOption - measurements []time.Duration - expected []time.Duration - }{ - { - name: "empty", - opts: []kalmanOption{}, - measurements: []time.Duration{}, - expected: []time.Duration{}, - }, - { - name: "kalmanfilter.netExample", - opts: []kalmanOption{ - initEstimate(10 * time.Millisecond), - initEstimateError(100), - initProcessUncertainty(0.15), - initMeasurementUncertainty(0.01), - }, - measurements: []time.Duration{ - time.Duration(50.45 * float64(time.Millisecond)), - time.Duration(50.967 * float64(time.Millisecond)), - time.Duration(51.6 * float64(time.Millisecond)), - time.Duration(52.106 * float64(time.Millisecond)), - time.Duration(52.492 * float64(time.Millisecond)), - time.Duration(52.819 * float64(time.Millisecond)), - time.Duration(53.433 * float64(time.Millisecond)), - time.Duration(54.007 * float64(time.Millisecond)), - time.Duration(54.523 * float64(time.Millisecond)), - time.Duration(54.99 * float64(time.Millisecond)), - }, - expected: []time.Duration{ - time.Duration(50.449959 * float64(time.Millisecond)), - time.Duration(50.936547 * float64(time.Millisecond)), - time.Duration(51.560411 * float64(time.Millisecond)), - time.Duration(52.07324 * float64(time.Millisecond)), - time.Duration(52.466566 * float64(time.Millisecond)), - time.Duration(52.797787 * float64(time.Millisecond)), - time.Duration(53.395303 * float64(time.Millisecond)), - time.Duration(53.970236 * float64(time.Millisecond)), - time.Duration(54.489652 * float64(time.Millisecond)), - time.Duration(54.960137 * float64(time.Millisecond)), - }, - }, - } - for _, tc := range cases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - k := newKalman(append(tc.opts, setDisableMeasurementUncertaintyUpdates(true))...) - estimates := []time.Duration{} - for _, m := range tc.measurements { - estimates = append(estimates, k.updateEstimate(m)) - } - assert.Equal(t, tc.expected, estimates, "%v != %v", tc.expected, estimates) - }) - } -} diff --git a/pkg/gcc/leaky_bucket_pacer.go b/pkg/gcc/leaky_bucket_pacer.go index ffa63ffe..0e88ef49 100644 --- a/pkg/gcc/leaky_bucket_pacer.go +++ b/pkg/gcc/leaky_bucket_pacer.go @@ -24,6 +24,9 @@ type item struct { } // LeakyBucketPacer implements a leaky bucket pacing algorithm. +// +// Deprecated: Use the pacer interceptor directly (see also comment on +// SendSideBWE). type LeakyBucketPacer struct { log logging.LeveledLogger @@ -44,6 +47,8 @@ type LeakyBucketPacer struct { } // NewLeakyBucketPacer initializes a new LeakyBucketPacer. +// +// Deprecated: See comment on LeakyBucketPacer. func NewLeakyBucketPacer(initialBitrate int) *LeakyBucketPacer { return newLeakyBucketPacer(initialBitrate, logging.NewDefaultLoggerFactory()) } @@ -74,6 +79,8 @@ func newLeakyBucketPacer(initialBitrate int, loggerFactory logging.LoggerFactory } // AddStream adds a new stream and its corresponding writer to the pacer. +// +// Deprecated: See comment on LeakyBucketPacer. func (p *LeakyBucketPacer) AddStream(ssrc uint32, writer interceptor.RTPWriter) { p.writerLock.Lock() defer p.writerLock.Unlock() @@ -82,6 +89,8 @@ func (p *LeakyBucketPacer) AddStream(ssrc uint32, writer interceptor.RTPWriter) // SetTargetBitrate updates the target bitrate at which the pacer is allowed to // send packets. The pacer may exceed this limit by p.f. +// +// Deprecated: See comment on LeakyBucketPacer. func (p *LeakyBucketPacer) SetTargetBitrate(rate int) { p.targetBitrateLock.Lock() defer p.targetBitrateLock.Unlock() @@ -97,6 +106,8 @@ func (p *LeakyBucketPacer) getTargetBitrate() int { // Write sends a packet with header and payload the a previously registered // stream. +// +// Deprecated: See comment on LeakyBucketPacer. func (p *LeakyBucketPacer) Write(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { buf, ok := p.pool.Get().(*[]byte) if !ok { @@ -119,6 +130,8 @@ func (p *LeakyBucketPacer) Write(header *rtp.Header, payload []byte, attributes } // Run starts the LeakyBucketPacer. +// +// Deprecated: See comment on LeakyBucketPacer. func (p *LeakyBucketPacer) Run() { ticker := time.NewTicker(p.pacingInterval) defer ticker.Stop() @@ -168,6 +181,8 @@ func (p *LeakyBucketPacer) Run() { } // Close closes the LeakyBucketPacer. +// +// Deprecated: See comment on LeakyBucketPacer. func (p *LeakyBucketPacer) Close() error { close(p.done) diff --git a/pkg/gcc/loss_based_bwe.go b/pkg/gcc/loss_based_bwe.go deleted file mode 100644 index 9898737c..00000000 --- a/pkg/gcc/loss_based_bwe.go +++ /dev/null @@ -1,115 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import ( - "math" - "sync" - "time" - - "github.com/pion/interceptor/internal/cc" - "github.com/pion/logging" -) - -const ( - // constants from - // https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02#section-6 - - increaseLossThreshold = 0.02 - increaseTimeThreshold = 200 * time.Millisecond - increaseFactor = 1.05 - - decreaseLossThreshold = 0.1 - decreaseTimeThreshold = 200 * time.Millisecond -) - -// LossStats contains internal statistics of the loss based controller. -type LossStats struct { - TargetBitrate int - AverageLoss float64 -} - -type lossBasedBandwidthEstimator struct { - lock sync.Mutex - maxBitrate int - minBitrate int - bitrate int - averageLoss float64 - lastLossUpdate time.Time - lastIncrease time.Time - lastDecrease time.Time - log logging.LeveledLogger -} - -func newLossBasedBWE(initialBitrate int, loggerFactory logging.LoggerFactory) *lossBasedBandwidthEstimator { - return &lossBasedBandwidthEstimator{ - lock: sync.Mutex{}, - maxBitrate: 100_000_000, // 100 mbit - minBitrate: 100_000, // 100 kbit - bitrate: initialBitrate, - averageLoss: 0, - lastLossUpdate: time.Time{}, - lastIncrease: time.Time{}, - lastDecrease: time.Time{}, - log: loggerFactory.NewLogger("gcc_loss_controller"), - } -} - -func (e *lossBasedBandwidthEstimator) getEstimate(wantedRate int) LossStats { - e.lock.Lock() - defer e.lock.Unlock() - - if e.bitrate <= 0 { - e.bitrate = clampInt(wantedRate, e.minBitrate, e.maxBitrate) - } - e.bitrate = min(wantedRate, e.bitrate) - - return LossStats{ - TargetBitrate: e.bitrate, - AverageLoss: e.averageLoss, - } -} - -func (e *lossBasedBandwidthEstimator) updateLossEstimate(results []cc.Acknowledgment) { - if len(results) == 0 { - return - } - - packetsLost := 0 - for _, p := range results { - if p.Arrival.IsZero() { - packetsLost++ - } - } - - e.lock.Lock() - defer e.lock.Unlock() - - lossRatio := float64(packetsLost) / float64(len(results)) - e.averageLoss = e.average(time.Since(e.lastLossUpdate), e.averageLoss, lossRatio) - e.lastLossUpdate = time.Now() - - increaseLoss := math.Max(e.averageLoss, lossRatio) - decreaseLoss := math.Min(e.averageLoss, lossRatio) - - if increaseLoss < increaseLossThreshold && time.Since(e.lastIncrease) > increaseTimeThreshold { - e.log.Infof( - "loss controller increasing; averageLoss: %v, decreaseLoss: %v, increaseLoss: %v", - e.averageLoss, decreaseLoss, increaseLoss, - ) - e.lastIncrease = time.Now() - e.bitrate = clampInt(int(increaseFactor*float64(e.bitrate)), e.minBitrate, e.maxBitrate) - } else if decreaseLoss > decreaseLossThreshold && time.Since(e.lastDecrease) > decreaseTimeThreshold { - e.log.Infof( - "loss controller decreasing; averageLoss: %v, decreaseLoss: %v, increaseLoss: %v", - e.averageLoss, decreaseLoss, increaseLoss, - ) - e.lastDecrease = time.Now() - e.bitrate = clampInt(int(float64(e.bitrate)*(1-0.5*decreaseLoss)), e.minBitrate, e.maxBitrate) - } -} - -func (e *lossBasedBandwidthEstimator) average(delta time.Duration, prev, sample float64) float64 { - return sample + math.Exp(-float64(delta.Milliseconds())/200.0)*(prev-sample) -} diff --git a/pkg/gcc/noop_pacer.go b/pkg/gcc/noop_pacer.go index 18ad09d5..7a13c0a3 100644 --- a/pkg/gcc/noop_pacer.go +++ b/pkg/gcc/noop_pacer.go @@ -14,15 +14,21 @@ import ( // ErrUnknownStream is returned when trying to send a packet with a SSRC that // was never registered with any stream. +// +// Deprecated: See comment on SendSideBWE. var ErrUnknownStream = errors.New("unknown ssrc") // NoOpPacer implements a pacer that always immediately sends incoming packets. +// +// Deprecated: See comment on SendSideBWE. type NoOpPacer struct { lock sync.Mutex ssrcToWriter map[uint32]interceptor.RTPWriter } // NewNoOpPacer initializes a new NoOpPacer. +// +// Deprecated: See comment on SendSideBWE. func NewNoOpPacer() *NoOpPacer { return &NoOpPacer{ lock: sync.Mutex{}, @@ -32,10 +38,14 @@ func NewNoOpPacer() *NoOpPacer { // SetTargetBitrate sets the bitrate at which the pacer sends data. NoOp for // NoOp pacer. +// +// Deprecated: See comment on SendSideBWE. func (p *NoOpPacer) SetTargetBitrate(int) { } // AddStream adds a stream and corresponding writer to the p. +// +// Deprecated: See comment on SendSideBWE. func (p *NoOpPacer) AddStream(ssrc uint32, writer interceptor.RTPWriter) { p.lock.Lock() defer p.lock.Unlock() @@ -43,6 +53,8 @@ func (p *NoOpPacer) AddStream(ssrc uint32, writer interceptor.RTPWriter) { } // Write sends a packet with header and payload to a previously added stream. +// +// Deprecated: See comment on SendSideBWE. func (p *NoOpPacer) Write(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { p.lock.Lock() defer p.lock.Unlock() @@ -55,6 +67,8 @@ func (p *NoOpPacer) Write(header *rtp.Header, payload []byte, attributes interce } // Close closes p. +// +// Deprecated: See comment on SendSideBWE. func (p *NoOpPacer) Close() error { return nil } diff --git a/pkg/gcc/overuse_detector.go b/pkg/gcc/overuse_detector.go deleted file mode 100644 index e3deda87..00000000 --- a/pkg/gcc/overuse_detector.go +++ /dev/null @@ -1,86 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import ( - "time" -) - -type threshold interface { - compare(estimate time.Duration, delta time.Duration) (usage, time.Duration, time.Duration) -} - -type overuseDetector struct { - threshold threshold - overuseTime time.Duration - - dsWriter func(DelayStats) - - lastEstimate time.Duration - lastUpdate time.Time - increasingDuration time.Duration - increasingCounter int -} - -func newOveruseDetector(thresh threshold, overuseTime time.Duration, dsw func(DelayStats)) *overuseDetector { - return &overuseDetector{ - threshold: thresh, - overuseTime: overuseTime, - dsWriter: dsw, - lastEstimate: 0, - lastUpdate: time.Now(), - increasingDuration: 0, - increasingCounter: 0, - } -} - -func (d *overuseDetector) onDelayStats(ds DelayStats) { - now := time.Now() - delta := now.Sub(d.lastUpdate) - d.lastUpdate = now - - thresholdUse, estimate, currentThreshold := d.threshold.compare(ds.Estimate, ds.LastReceiveDelta) - - use := usageNormal - if thresholdUse == usageOver { //nolint:nestif - if d.increasingDuration == 0 { - d.increasingDuration = delta / 2 - } else { - d.increasingDuration += delta - } - - d.increasingCounter++ - - if (d.overuseTime == 0 && d.increasingCounter > 1) || - (d.increasingDuration > d.overuseTime && d.increasingCounter > 1) { - if estimate > d.lastEstimate { - use = usageOver - } - } - } - - if thresholdUse == usageUnder { - d.increasingCounter = 0 - d.increasingDuration = 0 - use = usageUnder - } - - if thresholdUse == usageNormal { - d.increasingDuration = 0 - d.increasingCounter = 0 - use = usageNormal - } - - d.lastEstimate = estimate - - d.dsWriter(DelayStats{ - Measurement: ds.Measurement, - Estimate: estimate, - Threshold: currentThreshold, - LastReceiveDelta: ds.LastReceiveDelta, - Usage: use, - State: 0, - TargetBitrate: 0, - }) -} diff --git a/pkg/gcc/overuse_detector_test.go b/pkg/gcc/overuse_detector_test.go deleted file mode 100644 index 80710625..00000000 --- a/pkg/gcc/overuse_detector_test.go +++ /dev/null @@ -1,118 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import ( - "runtime" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -type staticThreshold time.Duration - -func (t staticThreshold) compare(estimate, _ time.Duration) (usage, time.Duration, time.Duration) { - if estimate > time.Duration(t) { - return usageOver, estimate, time.Duration(t) - } - if estimate < -time.Duration(t) { - return usageUnder, estimate, time.Duration(t) - } - - return usageNormal, estimate, time.Duration(t) -} - -func TestOveruseDetectorWithoutDelay(t *testing.T) { - cases := []struct { - name string - estimates []DelayStats - expected []usage - thresh threshold - delay time.Duration - }{ - { - name: "noEstimateNoUsage", - estimates: []DelayStats{}, - expected: []usage{}, - thresh: staticThreshold(time.Millisecond), - delay: 0, - }, - { - name: "overuse", - estimates: []DelayStats{ - {}, - {Estimate: 2 * time.Millisecond}, - {Estimate: 3 * time.Millisecond}, - }, - expected: []usage{usageNormal, usageNormal, usageOver}, - thresh: staticThreshold(time.Millisecond), - delay: 13 * time.Millisecond, - }, - { - name: "normaluse", - estimates: []DelayStats{{Estimate: 0}}, - expected: []usage{usageNormal}, - thresh: staticThreshold(time.Millisecond), - delay: 0, - }, - { - name: "underuse", - estimates: []DelayStats{{Estimate: -2 * time.Millisecond}}, - expected: []usage{usageUnder}, - thresh: staticThreshold(time.Millisecond), - delay: 0, - }, - { - name: "noOverUseBeforeDelay", - estimates: []DelayStats{ - {}, - {Estimate: 3 * time.Millisecond}, - {Estimate: 5 * time.Millisecond}, - }, - expected: []usage{usageNormal, usageNormal, usageOver}, - thresh: staticThreshold(1 * time.Millisecond), - delay: 10 * time.Millisecond, - }, - { - name: "noOverUseIfEstimateDecreased", - estimates: []DelayStats{ - {}, - {Estimate: 4 * time.Millisecond}, - {Estimate: 5 * time.Millisecond}, - {Estimate: 3 * time.Millisecond}, - }, - expected: []usage{usageNormal, usageNormal, usageOver, usageNormal}, - thresh: staticThreshold(1 * time.Millisecond), - delay: 0, - }, - } - for _, tc := range cases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - out := make(chan DelayStats) - dsw := func(ds DelayStats) { - out <- ds - } - od := newOveruseDetector(tc.thresh, tc.delay, dsw) - go func() { - defer close(out) - for _, e := range tc.estimates { - od.onDelayStats(e) - if tc.delay == 0 { - // avoid time.Sleep(0) since it's broken on windows. - runtime.Gosched() - } else { - time.Sleep(tc.delay) - } - } - }() - received := []usage{} - for s := range out { - received = append(received, s.Usage) - } - assert.Equal(t, tc.expected, received, "%v != %v", tc.expected, received) - }) - } -} diff --git a/pkg/gcc/rate_calculator.go b/pkg/gcc/rate_calculator.go deleted file mode 100644 index 54fc0b15..00000000 --- a/pkg/gcc/rate_calculator.go +++ /dev/null @@ -1,66 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import ( - "time" - - "github.com/pion/interceptor/internal/cc" -) - -type rateCalculator struct { - window time.Duration -} - -func newRateCalculator(window time.Duration) *rateCalculator { - return &rateCalculator{ - window: window, - } -} - -func (c *rateCalculator) run(in <-chan []cc.Acknowledgment, onRateUpdate func(int)) { - var history []cc.Acknowledgment - init := false - sum := 0 - for acks := range in { - for _, next := range acks { - if next.Arrival.IsZero() { - // Ignore packet if it didn't arrive - continue - } - history = append(history, next) - sum += next.Size - - if !init { - init = true - // Don't know any timeframe here, only arrival of last packet, - // which is by definition in the window that ends with the last - // arrival time - onRateUpdate(next.Size * 8) - - continue - } - - del := 0 - for _, ack := range history { - deadline := next.Arrival.Add(-c.window) - if !ack.Arrival.Before(deadline) { - break - } - del++ - sum -= ack.Size - } - history = history[del:] - if len(history) == 0 { - onRateUpdate(0) - - continue - } - dt := next.Arrival.Sub(history[0].Arrival) - bits := 8 * sum - rate := int(float64(bits) / dt.Seconds()) - onRateUpdate(rate) - } - } -} diff --git a/pkg/gcc/rate_calculator_test.go b/pkg/gcc/rate_calculator_test.go deleted file mode 100644 index 72876961..00000000 --- a/pkg/gcc/rate_calculator_test.go +++ /dev/null @@ -1,118 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import ( - "testing" - "time" - - "github.com/pion/interceptor/internal/cc" - "github.com/stretchr/testify/assert" -) - -func TestRateCalculator(t *testing.T) { - t0 := time.Now() - cases := []struct { - name string - acks []cc.Acknowledgment - expected []int - }{ - { - name: "emptyCreatesNoRate", - acks: []cc.Acknowledgment{}, - expected: []int{}, - }, - { - name: "ignoresZeroArrivalTimes", - acks: []cc.Acknowledgment{{ - SequenceNumber: 0, - Size: 0, - Departure: time.Time{}, - Arrival: time.Time{}, - }}, - expected: []int{}, - }, - { - name: "singleAckCreatesRate", - acks: []cc.Acknowledgment{{ - SequenceNumber: 0, - Size: 1000, - Departure: time.Time{}, - Arrival: t0, - }}, - expected: []int{8000}, - }, - { - name: "twoAcksCalculateCorrectRates", - acks: []cc.Acknowledgment{{ - SequenceNumber: 0, - Size: 125, - Departure: time.Time{}, - Arrival: t0, - }, { - SequenceNumber: 0, - Size: 125, - Departure: time.Time{}, - Arrival: t0.Add(100 * time.Millisecond), - }}, - expected: []int{1000, 20_000}, - }, - { - name: "steadyACKsCalculateCorrectRates", - acks: getACKStream(10, 1200, 100*time.Millisecond), - expected: []int{ - 9_600, - 192_000, - 144_000, - 128_000, - 120_000, - 115_200, - 115_200, - 115_200, - 115_200, - 115_200, - }, - }, - } - - for _, tc := range cases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - rc := newRateCalculator(500 * time.Millisecond) - in := make(chan []cc.Acknowledgment) - out := make(chan int) - onRateUpdate := func(rate int) { - out <- rate - } - go func() { - defer close(out) - rc.run(in, onRateUpdate) - }() - go func() { - in <- tc.acks - close(in) - }() - - received := []int{} - for r := range out { - received = append(received, r) - } - assert.Equal(t, tc.expected, received) - }) - } -} - -func getACKStream(length int, size int, interval time.Duration) []cc.Acknowledgment { - res := []cc.Acknowledgment{} - t0 := time.Now() - for i := 0; i < length; i++ { - res = append(res, cc.Acknowledgment{ - Size: size, - Arrival: t0, - }) - t0 = t0.Add(interval) - } - - return res -} diff --git a/pkg/gcc/rate_controller.go b/pkg/gcc/rate_controller.go deleted file mode 100644 index 57443610..00000000 --- a/pkg/gcc/rate_controller.go +++ /dev/null @@ -1,178 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import ( - "math" - "sync" - "time" -) - -const ( - decreaseEMAAlpha = 0.95 - beta = 0.85 -) - -type rateController struct { - now now - initialTargetBitrate int - minBitrate int - maxBitrate int - - dsWriter func(DelayStats) - - lock sync.Mutex - init bool - delayStats DelayStats - target int - lastUpdate time.Time - lastState state - latestRTT time.Duration - latestReceivedRate int - latestDecreaseRate *exponentialMovingAverage -} - -type exponentialMovingAverage struct { - average float64 - variance float64 - stdDeviation float64 -} - -func (a *exponentialMovingAverage) update(value float64) { - if a.average == 0.0 { - a.average = value - } else { - x := value - a.average - a.average += decreaseEMAAlpha * x - a.variance = (1 - decreaseEMAAlpha) * (a.variance + decreaseEMAAlpha*x*x) - a.stdDeviation = math.Sqrt(a.variance) - } -} - -func newRateController( - now now, initialTargetBitrate, minBitrate, maxBitrate int, dsw func(DelayStats), -) *rateController { - return &rateController{ - now: now, - initialTargetBitrate: initialTargetBitrate, - minBitrate: minBitrate, - maxBitrate: maxBitrate, - dsWriter: dsw, - init: false, - delayStats: DelayStats{}, - target: initialTargetBitrate, - lastUpdate: time.Time{}, - lastState: stateIncrease, - latestRTT: 0, - latestReceivedRate: 0, - latestDecreaseRate: &exponentialMovingAverage{}, - } -} - -func (c *rateController) onReceivedRate(rate int) { - c.lock.Lock() - defer c.lock.Unlock() - c.latestReceivedRate = rate -} - -func (c *rateController) updateRTT(rtt time.Duration) { - c.lock.Lock() - defer c.lock.Unlock() - c.latestRTT = rtt -} - -func (c *rateController) onDelayStats(ds DelayStats) { - now := time.Now() - - if !c.init { - c.delayStats = ds - c.delayStats.State = stateIncrease - c.init = true - - return - } - c.delayStats = ds - c.delayStats.State = c.delayStats.State.transition(ds.Usage) - - if c.delayStats.State == stateHold { - return - } - - var next DelayStats - - c.lock.Lock() - - switch c.delayStats.State { - case stateHold: - // should never occur due to check above, but makes the linter happy - case stateIncrease: - c.target = clampInt(c.increase(now), c.minBitrate, c.maxBitrate) - next = DelayStats{ - Measurement: c.delayStats.Measurement, - Estimate: c.delayStats.Estimate, - Threshold: c.delayStats.Threshold, - LastReceiveDelta: c.delayStats.LastReceiveDelta, - Usage: c.delayStats.Usage, - State: c.delayStats.State, - TargetBitrate: c.target, - } - - case stateDecrease: - c.target = clampInt(c.decrease(), c.minBitrate, c.maxBitrate) - next = DelayStats{ - Measurement: c.delayStats.Measurement, - Estimate: c.delayStats.Estimate, - Threshold: c.delayStats.Threshold, - LastReceiveDelta: c.delayStats.LastReceiveDelta, - Usage: c.delayStats.Usage, - State: c.delayStats.State, - TargetBitrate: c.target, - } - } - - c.lock.Unlock() - - c.dsWriter(next) -} - -func (c *rateController) increase(now time.Time) int { - if c.latestDecreaseRate.average > 0 && - float64(c.latestReceivedRate) > c.latestDecreaseRate.average-3*c.latestDecreaseRate.stdDeviation && - float64(c.latestReceivedRate) < c.latestDecreaseRate.average+3*c.latestDecreaseRate.stdDeviation { - bitsPerFrame := float64(c.target) / 30.0 - packetsPerFrame := math.Ceil(bitsPerFrame / (1200 * 8)) - expectedPacketSizeBits := bitsPerFrame / packetsPerFrame - - responseTime := 100*time.Millisecond + c.latestRTT - alpha := 0.5 * math.Min(float64(now.Sub(c.lastUpdate).Milliseconds())/float64(responseTime.Milliseconds()), 1.0) - increase := int(math.Max(1000.0, alpha*expectedPacketSizeBits)) - c.lastUpdate = now - - return int(math.Min(float64(c.target+increase), 1.5*float64(c.latestReceivedRate))) - } - eta := math.Pow(1.08, math.Min(float64(now.Sub(c.lastUpdate).Milliseconds())/1000, 1.0)) - c.lastUpdate = now - - rate := int(eta * float64(c.target)) - - // maximum increase to 1.5 * received rate - received := int(1.5 * float64(c.latestReceivedRate)) - if rate > received && received > c.target { - return received - } - - if rate < c.target { - return c.target - } - - return rate -} - -func (c *rateController) decrease() int { - target := int(beta * float64(c.latestReceivedRate)) - c.latestDecreaseRate.update(float64(c.latestReceivedRate)) - c.lastUpdate = c.now() - - return target -} diff --git a/pkg/gcc/rate_controller_test.go b/pkg/gcc/rate_controller_test.go deleted file mode 100644 index f19663aa..00000000 --- a/pkg/gcc/rate_controller_test.go +++ /dev/null @@ -1,79 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestRateControllerRun(t *testing.T) { - cases := []struct { - name string - initialBitrate int - usage []usage - expected []DelayStats - }{ - { - name: "empty", - initialBitrate: 100_000, - usage: []usage{}, - expected: []DelayStats{}, - }, - { - name: "increasesMultiplicativelyBy8000", - initialBitrate: 100_000, - usage: []usage{usageNormal, usageNormal}, - expected: []DelayStats{{ - Usage: usageNormal, - State: stateIncrease, - TargetBitrate: 108_000, - Estimate: 0, - Threshold: 0, - }}, - }, - } - - t0 := time.Time{} - mockNoFn := func() time.Time { - t0 = t0.Add(100 * time.Millisecond) - - return t0 - } - for _, tc := range cases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - out := make(chan DelayStats) - dc := newRateController(mockNoFn, 100_000, 1_000, 50_000_000, func(ds DelayStats) { - out <- ds - }) - in := make(chan DelayStats) - dc.onReceivedRate(100_000) - dc.updateRTT(300 * time.Millisecond) - go func() { - defer close(out) - for _, state := range tc.usage { - dc.onDelayStats(DelayStats{ - Measurement: 0, - Estimate: 0, - Threshold: 0, - Usage: state, - State: 0, - TargetBitrate: 0, - }) - } - close(in) - }() - received := []DelayStats{} - for ds := range out { - received = append(received, ds) - } - if len(tc.expected) > 0 { - assert.Equal(t, tc.expected[0], received[0]) - } - }) - } -} diff --git a/pkg/gcc/send_side_bwe.go b/pkg/gcc/send_side_bwe.go index c996ce2a..7ceaa462 100644 --- a/pkg/gcc/send_side_bwe.go +++ b/pkg/gcc/send_side_bwe.go @@ -5,13 +5,13 @@ package gcc import ( "errors" - "math" - "sync" + "sync/atomic" "time" + "github.com/pion/bwe/gcc" "github.com/pion/interceptor" "github.com/pion/interceptor/internal/cc" - "github.com/pion/interceptor/internal/ntp" + "github.com/pion/interceptor/pkg/rtpfb" "github.com/pion/logging" "github.com/pion/rtcp" "github.com/pion/rtp" @@ -36,45 +36,89 @@ type Pacer interface { } // Stats contains internal statistics of the bandwidth estimator. +// +// Deprecated: All stats will always be zero. type Stats struct { LossStats DelayStats } -// SendSideBWE implements a combination of loss and delay based GCC. -type SendSideBWE struct { - pacer Pacer - lossController *lossBasedBandwidthEstimator - delayController *delayController - feedbackAdapter *cc.FeedbackAdapter +// LossStats contains internal statistics of the loss based controller. +// +// Deprecated: All stats will always be zero. +type LossStats struct { + TargetBitrate int + AverageLoss float64 +} - onTargetBitrateChange func(bitrate int) +// DelayStats contains some internal statistics of the delay based congestion +// controller. +// +// Deprecated: All stats will always be zero. +type DelayStats struct { + Measurement time.Duration + Estimate time.Duration + Threshold time.Duration + LastReceiveDelta time.Duration + + Usage usage + State state + TargetBitrate int +} + +// Deprecated but necessary to not break the stats API. +type state int + +// Deprecated but necessary to not break the stats API. +func (s state) String() string { + return "state is deprecated" +} - lock sync.Mutex - latestStats Stats - latestBitrate int - minBitrate int - maxBitrate int +// Deprecated but necessary to not break the stats API. +type usage int - close chan struct{} - closeLock sync.RWMutex +// Deprecated but necessary to not break the stats API. +func (u usage) String() string { + return "usage is deprecated" +} + +// SendSideBWE implements a combination of loss and delay based GCC. +// +// Deprecated: SendSideBWE is now a wrapper around the new GCC implementation +// from https://github.com/pion/bwe +// New applications should directly use that implementation without depending on +// the legacy API from this package. +type SendSideBWE struct { + pacer Pacer + latestBitrate atomic.Int64 + bwe *gcc.SendSideController + loggerFactory logging.LoggerFactory + onTargetBitrateChange func(bitrate int) - loggerFactory logging.LoggerFactory + latestStats Stats + minBitrate int + maxBitrate int } // Option configures a bandwidth estimator. +// +// Deprecated: See comment on SendSideBWE. type Option func(*SendSideBWE) error // SendSideBWEInitialBitrate sets the initial bitrate of new GCC interceptors. +// +// Deprecated: See comment on SendSideBWE. func SendSideBWEInitialBitrate(rate int) Option { return func(e *SendSideBWE) error { - e.latestBitrate = rate + e.latestBitrate.Store(int64(rate)) return nil } } // SendSideBWEMaxBitrate sets the initial bitrate of new GCC interceptors. +// +// Deprecated: See comment on SendSideBWE. func SendSideBWEMaxBitrate(rate int) Option { return func(e *SendSideBWE) error { e.maxBitrate = rate @@ -84,6 +128,8 @@ func SendSideBWEMaxBitrate(rate int) Option { } // SendSideBWEMinBitrate sets the initial bitrate of new GCC interceptors. +// +// Deprecated: See comment on SendSideBWE. func SendSideBWEMinBitrate(rate int) Option { return func(e *SendSideBWE) error { e.minBitrate = rate @@ -93,6 +139,8 @@ func SendSideBWEMinBitrate(rate int) Option { } // SendSideBWEPacer sets the pacing algorithm to use. +// +// Deprecated: See comment on SendSideBWE. func SendSideBWEPacer(p Pacer) Option { return func(e *SendSideBWE) error { e.pacer = p @@ -102,6 +150,8 @@ func SendSideBWEPacer(p Pacer) Option { } // WithLoggerFactory sets the logger factory for the bandwidth estimator. +// +// Deprecated: See comment on SendSideBWE. func WithLoggerFactory(factory logging.LoggerFactory) Option { return func(e *SendSideBWE) error { e.loggerFactory = factory @@ -111,45 +161,43 @@ func WithLoggerFactory(factory logging.LoggerFactory) Option { } // NewSendSideBWE creates a new sender side bandwidth estimator. +// +// Deprecated: See comment on SendSideBWE. func NewSendSideBWE(opts ...Option) (*SendSideBWE, error) { send := &SendSideBWE{ pacer: nil, - lossController: nil, - delayController: nil, - feedbackAdapter: cc.NewFeedbackAdapter(), onTargetBitrateChange: nil, - lock: sync.Mutex{}, latestStats: Stats{}, - latestBitrate: latestBitrate, minBitrate: minBitrate, maxBitrate: maxBitrate, - close: make(chan struct{}), + latestBitrate: atomic.Int64{}, + bwe: nil, + loggerFactory: nil, } + send.latestBitrate.Store(latestBitrate) for _, opt := range opts { if err := opt(send); err != nil { return nil, err } } + var err error + send.bwe, err = gcc.NewSendSideController(send.minBitrate, send.minBitrate, send.maxBitrate) + if err != nil { + return nil, err + } if send.loggerFactory == nil { send.loggerFactory = logging.NewDefaultLoggerFactory() } if send.pacer == nil { - send.pacer = newLeakyBucketPacer(send.latestBitrate, send.loggerFactory) + send.pacer = newLeakyBucketPacer(int(send.latestBitrate.Load()), send.loggerFactory) } - send.lossController = newLossBasedBWE(send.latestBitrate, send.loggerFactory) - send.delayController = newDelayController(delayControllerConfig{ - nowFn: time.Now, - initialBitrate: send.latestBitrate, - minBitrate: send.minBitrate, - maxBitrate: send.maxBitrate, - }, send.loggerFactory) - - send.delayController.onUpdate(send.onDelayUpdate) return send, nil } // AddStream adds a new stream to the bandwidth estimator. +// +// Deprecated: See comment on SendSideBWE. func (e *SendSideBWE) AddStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { var hdrExtID uint8 for _, e := range info.RTPHeaderExtensions { @@ -168,9 +216,6 @@ func (e *SendSideBWE) AddStream(info *interceptor.StreamInfo, writer interceptor } attributes.Set(cc.TwccExtensionAttributesKey, hdrExtID) } - if err := e.feedbackAdapter.OnSent(time.Now(), header, len(payload), attributes); err != nil { - return 0, err - } return writer.Write(header, payload, attributes) }, @@ -181,136 +226,69 @@ func (e *SendSideBWE) AddStream(info *interceptor.StreamInfo, writer interceptor // WriteRTCP adds some RTCP feedback to the bandwidth estimator. // -//nolint:cyclop -func (e *SendSideBWE) WriteRTCP(pkts []rtcp.Packet, _ interceptor.Attributes) error { - now := time.Now() - e.closeLock.RLock() - defer e.closeLock.RUnlock() - - if e.isClosed() { - return ErrSendSideBWEClosed - } - - for _, pkt := range pkts { - var acks []cc.Acknowledgment - var err error - var feedbackSentTime time.Time - switch fb := pkt.(type) { - case *rtcp.TransportLayerCC: - acks, err = e.feedbackAdapter.OnTransportCCFeedback(now, fb) - if err != nil { - return err +// Deprecated: See comment on SendSideBWE. +func (e *SendSideBWE) WriteRTCP(pkts []rtcp.Packet, attr interceptor.Attributes) error { + report, ok := attr.Get(rtpfb.CCFBAttributesKey).(rtpfb.Report) + if ok { + for _, pr := range report.PacketReports { + if pr.Arrived { + e.bwe.OnAck( + pr.SequenceNumber, + pr.Size, + pr.Departure, + pr.Arrival, + ) + } else { + e.bwe.OnLoss() } - for i, ack := range acks { - if i == 0 { - feedbackSentTime = ack.Arrival - - continue - } - if ack.Arrival.After(feedbackSentTime) { - feedbackSentTime = ack.Arrival - } - } - case *rtcp.CCFeedbackReport: - acks = e.feedbackAdapter.OnRFC8888Feedback(now, fb) - feedbackSentTime = ntp.ToTime(uint64(fb.ReportTimestamp) << 16) - default: - continue } - - feedbackMinRTT := time.Duration(math.MaxInt) - for _, ack := range acks { - if ack.Arrival.IsZero() { - continue - } - pendingTime := feedbackSentTime.Sub(ack.Arrival) - rtt := now.Sub(ack.Departure) - pendingTime - feedbackMinRTT = time.Duration(min(int(rtt), int(feedbackMinRTT))) + rate := e.bwe.OnFeedback(report.Arrival, report.RTT) + prev := e.latestBitrate.Swap(int64(rate)) + if e.pacer != nil { + e.pacer.SetTargetBitrate(rate) } - if feedbackMinRTT < math.MaxInt { - e.delayController.updateRTT(feedbackMinRTT) + if rate != int(prev) && e.onTargetBitrateChange != nil { + e.onTargetBitrateChange(rate) } - - e.lossController.updateLossEstimate(acks) - e.delayController.updateDelayEstimate(acks) } return nil } // GetTargetBitrate returns the current target bitrate in bits per second. +// +// Deprecated: See comment on SendSideBWE. func (e *SendSideBWE) GetTargetBitrate() int { - e.lock.Lock() - defer e.lock.Unlock() - - return e.latestBitrate + return int(e.latestBitrate.Load()) } // GetStats returns some internal statistics of the bandwidth estimator. +// +// Deprecated: See comment on SendSideBWE. func (e *SendSideBWE) GetStats() map[string]any { - e.lock.Lock() - defer e.lock.Unlock() - return map[string]any{ - "lossTargetBitrate": e.latestStats.LossStats.TargetBitrate, - "averageLoss": e.latestStats.AverageLoss, - "delayTargetBitrate": e.latestStats.DelayStats.TargetBitrate, - "delayMeasurement": float64(e.latestStats.Measurement.Microseconds()) / 1000.0, - "delayEstimate": float64(e.latestStats.Estimate.Microseconds()) / 1000.0, - "delayThreshold": float64(e.latestStats.Threshold.Microseconds()) / 1000.0, - "usage": e.latestStats.Usage.String(), - "state": e.latestStats.State.String(), + "lossTargetBitrate": 0, + "averageLoss": 0, + "delayTargetBitrate": 0, + "delayMeasurement": 0, + "delayEstimate": 0, + "delayThreshold": 0, + "usage": 0, + "state": 0, } } // OnTargetBitrateChange sets the callback that is called when the target // bitrate in bits per second changes. +// +// Deprecated: See comment on SendSideBWE. func (e *SendSideBWE) OnTargetBitrateChange(f func(bitrate int)) { e.onTargetBitrateChange = f } -// isClosed returns true if SendSideBWE is closed. -func (e *SendSideBWE) isClosed() bool { - select { - case <-e.close: - return true - default: - return false - } -} - // Close stops and closes the bandwidth estimator. +// +// Deprecated: See comment on SendSideBWE. func (e *SendSideBWE) Close() error { - e.closeLock.Lock() - defer e.closeLock.Unlock() - - if err := e.delayController.Close(); err != nil { - return err - } - close(e.close) - return e.pacer.Close() } - -func (e *SendSideBWE) onDelayUpdate(delayStats DelayStats) { - e.lock.Lock() - defer e.lock.Unlock() - - lossStats := e.lossController.getEstimate(delayStats.TargetBitrate) - bitrateChanged := false - bitrate := min(delayStats.TargetBitrate, lossStats.TargetBitrate) - if bitrate != e.latestBitrate { - bitrateChanged = true - e.latestBitrate = bitrate - e.pacer.SetTargetBitrate(e.latestBitrate) - } - - if bitrateChanged && e.onTargetBitrateChange != nil { - go e.onTargetBitrateChange(bitrate) - } - - e.latestStats = Stats{ - LossStats: lossStats, - DelayStats: delayStats, - } -} diff --git a/pkg/gcc/send_side_bwe_test.go b/pkg/gcc/send_side_bwe_test.go index bd95ded7..cb17830c 100644 --- a/pkg/gcc/send_side_bwe_test.go +++ b/pkg/gcc/send_side_bwe_test.go @@ -95,8 +95,8 @@ func TestSendSideBWE(t *testing.T) { assert.NoError(t, err) } - // Sending a stream with zero loss and no RTT should increase estimate - require.Less(t, latestBitrate, bwe.GetTargetBitrate()) + // Empty reports should not change any bwe state. + require.Equal(t, latestBitrate, bwe.GetTargetBitrate()) } func TestSendSideBWE_ErrorOnWriteRTCPAtClosedState(t *testing.T) { @@ -106,10 +106,7 @@ func TestSendSideBWE_ErrorOnWriteRTCPAtClosedState(t *testing.T) { pkts := []rtcp.Packet{&rtcp.TransportLayerCC{}} require.NoError(t, bwe.WriteRTCP(pkts, nil)) - require.Equal(t, bwe.isClosed(), false) require.NoError(t, bwe.Close()) - require.ErrorIs(t, bwe.WriteRTCP(pkts, nil), ErrSendSideBWEClosed) - require.Equal(t, bwe.isClosed(), true) } func BenchmarkSendSideBWE_WriteRTCP(b *testing.B) { diff --git a/pkg/gcc/slope_estimator.go b/pkg/gcc/slope_estimator.go deleted file mode 100644 index ba55fe0a..00000000 --- a/pkg/gcc/slope_estimator.go +++ /dev/null @@ -1,57 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import ( - "time" -) - -type estimator interface { - updateEstimate(measurement time.Duration) time.Duration -} - -type estimatorFunc func(time.Duration) time.Duration - -func (f estimatorFunc) updateEstimate(d time.Duration) time.Duration { - return f(d) -} - -type slopeEstimator struct { - estimator - init bool - group arrivalGroup - delayStatsWriter func(DelayStats) -} - -func newSlopeEstimator(e estimator, dsw func(DelayStats)) *slopeEstimator { - return &slopeEstimator{ - estimator: e, - delayStatsWriter: dsw, - } -} - -func (e *slopeEstimator) onArrivalGroup(ag arrivalGroup) { - if !e.init { - e.group = ag - e.init = true - - return - } - measurement := interGroupDelayVariation(e.group, ag) - delta := ag.arrival.Sub(e.group.arrival) - e.group = ag - e.delayStatsWriter(DelayStats{ - Measurement: measurement, - Estimate: e.updateEstimate(measurement), - Threshold: 0, - LastReceiveDelta: delta, - Usage: 0, - State: 0, - TargetBitrate: 0, - }) -} - -func interGroupDelayVariation(a, b arrivalGroup) time.Duration { - return b.arrival.Sub(a.arrival) - b.departure.Sub(a.departure) -} diff --git a/pkg/gcc/slope_estimator_test.go b/pkg/gcc/slope_estimator_test.go deleted file mode 100644 index dc8a865c..00000000 --- a/pkg/gcc/slope_estimator_test.go +++ /dev/null @@ -1,112 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func identity(d time.Duration) time.Duration { - return d -} - -func TestSlopeEstimator(t *testing.T) { - cases := []struct { - name string - ags []arrivalGroup - expected []DelayStats - }{ - { - name: "emptyReturnsEmpty", - ags: []arrivalGroup{}, - expected: []DelayStats{}, - }, - { - name: "simpleDeltaTest", - ags: []arrivalGroup{ - { - arrival: time.Time{}.Add(5 * time.Millisecond), - departure: time.Time{}.Add(15 * time.Millisecond), - }, - { - arrival: time.Time{}.Add(10 * time.Millisecond), - departure: time.Time{}.Add(20 * time.Millisecond), - }, - }, - expected: []DelayStats{ - { - Measurement: 0, - Estimate: 0, - Threshold: 0, - LastReceiveDelta: 5 * time.Millisecond, - Usage: 0, - State: 0, - TargetBitrate: 0, - }, - }, - }, - { - name: "twoMeasurements", - ags: []arrivalGroup{ - { - arrival: time.Time{}.Add(5 * time.Millisecond), - departure: time.Time{}.Add(15 * time.Millisecond), - }, - { - arrival: time.Time{}.Add(10 * time.Millisecond), - departure: time.Time{}.Add(20 * time.Millisecond), - }, - { - arrival: time.Time{}.Add(15 * time.Millisecond), - departure: time.Time{}.Add(30 * time.Millisecond), - }, - }, - expected: []DelayStats{ - { - Measurement: 0, - Estimate: 0, - Threshold: 0, - LastReceiveDelta: 5 * time.Millisecond, - Usage: 0, - State: 0, - TargetBitrate: 0, - }, - { - Measurement: -5 * time.Millisecond, - Estimate: -5 * time.Millisecond, - Threshold: 0, - LastReceiveDelta: 5 * time.Millisecond, - Usage: 0, - State: 0, - TargetBitrate: 0, - }, - }, - }, - } - - for _, tc := range cases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - out := make(chan DelayStats) - se := newSlopeEstimator(estimatorFunc(identity), func(ds DelayStats) { - out <- ds - }) - input := []time.Duration{} - go func() { - defer close(out) - for _, ag := range tc.ags { - se.onArrivalGroup(ag) - } - }() - received := []DelayStats{} - for d := range out { - received = append(received, d) - } - assert.Equal(t, tc.expected, received, "%v != %v", input, received) - }) - } -} diff --git a/pkg/gcc/state.go b/pkg/gcc/state.go deleted file mode 100644 index b28d464d..00000000 --- a/pkg/gcc/state.go +++ /dev/null @@ -1,64 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import "fmt" - -type state int - -const ( - stateIncrease state = iota - stateDecrease - stateHold -) - -//nolint:cyclop -func (s state) transition(use usage) state { - switch s { - case stateHold: - switch use { - case usageOver: - return stateDecrease - case usageNormal: - return stateIncrease - case usageUnder: - return stateHold - } - - case stateIncrease: - switch use { - case usageOver: - return stateDecrease - case usageNormal: - return stateIncrease - case usageUnder: - return stateHold - } - - case stateDecrease: - switch use { - case usageOver: - return stateDecrease - case usageNormal: - return stateHold - case usageUnder: - return stateHold - } - } - - return stateIncrease -} - -func (s state) String() string { - switch s { - case stateIncrease: - return "increase" - case stateDecrease: - return "decrease" - case stateHold: - return "hold" - default: - return fmt.Sprintf("invalid state: %d", s) - } -} diff --git a/pkg/gcc/usage.go b/pkg/gcc/usage.go deleted file mode 100644 index d7291feb..00000000 --- a/pkg/gcc/usage.go +++ /dev/null @@ -1,27 +0,0 @@ -// SPDX-FileCopyrightText: 2026 The Pion community -// SPDX-License-Identifier: MIT - -package gcc - -import "fmt" - -type usage int - -const ( - usageOver usage = iota - usageUnder - usageNormal -) - -func (u usage) String() string { - switch u { - case usageOver: - return "overuse" - case usageUnder: - return "underuse" - case usageNormal: - return "normal" - default: - return fmt.Sprintf("invalid usage: %d", u) - } -}