Note: We no longer publish the latest version of our code here. We primarily use a kumc-bmi github organization. The heron ETL repository, in particular, is not public. Peers in the informatics community should see MultiSiteDev for details on requesting access.

source: webrtc/webrtc/modules/pacing/paced_sender.cc @ 0:4bda6873e34c

pub_scrub_3792 tip
Last change on this file since 0:4bda6873e34c was 0:4bda6873e34c, checked in by Michael Prittie <mprittie@…>, 6 years ago

Scrubbed password for publication.

File size: 12.6 KB
Line 
/*
 *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10
11#include "webrtc/modules/pacing/include/paced_sender.h"
12
13#include <assert.h>
14
15#include "webrtc/modules/interface/module_common_types.h"
16#include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
17#include "webrtc/system_wrappers/interface/trace_event.h"
18
namespace {

// Spacing, in milliseconds, between packet bursts. Also used as the interval
// reported by TimeUntilNextProcess() and to seed the budgets at construction.
const int kMinPacketLimitMs = 5;

// Largest interval, in milliseconds, that a single Process() call may credit
// to the budgets. Caps the credit when Process() has not run for a long time.
const int kMaxIntervalTimeMs = 30;

// Longest time, in milliseconds, that the packet at the head of the queue may
// wait when nothing is being sent, regardless of buffer state. In practice
// this only takes effect at low bitrates (less than 320 kbits/s).
const int kMaxQueueTimeWithoutSendingMs = 30;

}  // namespace
33
34namespace webrtc {
35
36namespace paced_sender {
// Bookkeeping record for one packet waiting in a pacer queue. Only metadata is
// stored here; no payload is held.
struct Packet {
  Packet(uint32_t ssrc,
         uint16_t seq_number,
         int64_t capture_time_ms,
         int length_in_bytes,
         bool retransmission)
      : ssrc_(ssrc),
        sequence_number_(seq_number),
        capture_time_ms_(capture_time_ms),
        bytes_(length_in_bytes),
        retransmission_(retransmission) {}

  uint32_t ssrc_;             // Stream the packet belongs to.
  uint16_t sequence_number_;  // Sequence number within that stream.
  int64_t capture_time_ms_;   // Capture timestamp, in milliseconds.
  int bytes_;                 // Packet length in bytes.
  bool retransmission_;       // True if the packet is a retransmission.
};
52
53// STL list style class which prevents duplicates in the list.
54class PacketList {
55 public:
56  PacketList() {};
57
58  bool empty() const {
59    return packet_list_.empty();
60  }
61
62  Packet front() const {
63    return packet_list_.front();
64  }
65
66  void pop_front() {
67    Packet& packet = packet_list_.front();
68    uint16_t sequence_number = packet.sequence_number_;
69    packet_list_.pop_front();
70    sequence_number_set_.erase(sequence_number);
71  }
72
73  void push_back(const Packet& packet) {
74    if (sequence_number_set_.find(packet.sequence_number_) ==
75        sequence_number_set_.end()) {
76      // Don't insert duplicates.
77      packet_list_.push_back(packet);
78      sequence_number_set_.insert(packet.sequence_number_);
79    }
80  }
81
82 private:
83  std::list<Packet> packet_list_;
84  std::set<uint16_t> sequence_number_set_;
85};
86
// Byte budget for one pacing interval, derived from a target rate in kbps.
// A negative balance means more was sent than the rate allowed.
class IntervalBudget {
 public:
  explicit IntervalBudget(int initial_target_rate_kbps)
      : target_rate_kbps_(initial_target_rate_kbps), bytes_remaining_(0) {}

  void set_target_rate_kbps(int target_rate_kbps) {
    target_rate_kbps_ = target_rate_kbps;
  }

  // Starts a new interval of |delta_time_ms| milliseconds.
  void IncreaseBudget(int delta_time_ms) {
    // kbit/s * ms == bits; divide by 8 to get bytes.
    const int interval_bytes = target_rate_kbps_ * delta_time_ms / 8;
    // Budget left unused in the previous interval is dropped, while a deficit
    // from overuse is carried into the new interval.
    if (bytes_remaining_ > 0) {
      bytes_remaining_ = 0;
    }
    bytes_remaining_ += interval_bytes;
  }

  // Charges |bytes| against the budget. The balance is never allowed to drop
  // below the number of bytes the target rate yields in 500 ms, negated.
  void UseBudget(int bytes) {
    const int lowest_balance = -500 * target_rate_kbps_ / 8;
    const int new_balance = bytes_remaining_ - bytes;
    bytes_remaining_ =
        (new_balance < lowest_balance) ? lowest_balance : new_balance;
  }

  int bytes_remaining() const { return bytes_remaining_; }

 private:
  int target_rate_kbps_;
  int bytes_remaining_;
};
119}  // namespace paced_sender
120
121PacedSender::PacedSender(Callback* callback, int target_bitrate_kbps,
122                         float pace_multiplier)
123    : callback_(callback),
124      pace_multiplier_(pace_multiplier),
125      enabled_(false),
126      paused_(false),
127      max_queue_length_ms_(kDefaultMaxQueueLengthMs),
128      critsect_(CriticalSectionWrapper::CreateCriticalSection()),
129      media_budget_(new paced_sender::IntervalBudget(
130          pace_multiplier_ * target_bitrate_kbps)),
131      padding_budget_(new paced_sender::IntervalBudget(0)),
132      // No padding until UpdateBitrate is called.
133      pad_up_to_bitrate_budget_(new paced_sender::IntervalBudget(0)),
134      time_last_update_(TickTime::Now()),
135      capture_time_ms_last_queued_(0),
136      capture_time_ms_last_sent_(0),
137      high_priority_packets_(new paced_sender::PacketList),
138      normal_priority_packets_(new paced_sender::PacketList),
139      low_priority_packets_(new paced_sender::PacketList) {
140  UpdateBytesPerInterval(kMinPacketLimitMs);
141}
142
143PacedSender::~PacedSender() {
144}
145
146void PacedSender::Pause() {
147  CriticalSectionScoped cs(critsect_.get());
148  paused_ = true;
149}
150
151void PacedSender::Resume() {
152  CriticalSectionScoped cs(critsect_.get());
153  paused_ = false;
154}
155
156void PacedSender::SetStatus(bool enable) {
157  CriticalSectionScoped cs(critsect_.get());
158  enabled_ = enable;
159}
160
161bool PacedSender::Enabled() const {
162  CriticalSectionScoped cs(critsect_.get());
163  return enabled_;
164}
165
166void PacedSender::UpdateBitrate(int target_bitrate_kbps,
167                                int max_padding_bitrate_kbps,
168                                int pad_up_to_bitrate_kbps) {
169  CriticalSectionScoped cs(critsect_.get());
170  media_budget_->set_target_rate_kbps(pace_multiplier_ * target_bitrate_kbps);
171  padding_budget_->set_target_rate_kbps(max_padding_bitrate_kbps);
172  pad_up_to_bitrate_budget_->set_target_rate_kbps(pad_up_to_bitrate_kbps);
173}
174
175bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
176    uint16_t sequence_number, int64_t capture_time_ms, int bytes,
177    bool retransmission) {
178  CriticalSectionScoped cs(critsect_.get());
179
180  if (!enabled_) {
181    return true;  // We can send now.
182  }
183  if (capture_time_ms < 0) {
184    capture_time_ms = TickTime::MillisecondTimestamp();
185  }
186  if (priority != kHighPriority &&
187      capture_time_ms > capture_time_ms_last_queued_) {
188    capture_time_ms_last_queued_ = capture_time_ms;
189    TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms,
190                             "capture_time_ms", capture_time_ms);
191  }
192  paced_sender::PacketList* packet_list = NULL;
193  switch (priority) {
194    case kHighPriority:
195      packet_list = high_priority_packets_.get();
196      break;
197    case kNormalPriority:
198      packet_list = normal_priority_packets_.get();
199      break;
200    case kLowPriority:
201      packet_list = low_priority_packets_.get();
202      break;
203  }
204  packet_list->push_back(paced_sender::Packet(ssrc, sequence_number,
205                                              capture_time_ms, bytes,
206                                              retransmission));
207  return false;
208}
209
210void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) {
211  CriticalSectionScoped cs(critsect_.get());
212  max_queue_length_ms_ = max_queue_length_ms;
213}
214
215int PacedSender::QueueInMs() const {
216  CriticalSectionScoped cs(critsect_.get());
217  int64_t now_ms = TickTime::MillisecondTimestamp();
218  int64_t oldest_packet_capture_time = now_ms;
219  if (!high_priority_packets_->empty()) {
220    oldest_packet_capture_time = std::min(
221        oldest_packet_capture_time,
222        high_priority_packets_->front().capture_time_ms_);
223  }
224  if (!normal_priority_packets_->empty()) {
225    oldest_packet_capture_time = std::min(
226        oldest_packet_capture_time,
227        normal_priority_packets_->front().capture_time_ms_);
228  }
229  if (!low_priority_packets_->empty()) {
230    oldest_packet_capture_time = std::min(
231        oldest_packet_capture_time,
232        low_priority_packets_->front().capture_time_ms_);
233  }
234  return now_ms - oldest_packet_capture_time;
235}
236
237int32_t PacedSender::TimeUntilNextProcess() {
238  CriticalSectionScoped cs(critsect_.get());
239  int64_t elapsed_time_ms =
240      (TickTime::Now() - time_last_update_).Milliseconds();
241  if (elapsed_time_ms <= 0) {
242    return kMinPacketLimitMs;
243  }
244  if (elapsed_time_ms >= kMinPacketLimitMs) {
245    return 0;
246  }
247  return kMinPacketLimitMs - elapsed_time_ms;
248}
249
250int32_t PacedSender::Process() {
251  TickTime now = TickTime::Now();
252  CriticalSectionScoped cs(critsect_.get());
253  int elapsed_time_ms = (now - time_last_update_).Milliseconds();
254  time_last_update_ = now;
255  if (!enabled_) {
256    return 0;
257  }
258  if (!paused_) {
259    if (elapsed_time_ms > 0) {
260      uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
261      UpdateBytesPerInterval(delta_time_ms);
262    }
263    paced_sender::PacketList* packet_list;
264    while (ShouldSendNextPacket(&packet_list)) {
265      if (!SendPacketFromList(packet_list))
266        return 0;
267    }
268    if (high_priority_packets_->empty() &&
269        normal_priority_packets_->empty() &&
270        low_priority_packets_->empty() &&
271        padding_budget_->bytes_remaining() > 0 &&
272        pad_up_to_bitrate_budget_->bytes_remaining() > 0) {
273      int padding_needed = std::min(
274          padding_budget_->bytes_remaining(),
275          pad_up_to_bitrate_budget_->bytes_remaining());
276      critsect_->Leave();
277      int bytes_sent = callback_->TimeToSendPadding(padding_needed);
278      critsect_->Enter();
279      media_budget_->UseBudget(bytes_sent);
280      padding_budget_->UseBudget(bytes_sent);
281      pad_up_to_bitrate_budget_->UseBudget(bytes_sent);
282    }
283  }
284  return 0;
285}
286
287// MUST have critsect_ when calling.
288bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list) {
289  uint32_t ssrc;
290  uint16_t sequence_number;
291  int64_t capture_time_ms;
292  bool retransmission;
293  GetNextPacketFromList(packet_list, &ssrc, &sequence_number,
294                        &capture_time_ms, &retransmission);
295  critsect_->Leave();
296
297  const bool success = callback_->TimeToSendPacket(ssrc, sequence_number,
298                                                   capture_time_ms,
299                                                   retransmission);
300  critsect_->Enter();
301  // If packet cannot be sent then keep it in packet list and exit early.
302  // There's no need to send more packets.
303  if (!success) {
304    return false;
305  }
306  packet_list->pop_front();
307  const bool last_packet = packet_list->empty() ||
308      packet_list->front().capture_time_ms_ > capture_time_ms;
309  if (packet_list != high_priority_packets_.get()) {
310    if (capture_time_ms > capture_time_ms_last_sent_) {
311      capture_time_ms_last_sent_ = capture_time_ms;
312    } else if (capture_time_ms == capture_time_ms_last_sent_ &&
313               last_packet) {
314      TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", capture_time_ms);
315    }
316  }
317  return true;
318}
319
320// MUST have critsect_ when calling.
321void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) {
322  media_budget_->IncreaseBudget(delta_time_ms);
323  padding_budget_->IncreaseBudget(delta_time_ms);
324  pad_up_to_bitrate_budget_->IncreaseBudget(delta_time_ms);
325}
326
327// MUST have critsect_ when calling.
328bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) {
329  *packet_list = NULL;
330  if (media_budget_->bytes_remaining() <= 0) {
331    // All bytes consumed for this interval.
332    // Check if we have not sent in a too long time.
333    if ((TickTime::Now() - time_last_send_).Milliseconds() >
334        kMaxQueueTimeWithoutSendingMs) {
335      if (!high_priority_packets_->empty()) {
336        *packet_list = high_priority_packets_.get();
337        return true;
338      }
339      if (!normal_priority_packets_->empty()) {
340        *packet_list = normal_priority_packets_.get();
341        return true;
342      }
343    }
344    // Send any old packets to avoid queuing for too long.
345    if (max_queue_length_ms_ >= 0 && QueueInMs() > max_queue_length_ms_) {
346      int64_t high_priority_capture_time = -1;
347      if (!high_priority_packets_->empty()) {
348        high_priority_capture_time =
349            high_priority_packets_->front().capture_time_ms_;
350        *packet_list = high_priority_packets_.get();
351      }
352      if (!normal_priority_packets_->empty() &&
353          (high_priority_capture_time == -1 || high_priority_capture_time >
354          normal_priority_packets_->front().capture_time_ms_)) {
355        *packet_list = normal_priority_packets_.get();
356      }
357      if (*packet_list)
358        return true;
359    }
360    return false;
361  }
362  if (!high_priority_packets_->empty()) {
363    *packet_list = high_priority_packets_.get();
364    return true;
365  }
366  if (!normal_priority_packets_->empty()) {
367    *packet_list = normal_priority_packets_.get();
368    return true;
369  }
370  if (!low_priority_packets_->empty()) {
371    *packet_list = low_priority_packets_.get();
372    return true;
373  }
374  return false;
375}
376
377void PacedSender::GetNextPacketFromList(paced_sender::PacketList* packets,
378    uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms,
379    bool* retransmission) {
380  paced_sender::Packet packet = packets->front();
381  UpdateMediaBytesSent(packet.bytes_);
382  *sequence_number = packet.sequence_number_;
383  *ssrc = packet.ssrc_;
384  *capture_time_ms = packet.capture_time_ms_;
385  *retransmission = packet.retransmission_;
386}
387
388// MUST have critsect_ when calling.
389void PacedSender::UpdateMediaBytesSent(int num_bytes) {
390  time_last_send_ = TickTime::Now();
391  media_budget_->UseBudget(num_bytes);
392  pad_up_to_bitrate_budget_->UseBudget(num_bytes);
393}
394
395}  // namespace webrtc
Note: See TracBrowser for help on using the repository browser.