Note: We no longer publish the latest version of our code here. We primarily use a kumc-bmi github organization. The heron ETL repository, in particular, is not public. Peers in the informatics community should see MultiSiteDev for details on requesting access.

source: webrtc/talk/session/tunnel/pseudotcpchannel.cc @ 0:4bda6873e34c

pub_scrub_3792 tip
Last change on this file since 0:4bda6873e34c was 0:4bda6873e34c, checked in by Michael Prittie <mprittie@…>, 6 years ago

Scrubbed password for publication.

File size: 18.1 KB
Line 
1/*
2 * libjingle
3 * Copyright 2004--2006, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#include <string>
29#include "talk/base/basictypes.h"
30#include "talk/base/common.h"
31#include "talk/base/logging.h"
32#include "talk/base/scoped_ptr.h"
33#include "talk/base/stringutils.h"
34#include "talk/p2p/base/candidate.h"
35#include "talk/p2p/base/transportchannel.h"
36#include "pseudotcpchannel.h"
37
38using namespace talk_base;
39
40namespace cricket {
41
42extern const talk_base::ConstantLabel SESSION_STATES[];
43
44// MSG_WK_* - worker thread messages
45// MSG_ST_* - stream thread messages
46// MSG_SI_* - signal thread messages
47
48enum {
49  MSG_WK_CLOCK = 1,
50  MSG_WK_PURGE,
51  MSG_ST_EVENT,
52  MSG_SI_DESTROYCHANNEL,
53  MSG_SI_DESTROY,
54};
55
56struct EventData : public MessageData {
57  int event, error;
58  EventData(int ev, int err = 0) : event(ev), error(err) { }
59};
60
61///////////////////////////////////////////////////////////////////////////////
62// PseudoTcpChannel::InternalStream
63///////////////////////////////////////////////////////////////////////////////
64
65class PseudoTcpChannel::InternalStream : public StreamInterface {
66public:
67  InternalStream(PseudoTcpChannel* parent);
68  virtual ~InternalStream();
69
70  virtual StreamState GetState() const;
71  virtual StreamResult Read(void* buffer, size_t buffer_len,
72                                       size_t* read, int* error);
73  virtual StreamResult Write(const void* data, size_t data_len,
74                                        size_t* written, int* error);
75  virtual void Close();
76
77private:
78  // parent_ is accessed and modified exclusively on the event thread, to
79  // avoid thread contention.  This means that the PseudoTcpChannel cannot go
80  // away until after it receives a Close() from TunnelStream.
81  PseudoTcpChannel* parent_;
82};
83
84///////////////////////////////////////////////////////////////////////////////
85// PseudoTcpChannel
86// Member object lifetime summaries:
87//   session_ - passed in constructor, cleared when channel_ goes away.
88//   channel_ - created in Connect, destroyed when session_ or tcp_ goes away.
89//   tcp_ - created in Connect, destroyed when channel_ goes away, or connection
90//     closes.
91//   worker_thread_ - created when channel_ is created, purged when channel_ is
92//     destroyed.
93//   stream_ - created in GetStream, destroyed by owner at arbitrary time.
94//   this - created in constructor, destroyed when worker_thread_ and stream_
95//     are both gone.
96///////////////////////////////////////////////////////////////////////////////
97
98//
99// Signal thread methods
100//
101
102PseudoTcpChannel::PseudoTcpChannel(Thread* stream_thread, Session* session)
103  : signal_thread_(session->session_manager()->signaling_thread()),
104    worker_thread_(NULL),
105    stream_thread_(stream_thread),
106    session_(session), channel_(NULL), tcp_(NULL), stream_(NULL),
107    stream_readable_(false), pending_read_event_(false),
108    ready_to_connect_(false) {
109  ASSERT(signal_thread_->IsCurrent());
110  ASSERT(NULL != session_);
111}
112
113PseudoTcpChannel::~PseudoTcpChannel() {
114  ASSERT(signal_thread_->IsCurrent());
115  ASSERT(worker_thread_ == NULL);
116  ASSERT(session_ == NULL);
117  ASSERT(channel_ == NULL);
118  ASSERT(stream_ == NULL);
119  ASSERT(tcp_ == NULL);
120}
121
122bool PseudoTcpChannel::Connect(const std::string& content_name,
123                               const std::string& channel_name,
124                               int component) {
125  ASSERT(signal_thread_->IsCurrent());
126  CritScope lock(&cs_);
127
128  if (channel_)
129    return false;
130
131  ASSERT(session_ != NULL);
132  worker_thread_ = session_->session_manager()->worker_thread();
133  content_name_ = content_name;
134  channel_ = session_->CreateChannel(
135      content_name, channel_name, component);
136  channel_name_ = channel_name;
137  channel_->SetOption(Socket::OPT_DONTFRAGMENT, 1);
138
139  channel_->SignalDestroyed.connect(this,
140    &PseudoTcpChannel::OnChannelDestroyed);
141  channel_->SignalWritableState.connect(this,
142    &PseudoTcpChannel::OnChannelWritableState);
143  channel_->SignalReadPacket.connect(this,
144    &PseudoTcpChannel::OnChannelRead);
145  channel_->SignalRouteChange.connect(this,
146    &PseudoTcpChannel::OnChannelConnectionChanged);
147
148  ASSERT(tcp_ == NULL);
149  tcp_ = new PseudoTcp(this, 0);
150  if (session_->initiator()) {
151    // Since we may try several protocols and network adapters that won't work,
152    // waiting until we get our first writable notification before initiating
153    // TCP negotiation.
154    ready_to_connect_ = true;
155  }
156
157  return true;
158}
159
160StreamInterface* PseudoTcpChannel::GetStream() {
161  ASSERT(signal_thread_->IsCurrent());
162  CritScope lock(&cs_);
163  ASSERT(NULL != session_);
164  if (!stream_)
165    stream_ = new PseudoTcpChannel::InternalStream(this);
166  //TODO("should we disallow creation of new stream at some point?");
167  return stream_;
168}
169
170void PseudoTcpChannel::OnChannelDestroyed(TransportChannel* channel) {
171  LOG_F(LS_INFO) << "(" << channel->component() << ")";
172  ASSERT(signal_thread_->IsCurrent());
173  CritScope lock(&cs_);
174  ASSERT(channel == channel_);
175  signal_thread_->Clear(this, MSG_SI_DESTROYCHANNEL);
176  // When MSG_WK_PURGE is received, we know there will be no more messages from
177  // the worker thread.
178  worker_thread_->Clear(this, MSG_WK_CLOCK);
179  worker_thread_->Post(this, MSG_WK_PURGE);
180  session_ = NULL;
181  channel_ = NULL;
182  if ((stream_ != NULL)
183      && ((tcp_ == NULL) || (tcp_->State() != PseudoTcp::TCP_CLOSED)))
184    stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, 0));
185  if (tcp_) {
186    tcp_->Close(true);
187    AdjustClock();
188  }
189  SignalChannelClosed(this);
190}
191
192void PseudoTcpChannel::OnSessionTerminate(Session* session) {
193  // When the session terminates before we even connected
194  CritScope lock(&cs_);
195  if (session_ != NULL && channel_ == NULL) {
196    ASSERT(session == session_);
197    ASSERT(worker_thread_ == NULL);
198    ASSERT(tcp_ == NULL);
199    LOG(LS_INFO) << "Destroying unconnected PseudoTcpChannel";
200    session_ = NULL;
201    if (stream_ != NULL)
202      stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, -1));
203  }
204
205  // Even though session_ is being destroyed, we mustn't clear the pointer,
206  // since we'll need it to tear down channel_.
207  //
208  // TODO: Is it always the case that if channel_ != NULL then we'll get
209  // a channel-destroyed notification?
210}
211
212void PseudoTcpChannel::GetOption(PseudoTcp::Option opt, int* value) {
213  ASSERT(signal_thread_->IsCurrent());
214  CritScope lock(&cs_);
215  ASSERT(tcp_ != NULL);
216  tcp_->GetOption(opt, value);
217}
218
219void PseudoTcpChannel::SetOption(PseudoTcp::Option opt, int value) {
220  ASSERT(signal_thread_->IsCurrent());
221  CritScope lock(&cs_);
222  ASSERT(tcp_ != NULL);
223  tcp_->SetOption(opt, value);
224}
225
226//
227// Stream thread methods
228//
229
230StreamState PseudoTcpChannel::GetState() const {
231  ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
232  CritScope lock(&cs_);
233  if (!session_)
234    return SS_CLOSED;
235  if (!tcp_)
236    return SS_OPENING;
237  switch (tcp_->State()) {
238    case PseudoTcp::TCP_LISTEN:
239    case PseudoTcp::TCP_SYN_SENT:
240    case PseudoTcp::TCP_SYN_RECEIVED:
241      return SS_OPENING;
242    case PseudoTcp::TCP_ESTABLISHED:
243      return SS_OPEN;
244    case PseudoTcp::TCP_CLOSED:
245    default:
246      return SS_CLOSED;
247  }
248}
249
250StreamResult PseudoTcpChannel::Read(void* buffer, size_t buffer_len,
251                                    size_t* read, int* error) {
252  ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
253  CritScope lock(&cs_);
254  if (!tcp_)
255    return SR_BLOCK;
256
257  stream_readable_ = false;
258  int result = tcp_->Recv(static_cast<char*>(buffer), buffer_len);
259  //LOG_F(LS_VERBOSE) << "Recv returned: " << result;
260  if (result > 0) {
261    if (read)
262      *read = result;
263    // PseudoTcp doesn't currently support repeated Readable signals.  Simulate
264    // them here.
265    stream_readable_ = true;
266    if (!pending_read_event_) {
267      pending_read_event_ = true;
268      stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ), true);
269    }
270    return SR_SUCCESS;
271  } else if (IsBlockingError(tcp_->GetError())) {
272    return SR_BLOCK;
273  } else {
274    if (error)
275      *error = tcp_->GetError();
276    return SR_ERROR;
277  }
278  // This spot is never reached.
279}
280
281StreamResult PseudoTcpChannel::Write(const void* data, size_t data_len,
282                                     size_t* written, int* error) {
283  ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
284  CritScope lock(&cs_);
285  if (!tcp_)
286    return SR_BLOCK;
287  int result = tcp_->Send(static_cast<const char*>(data), data_len);
288  //LOG_F(LS_VERBOSE) << "Send returned: " << result;
289  if (result > 0) {
290    if (written)
291      *written = result;
292    return SR_SUCCESS;
293  } else if (IsBlockingError(tcp_->GetError())) {
294    return SR_BLOCK;
295  } else {
296    if (error)
297      *error = tcp_->GetError();
298    return SR_ERROR;
299  }
300  // This spot is never reached.
301}
302
303void PseudoTcpChannel::Close() {
304  ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
305  CritScope lock(&cs_);
306  stream_ = NULL;
307  // Clear out any pending event notifications
308  stream_thread_->Clear(this, MSG_ST_EVENT);
309  if (tcp_) {
310    tcp_->Close(false);
311    AdjustClock();
312  } else {
313    CheckDestroy();
314  }
315}
316
317//
318// Worker thread methods
319//
320
321void PseudoTcpChannel::OnChannelWritableState(TransportChannel* channel) {
322  LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
323  ASSERT(worker_thread_->IsCurrent());
324  CritScope lock(&cs_);
325  if (!channel_) {
326    LOG_F(LS_WARNING) << "NULL channel";
327    return;
328  }
329  ASSERT(channel == channel_);
330  if (!tcp_) {
331    LOG_F(LS_WARNING) << "NULL tcp";
332    return;
333  }
334  if (!ready_to_connect_ || !channel->writable())
335    return;
336
337  ready_to_connect_ = false;
338  tcp_->Connect();
339  AdjustClock();
340}
341
342void PseudoTcpChannel::OnChannelRead(TransportChannel* channel,
343                                     const char* data, size_t size, int flags) {
344  //LOG_F(LS_VERBOSE) << "(" << size << ")";
345  ASSERT(worker_thread_->IsCurrent());
346  CritScope lock(&cs_);
347  if (!channel_) {
348    LOG_F(LS_WARNING) << "NULL channel";
349    return;
350  }
351  ASSERT(channel == channel_);
352  if (!tcp_) {
353    LOG_F(LS_WARNING) << "NULL tcp";
354    return;
355  }
356  tcp_->NotifyPacket(data, size);
357  AdjustClock();
358}
359
360void PseudoTcpChannel::OnChannelConnectionChanged(TransportChannel* channel,
361                                                  const Candidate& candidate) {
362  LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
363  ASSERT(worker_thread_->IsCurrent());
364  CritScope lock(&cs_);
365  if (!channel_) {
366    LOG_F(LS_WARNING) << "NULL channel";
367    return;
368  }
369  ASSERT(channel == channel_);
370  if (!tcp_) {
371    LOG_F(LS_WARNING) << "NULL tcp";
372    return;
373  }
374
375  uint16 mtu = 1280;  // safe default
376  int family = candidate.address().family();
377  Socket* socket =
378      worker_thread_->socketserver()->CreateAsyncSocket(family, SOCK_DGRAM);
379  talk_base::scoped_ptr<Socket> mtu_socket(socket);
380  if (socket == NULL) {
381    LOG_F(LS_WARNING) << "Couldn't create socket while estimating MTU.";
382  } else {
383    if (mtu_socket->Connect(candidate.address()) < 0 ||
384        mtu_socket->EstimateMTU(&mtu) < 0) {
385      LOG_F(LS_WARNING) << "Failed to estimate MTU, error="
386                        << mtu_socket->GetError();
387    }
388  }
389
390  LOG_F(LS_VERBOSE) << "Using MTU of " << mtu << " bytes";
391  tcp_->NotifyMTU(mtu);
392  AdjustClock();
393}
394
395void PseudoTcpChannel::OnTcpOpen(PseudoTcp* tcp) {
396  LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
397  ASSERT(cs_.CurrentThreadIsOwner());
398  ASSERT(worker_thread_->IsCurrent());
399  ASSERT(tcp == tcp_);
400  if (stream_) {
401    stream_readable_ = true;
402    pending_read_event_ = true;
403    stream_thread_->Post(this, MSG_ST_EVENT,
404                         new EventData(SE_OPEN | SE_READ | SE_WRITE));
405  }
406}
407
408void PseudoTcpChannel::OnTcpReadable(PseudoTcp* tcp) {
409  //LOG_F(LS_VERBOSE);
410  ASSERT(cs_.CurrentThreadIsOwner());
411  ASSERT(worker_thread_->IsCurrent());
412  ASSERT(tcp == tcp_);
413  if (stream_) {
414    stream_readable_ = true;
415    if (!pending_read_event_) {
416      pending_read_event_ = true;
417      stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ));
418    }
419  }
420}
421
422void PseudoTcpChannel::OnTcpWriteable(PseudoTcp* tcp) {
423  //LOG_F(LS_VERBOSE);
424  ASSERT(cs_.CurrentThreadIsOwner());
425  ASSERT(worker_thread_->IsCurrent());
426  ASSERT(tcp == tcp_);
427  if (stream_)
428    stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_WRITE));
429}
430
431void PseudoTcpChannel::OnTcpClosed(PseudoTcp* tcp, uint32 nError) {
432  LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
433  ASSERT(cs_.CurrentThreadIsOwner());
434  ASSERT(worker_thread_->IsCurrent());
435  ASSERT(tcp == tcp_);
436  if (stream_)
437    stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, nError));
438}
439
440//
441// Multi-thread methods
442//
443
444void PseudoTcpChannel::OnMessage(Message* pmsg) {
445  if (pmsg->message_id == MSG_WK_CLOCK) {
446
447    ASSERT(worker_thread_->IsCurrent());
448    //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_WK_CLOCK)";
449    CritScope lock(&cs_);
450    if (tcp_) {
451      tcp_->NotifyClock(PseudoTcp::Now());
452      AdjustClock(false);
453    }
454
455  } else if (pmsg->message_id == MSG_WK_PURGE) {
456
457    ASSERT(worker_thread_->IsCurrent());
458    LOG_F(LS_INFO) << "(MSG_WK_PURGE)";
459    // At this point, we know there are no additional worker thread messages.
460    CritScope lock(&cs_);
461    ASSERT(NULL == session_);
462    ASSERT(NULL == channel_);
463    worker_thread_ = NULL;
464    CheckDestroy();
465
466  } else if (pmsg->message_id == MSG_ST_EVENT) {
467
468    ASSERT(stream_thread_->IsCurrent());
469    //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_ST_EVENT, "
470    //             << data->event << ", " << data->error << ")";
471    ASSERT(stream_ != NULL);
472    EventData* data = static_cast<EventData*>(pmsg->pdata);
473    if (data->event & SE_READ) {
474      CritScope lock(&cs_);
475      pending_read_event_ = false;
476    }
477    stream_->SignalEvent(stream_, data->event, data->error);
478    delete data;
479
480  } else if (pmsg->message_id == MSG_SI_DESTROYCHANNEL) {
481
482    ASSERT(signal_thread_->IsCurrent());
483    LOG_F(LS_INFO) << "(MSG_SI_DESTROYCHANNEL)";
484    ASSERT(session_ != NULL);
485    ASSERT(channel_ != NULL);
486    session_->DestroyChannel(content_name_, channel_->component());
487
488  } else if (pmsg->message_id == MSG_SI_DESTROY) {
489
490    ASSERT(signal_thread_->IsCurrent());
491    LOG_F(LS_INFO) << "(MSG_SI_DESTROY)";
492    // The message queue is empty, so it is safe to destroy ourselves.
493    delete this;
494
495  } else {
496    ASSERT(false);
497  }
498}
499
500IPseudoTcpNotify::WriteResult PseudoTcpChannel::TcpWritePacket(
501    PseudoTcp* tcp, const char* buffer, size_t len) {
502  ASSERT(cs_.CurrentThreadIsOwner());
503  ASSERT(tcp == tcp_);
504  ASSERT(NULL != channel_);
505  int sent = channel_->SendPacket(buffer, len, talk_base::DSCP_NO_CHANGE);
506  if (sent > 0) {
507    //LOG_F(LS_VERBOSE) << "(" << sent << ") Sent";
508    return IPseudoTcpNotify::WR_SUCCESS;
509  } else if (IsBlockingError(channel_->GetError())) {
510    LOG_F(LS_VERBOSE) << "Blocking";
511    return IPseudoTcpNotify::WR_SUCCESS;
512  } else if (channel_->GetError() == EMSGSIZE) {
513    LOG_F(LS_ERROR) << "EMSGSIZE";
514    return IPseudoTcpNotify::WR_TOO_LARGE;
515  } else {
516    PLOG(LS_ERROR, channel_->GetError()) << "PseudoTcpChannel::TcpWritePacket";
517    ASSERT(false);
518    return IPseudoTcpNotify::WR_FAIL;
519  }
520}
521
522void PseudoTcpChannel::AdjustClock(bool clear) {
523  ASSERT(cs_.CurrentThreadIsOwner());
524  ASSERT(NULL != tcp_);
525
526  long timeout = 0;
527  if (tcp_->GetNextClock(PseudoTcp::Now(), timeout)) {
528    ASSERT(NULL != channel_);
529    // Reset the next clock, by clearing the old and setting a new one.
530    if (clear)
531      worker_thread_->Clear(this, MSG_WK_CLOCK);
532    worker_thread_->PostDelayed(_max(timeout, 0L), this, MSG_WK_CLOCK);
533    return;
534  }
535
536  delete tcp_;
537  tcp_ = NULL;
538  ready_to_connect_ = false;
539
540  if (channel_) {
541    // If TCP has failed, no need for channel_ anymore
542    signal_thread_->Post(this, MSG_SI_DESTROYCHANNEL);
543  }
544}
545
546void PseudoTcpChannel::CheckDestroy() {
547  ASSERT(cs_.CurrentThreadIsOwner());
548  if ((worker_thread_ != NULL) || (stream_ != NULL))
549    return;
550  signal_thread_->Post(this, MSG_SI_DESTROY);
551}
552
553///////////////////////////////////////////////////////////////////////////////
554// PseudoTcpChannel::InternalStream
555///////////////////////////////////////////////////////////////////////////////
556
557PseudoTcpChannel::InternalStream::InternalStream(PseudoTcpChannel* parent)
558  : parent_(parent) {
559}
560
561PseudoTcpChannel::InternalStream::~InternalStream() {
562  Close();
563}
564
565StreamState PseudoTcpChannel::InternalStream::GetState() const {
566  if (!parent_)
567    return SS_CLOSED;
568  return parent_->GetState();
569}
570
571StreamResult PseudoTcpChannel::InternalStream::Read(
572    void* buffer, size_t buffer_len, size_t* read, int* error) {
573  if (!parent_) {
574    if (error)
575      *error = ENOTCONN;
576    return SR_ERROR;
577  }
578  return parent_->Read(buffer, buffer_len, read, error);
579}
580
581StreamResult PseudoTcpChannel::InternalStream::Write(
582    const void* data, size_t data_len,  size_t* written, int* error) {
583  if (!parent_) {
584    if (error)
585      *error = ENOTCONN;
586    return SR_ERROR;
587  }
588  return parent_->Write(data, data_len, written, error);
589}
590
591void PseudoTcpChannel::InternalStream::Close() {
592  if (!parent_)
593    return;
594  parent_->Close();
595  parent_ = NULL;
596}
597
598///////////////////////////////////////////////////////////////////////////////
599
600} // namespace cricket
Note: See TracBrowser for help on using the repository browser.