无锡网站制作启,网络服务提供者知道或者应当知道网络,惠州网站建设方案托管,前端的网站重构怎么做rtc::Thread介绍
rtc::Thread类不仅仅实现了线程这个执行器#xff08;比如posix底层调用pthread相关接口创建线程#xff0c;管理线程等#xff09;#xff0c;还包括消息队列#xff08;message_queue)的实现#xff0c;rtc::Thread启动后就作为一个永不停止的event l…rtc::Thread介绍
rtc::Thread类不仅仅实现了线程这个执行器比如posix底层调用pthread相关接口创建线程管理线程等还包括消息队列message_queue)的实现rtc::Thread启动后就作为一个永不停止的event loop没有任务待执行就阻塞等待添加任务后就唤醒event loop去执行任务周而复始直到调用stop退出event loop退出线程线程join。 在WebRTC内部可以将消息队列等同于event loop消息队列为空就进行阻塞等待。 class RTC_LOCKABLE Thread : public MessageQueue {Thread关键接口
public:// Starts the execution of the thread.bool Start(Runnable* runnable nullptr);// Tells the thread to stop and waits until it is joined.// Never call Stop on the current thread. Instead use the inherited Quit// function which will exit the base MessageQueue without terminating the// underlying OS thread.virtual void Stop();virtual void Send(const Location posted_from,MessageHandler* phandler,uint32_t id 0,MessageData* pdata nullptr);// Convenience method to invoke a functor on another thread. Caller must// provide the |ReturnT| template argument, which cannot (easily) be deduced.// Uses Send() internally, which blocks the current thread until execution// is complete.// Ex: bool result thread.Invokebool(RTC_FROM_HERE,// MyFunctionReturningBool);// NOTE: This function can only be called when synchronous calls are allowed.// See ScopedDisallowBlockingCalls for details.template class ReturnT, class FunctorTReturnT Invoke(const Location posted_from, FunctorT functor) {FunctorMessageHandlerReturnT, FunctorT handler(std::forwardFunctorT(functor));InvokeInternal(posted_from, handler);return handler.MoveResult();}// ProcessMessages will process I/O and dispatch messages until:// 1) cms milliseconds have elapsed (returns true)// 2) Stop() is called (returns false)bool ProcessMessages(int cms);protected:// Blocks the calling thread until this thread has terminated.void Join();MessageQueue关键接口
public:
virtual void Quit();// Get() will process I/O until:
// 1) A message is available (returns true)
// 2) cmsWait seconds have elapsed (returns false)
// 3) Stop() is called (returns false)
virtual bool Get(Message* pmsg,int cmsWait kForever,bool process_io true);virtual void Post(const Location posted_from,MessageHandler* phandler,uint32_t id 0,MessageData* pdata nullptr,bool time_sensitive false);
virtual void PostDelayed(const Location posted_from,int cmsDelay,MessageHandler* phandler,uint32_t id 0,MessageData* pdata nullptr);
virtual void PostAt(const Location posted_from,int64_t tstamp,MessageHandler* phandler,uint32_t id 0,MessageData* pdata nullptr);virtual void Dispatch(Message* pmsg);
virtual void ReceiveSends();protected:
void WakeUpSocketServer();MessageList msgq_ RTC_GUARDED_BY(crit_);
PriorityQueue dmsgq_ RTC_GUARDED_BY(crit_);线程启动Start
调用Start接口启动底层线程同时进入一个永不停止的event loop除非调用Stop接口 流程如下 Start-pthread_create-PreRun-Run
void Thread::Run() {ProcessMessages(kForever);
}最终通过Get接口获取消息去执行DispatchGet获取不到消息就是进入阻塞状态wait等待有消息后被唤醒。
线程消息队列处理消息的流程ProcessMessage
1、处理从其他线程发送的要在本线程去执行的消息即同步调用
接收者线程处理流程
发送者线程流程 2、处理延迟消息存储在优先级队列 延迟消息是通过PostDelayed和PostAt接口调用然后push到优先级队列中dmsgq_小根堆 3、异步消息存储在普通队列里 延迟消息是通过Pos接口调用然后push到普通队列中msgq_)
任务提交方式Invoke/Post webrtc内部消息其实是对待执行任务的封装消息和任务可以认为是一个意思 消息要继承MessageHandler实现OnMessage
class MessageHandler {public:virtual ~MessageHandler();virtual void OnMessage(Message* msg) 0;protected:MessageHandler() {}private:RTC_DISALLOW_COPY_AND_ASSIGN(MessageHandler);
};因为执行消息实际上就是执行OnMessage详见Dispatch接口实现
上一章节其实已经把三种任务提交方式介绍过了 1、同步阻塞调用SendInvoke Invoke其实最终也是调用SendInvoke是个函数模版可以非常方便在目标执行线程执行函数然后获得返回值Invoke实现如下 // Convenience method to invoke a functor on another thread. Caller must// provide the |ReturnT| template argument, which cannot (easily) be deduced.// Uses Send() internally, which blocks the current thread until execution// is complete.// Ex: bool result thread.Invokebool(RTC_FROM_HERE,// MyFunctionReturningBool);// NOTE: This function can only be called when synchronous calls are allowed.// See ScopedDisallowBlockingCalls for details.template class ReturnT, class FunctorTReturnT Invoke(const Location posted_from, FunctorT functor) {FunctorMessageHandlerReturnT, FunctorT handler(std::forwardFunctorT(functor));InvokeInternal(posted_from, handler);return handler.MoveResult();}void Thread::InvokeInternal(const Location posted_from,MessageHandler* handler) {TRACE_EVENT2(webrtc, Thread::Invoke, src_file_and_line,posted_from.file_and_line(), src_func,posted_from.function_name());Send(posted_from, handler);
}调用方式举例
bool result thread.Invokebool(RTC_FROM_HERE, MyFunctionReturningBool);2、异步非阻塞延迟调用 PostDelayed和PostAt
3、异步非阻塞调用 Post
线程退出Stop
void Thread::Stop() {MessageQueue::Quit();Join();
}void MessageQueue::Quit() {AtomicOps::ReleaseStore(stop_, 1);WakeUpSocketServer();
}void Thread::Join() {if (!IsRunning())return;RTC_DCHECK(!IsCurrent());if (Current() !Current()-blocking_calls_allowed_) {RTC_LOG(LS_WARNING) Waiting for the thread to join, but blocking calls have been disallowed;}#if defined(WEBRTC_WIN)RTC_DCHECK(thread_ ! nullptr);WaitForSingleObject(thread_, INFINITE);CloseHandle(thread_);thread_ nullptr;thread_id_ 0;
#elif defined(WEBRTC_POSIX)pthread_join(thread_, nullptr);thread_ 0;
#endif
}