Note: We no longer publish the latest version of our code here. We primarily use a kumc-bmi github organization. The heron ETL repository, in particular, is not public. Peers in the informatics community should see MultiSiteDev for details on requesting access.

source: webrtc/talk/base/messagequeue.cc @ 0:4bda6873e34c

pub_scrub_3792 tip
Last change on this file since 0:4bda6873e34c was 0:4bda6873e34c, checked in by Michael Prittie <mprittie@…>, 6 years ago

Scrubbed password for publication.

File size: 11.6 KB
Line 
1/*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#ifdef POSIX
29#include <sys/time.h>
30#endif
31
32#include "talk/base/common.h"
33#include "talk/base/logging.h"
34#include "talk/base/messagequeue.h"
35#include "talk/base/physicalsocketserver.h"
36
37
38namespace talk_base {
39
40const uint32 kMaxMsgLatency = 150;  // 150 ms
41
42//------------------------------------------------------------------
43// MessageQueueManager
44
45MessageQueueManager* MessageQueueManager::instance_ = NULL;
46
47MessageQueueManager* MessageQueueManager::Instance() {
48  // Note: This is not thread safe, but it is first called before threads are
49  // spawned.
50  if (!instance_)
51    instance_ = new MessageQueueManager;
52  return instance_;
53}
54
55bool MessageQueueManager::IsInitialized() {
56  return instance_ != NULL;
57}
58
59MessageQueueManager::MessageQueueManager() {
60}
61
62MessageQueueManager::~MessageQueueManager() {
63}
64
65void MessageQueueManager::Add(MessageQueue *message_queue) {
66  return Instance()->AddInternal(message_queue);
67}
68void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
69  // MessageQueueManager methods should be non-reentrant, so we
70  // ASSERT that is the case.  If any of these ASSERT, please
71  // contact bpm or jbeda.
72  ASSERT(!crit_.CurrentThreadIsOwner());
73  CritScope cs(&crit_);
74  message_queues_.push_back(message_queue);
75}
76
77void MessageQueueManager::Remove(MessageQueue *message_queue) {
78  // If there isn't a message queue manager instance, then there isn't a queue
79  // to remove.
80  if (!instance_) return;
81  return Instance()->RemoveInternal(message_queue);
82}
83void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
84  ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
85  // If this is the last MessageQueue, destroy the manager as well so that
86  // we don't leak this object at program shutdown. As mentioned above, this is
87  // not thread-safe, but this should only happen at program termination (when
88  // the ThreadManager is destroyed, and threads are no longer active).
89  bool destroy = false;
90  {
91    CritScope cs(&crit_);
92    std::vector<MessageQueue *>::iterator iter;
93    iter = std::find(message_queues_.begin(), message_queues_.end(),
94                     message_queue);
95    if (iter != message_queues_.end()) {
96      message_queues_.erase(iter);
97    }
98    destroy = message_queues_.empty();
99  }
100  if (destroy) {
101    instance_ = NULL;
102    delete this;
103  }
104}
105
106void MessageQueueManager::Clear(MessageHandler *handler) {
107  // If there isn't a message queue manager instance, then there aren't any
108  // queues to remove this handler from.
109  if (!instance_) return;
110  return Instance()->ClearInternal(handler);
111}
112void MessageQueueManager::ClearInternal(MessageHandler *handler) {
113  ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
114  CritScope cs(&crit_);
115  std::vector<MessageQueue *>::iterator iter;
116  for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
117    (*iter)->Clear(handler);
118}
119
120//------------------------------------------------------------------
121// MessageQueue
122
123MessageQueue::MessageQueue(SocketServer* ss)
124    : ss_(ss), fStop_(false), fPeekKeep_(false), active_(false),
125      dmsgq_next_num_(0) {
126  if (!ss_) {
127    // Currently, MessageQueue holds a socket server, and is the base class for
128    // Thread.  It seems like it makes more sense for Thread to hold the socket
129    // server, and provide it to the MessageQueue, since the Thread controls
130    // the I/O model, and MQ is agnostic to those details.  Anyway, this causes
131    // messagequeue_unittest to depend on network libraries... yuck.
132    default_ss_.reset(new PhysicalSocketServer());
133    ss_ = default_ss_.get();
134  }
135  ss_->SetMessageQueue(this);
136}
137
138MessageQueue::~MessageQueue() {
139  // The signal is done from here to ensure
140  // that it always gets called when the queue
141  // is going away.
142  SignalQueueDestroyed();
143  if (active_) {
144    MessageQueueManager::Remove(this);
145    Clear(NULL);
146  }
147  if (ss_) {
148    ss_->SetMessageQueue(NULL);
149  }
150}
151
152void MessageQueue::set_socketserver(SocketServer* ss) {
153  ss_ = ss ? ss : default_ss_.get();
154  ss_->SetMessageQueue(this);
155}
156
157void MessageQueue::Quit() {
158  fStop_ = true;
159  ss_->WakeUp();
160}
161
162bool MessageQueue::IsQuitting() {
163  return fStop_;
164}
165
166void MessageQueue::Restart() {
167  fStop_ = false;
168}
169
170bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
171  if (fPeekKeep_) {
172    *pmsg = msgPeek_;
173    return true;
174  }
175  if (!Get(pmsg, cmsWait))
176    return false;
177  msgPeek_ = *pmsg;
178  fPeekKeep_ = true;
179  return true;
180}
181
182bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
183  // Return and clear peek if present
184  // Always return the peek if it exists so there is Peek/Get symmetry
185
186  if (fPeekKeep_) {
187    *pmsg = msgPeek_;
188    fPeekKeep_ = false;
189    return true;
190  }
191
192  // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
193
194  int cmsTotal = cmsWait;
195  int cmsElapsed = 0;
196  uint32 msStart = Time();
197  uint32 msCurrent = msStart;
198  while (true) {
199    // Check for sent messages
200    ReceiveSends();
201
202    // Check for posted events
203    int cmsDelayNext = kForever;
204    bool first_pass = true;
205    while (true) {
206      // All queue operations need to be locked, but nothing else in this loop
207      // (specifically handling disposed message) can happen inside the crit.
208      // Otherwise, disposed MessageHandlers will cause deadlocks.
209      {
210        CritScope cs(&crit_);
211        // On the first pass, check for delayed messages that have been
212        // triggered and calculate the next trigger time.
213        if (first_pass) {
214          first_pass = false;
215          while (!dmsgq_.empty()) {
216            if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
217              cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
218              break;
219            }
220            msgq_.push_back(dmsgq_.top().msg_);
221            dmsgq_.pop();
222          }
223        }
224        // Pull a message off the message queue, if available.
225        if (msgq_.empty()) {
226          break;
227        } else {
228          *pmsg = msgq_.front();
229          msgq_.pop_front();
230        }
231      }  // crit_ is released here.
232
233      // Log a warning for time-sensitive messages that we're late to deliver.
234      if (pmsg->ts_sensitive) {
235        int32 delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
236        if (delay > 0) {
237          LOG_F(LS_WARNING) << "id: " << pmsg->message_id << "  delay: "
238                            << (delay + kMaxMsgLatency) << "ms";
239        }
240      }
241      // If this was a dispose message, delete it and skip it.
242      if (MQID_DISPOSE == pmsg->message_id) {
243        ASSERT(NULL == pmsg->phandler);
244        delete pmsg->pdata;
245        *pmsg = Message();
246        continue;
247      }
248      return true;
249    }
250
251    if (fStop_)
252      break;
253
254    // Which is shorter, the delay wait or the asked wait?
255
256    int cmsNext;
257    if (cmsWait == kForever) {
258      cmsNext = cmsDelayNext;
259    } else {
260      cmsNext = _max(0, cmsTotal - cmsElapsed);
261      if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
262        cmsNext = cmsDelayNext;
263    }
264
265    // Wait and multiplex in the meantime
266    if (!ss_->Wait(cmsNext, process_io))
267      return false;
268
269    // If the specified timeout expired, return
270
271    msCurrent = Time();
272    cmsElapsed = TimeDiff(msCurrent, msStart);
273    if (cmsWait != kForever) {
274      if (cmsElapsed >= cmsWait)
275        return false;
276    }
277  }
278  return false;
279}
280
281void MessageQueue::ReceiveSends() {
282}
283
284void MessageQueue::Post(MessageHandler *phandler, uint32 id,
285    MessageData *pdata, bool time_sensitive) {
286  if (fStop_)
287    return;
288
289  // Keep thread safe
290  // Add the message to the end of the queue
291  // Signal for the multiplexer to return
292
293  CritScope cs(&crit_);
294  EnsureActive();
295  Message msg;
296  msg.phandler = phandler;
297  msg.message_id = id;
298  msg.pdata = pdata;
299  if (time_sensitive) {
300    msg.ts_sensitive = Time() + kMaxMsgLatency;
301  }
302  msgq_.push_back(msg);
303  ss_->WakeUp();
304}
305
306void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp,
307    MessageHandler *phandler, uint32 id, MessageData* pdata) {
308  if (fStop_)
309    return;
310
311  // Keep thread safe
312  // Add to the priority queue. Gets sorted soonest first.
313  // Signal for the multiplexer to return.
314
315  CritScope cs(&crit_);
316  EnsureActive();
317  Message msg;
318  msg.phandler = phandler;
319  msg.message_id = id;
320  msg.pdata = pdata;
321  DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
322  dmsgq_.push(dmsg);
323  // If this message queue processes 1 message every millisecond for 50 days,
324  // we will wrap this number.  Even then, only messages with identical times
325  // will be misordered, and then only briefly.  This is probably ok.
326  VERIFY(0 != ++dmsgq_next_num_);
327  ss_->WakeUp();
328}
329
330int MessageQueue::GetDelay() {
331  CritScope cs(&crit_);
332
333  if (!msgq_.empty())
334    return 0;
335
336  if (!dmsgq_.empty()) {
337    int delay = TimeUntil(dmsgq_.top().msTrigger_);
338    if (delay < 0)
339      delay = 0;
340    return delay;
341  }
342
343  return kForever;
344}
345
346void MessageQueue::Clear(MessageHandler *phandler, uint32 id,
347                         MessageList* removed) {
348  CritScope cs(&crit_);
349
350  // Remove messages with phandler
351
352  if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
353    if (removed) {
354      removed->push_back(msgPeek_);
355    } else {
356      delete msgPeek_.pdata;
357    }
358    fPeekKeep_ = false;
359  }
360
361  // Remove from ordered message queue
362
363  for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
364    if (it->Match(phandler, id)) {
365      if (removed) {
366        removed->push_back(*it);
367      } else {
368        delete it->pdata;
369      }
370      it = msgq_.erase(it);
371    } else {
372      ++it;
373    }
374  }
375
376  // Remove from priority queue. Not directly iterable, so use this approach
377
378  PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
379  for (PriorityQueue::container_type::iterator it = new_end;
380       it != dmsgq_.container().end(); ++it) {
381    if (it->msg_.Match(phandler, id)) {
382      if (removed) {
383        removed->push_back(it->msg_);
384      } else {
385        delete it->msg_.pdata;
386      }
387    } else {
388      *new_end++ = *it;
389    }
390  }
391  dmsgq_.container().erase(new_end, dmsgq_.container().end());
392  dmsgq_.reheap();
393}
394
395void MessageQueue::Dispatch(Message *pmsg) {
396  pmsg->phandler->OnMessage(pmsg);
397}
398
399void MessageQueue::EnsureActive() {
400  ASSERT(crit_.CurrentThreadIsOwner());
401  if (!active_) {
402    active_ = true;
403    MessageQueueManager::Add(this);
404  }
405}
406
407}  // namespace talk_base
Note: See TracBrowser for help on using the repository browser.