新聞中心
雖然 Node.js 在 JS 層實(shí)現(xiàn)了子線程模塊,但是因?yàn)樽泳€程持有單獨(dú)的 V8 Isolate 和 loop,所以存在很多限制。比如我們希望在主線程繁忙時(shí)還能收集到主線程的一些數(shù)據(jù)(進(jìn)程 / V8 內(nèi)存、CPU 使用情況等)。這時(shí)候就需要使用 addon 去實(shí)現(xiàn),addon 雖然是 C、C++ 等語(yǔ)言寫(xiě)的,但是依然跑在主線程里。所以我們需要使用底層提供的原生子線程加上 addon 實(shí)現(xiàn)我們的需求。本文正是基于這種場(chǎng)景的 watchdog。

目前成都創(chuàng)新互聯(lián)已為1000+的企業(yè)提供了網(wǎng)站建設(shè)、域名、網(wǎng)站空間、網(wǎng)站托管、服務(wù)器托管、企業(yè)網(wǎng)站設(shè)計(jì)、堆龍德慶網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長(zhǎng),共同發(fā)展。
首先看一下整體的類結(jié)構(gòu)。
下面看一下每個(gè)類的作用和實(shí)現(xiàn)。
1.TaskManager
因?yàn)榇嬖谥骶€程和子線程,線程間會(huì)互相向?qū)Ψ教峤蝗蝿?wù),這就涉及到多線程互斥訪問(wèn)的問(wèn)題。TaskManager 是負(fù)責(zé)管理任務(wù)的類,是線程安全的。
class TaskManager {
public:
TaskManager() {
uv_mutex_init(&tasks_mutex);
};
void add_task(task_cb cb, void * data) {
struct task* t = new task;
t->cb = cb;
t->data = data;
uv_mutex_lock(&tasks_mutex);
tasks.push_back(t);
uv_mutex_unlock(&tasks_mutex);
};
void handle_task() {
std::vector task_list;
uv_mutex_lock(&tasks_mutex);
task_list.swap(tasks);
uv_mutex_unlock(&tasks_mutex);
std::vector::iterator iter;
for(iter = task_list.begin(); task_list.end() != iter;)
{
struct task* t = (*iter);
t->cb(t->data);
iter = task_list.erase(iter);
delete t;
}
};
private:
std::vector tasks;
uv_mutex_t tasks_mutex;
};
實(shí)現(xiàn)很簡(jiǎn)單,主要是使用 uv_mutex_t 解決互斥問(wèn)題,提供 add_task 和 handle_task 兩個(gè)方法。add_task 是追加任務(wù),handle_task 是處理任務(wù)。handle_task 具體的調(diào)用時(shí)機(jī)由 TaskManager 的持有者決定。
2.WatchDog
WatchDog 是對(duì) Libuv 定時(shí)器的封裝,核心數(shù)據(jù)結(jié)構(gòu)是 watch_dog_ctx。
typedef void (*watch_dog_cb)(void* ctx);
struct watch_dog_ctx{
void * data; // 上下文
watch_dog_cb work; // 任務(wù)函數(shù)
int poll_interval; // 定時(shí)時(shí)間
};
watch_dog_ctx 是對(duì)一個(gè)定時(shí)任務(wù)的封裝。watch_dog_ctx 的任務(wù)會(huì)在子線程中定時(shí)被執(zhí)行,這就解決了主線程陷入繁忙時(shí)無(wú)法工作的問(wèn)題。
NodeWatchDog::WatchDog::WatchDog(uv_loop_t* loop, struct watch_dog_ctx* ctx) {
this->ctx = ctx;
uv_timer_init(loop, &timer);
timer.data = this;
is_stop = false;
}
void NodeWatchDog::WatchDog::start() {
if (is_stop) {
return;
}
uv_timer_start(
&timer,
[](uv_timer_t* timer) {
NodeWatchDog::WatchDog* watch_dog = (NodeWatchDog::WatchDog*)timer->data;
struct watch_dog_ctx* ctx = watch_dog->get_ctx();
ctx->work(ctx);
},
ctx->poll_interval,
ctx->poll_interval
);
};
void NodeWatchDog::WatchDog::stop() {
is_stop = true;
uv_timer_stop(&timer);
}
3.WatchDogWorker
WatchDogWorker 是負(fù)責(zé)管理子線程和 watchdog 的類。首先看一下定義。
class WatchDogWorker {
public:
WatchDogWorker();
~WatchDogWorker() {};
void start();
void stop();
void add_watchdog(struct watch_dog_ctx* watchdog);
void add_task(task_cb cb, void * data);
void handle_task();
uv_loop_t* get_event_loop() {
return &loop;
}
uv_sem_t * get_thread_sem() {
return &sem;
}
private:
uv_loop_t loop;
uv_thread_t tid;
uv_sem_t sem;
TaskManager task_manager;
uv_async_t notify_async;
std::vector watch_dogs;
};
1)add_watchdog 用于新增 watchdog。
void NodeWatchDog::WatchDogWorker::add_watchdog(struct watch_dog_ctx* watchdog_ctx) {
NodeWatchDog::WatchDog *watchdog = new NodeWatchDog::WatchDog(&loop, watchdog_ctx);
watch_dogs.push_back(watchdog);
}
2)start 函數(shù)用于創(chuàng)建子線程和啟動(dòng) watchdog。
void NodeWatchDog::WatchDogWorker::start() {
std::vector::iterator iter;
for(iter = watch_dogs.begin(); watch_dogs.end() != iter; iter++)
{
(*iter)->start();
}
int r = uv_thread_create(&tid, [](void *data) {
NodeWatchDog::WatchDogWorker * worker = (NodeWatchDog::WatchDogWorker *)data;
uv_sem_post((uv_sem_t*)worker->get_thread_sem());
uv_loop_t * loop = worker->get_event_loop();
uv_run(loop, UV_RUN_DEFAULT);
uv_loop_close(loop);
}, (void *)this);
if (!r) {
uv_sem_wait(&sem);
}
uv_sem_destroy(&sem);
}
啟動(dòng) watchdog 時(shí)會(huì)往子線程的 loop 里插入一個(gè)定時(shí)器。然后創(chuàng)建子線程,在子線程里開(kāi)啟一個(gè)新的 loop。在這個(gè) loop 里就會(huì)不斷執(zhí)行 watchdog 的任務(wù)。
3)add_task 用于其他線程外另一個(gè)線程插入一個(gè)任務(wù)。下面是任務(wù)的定義。
typedef void (*task_cb)(void *data);
struct task{
task()
{
cb = nullptr;
data = nullptr;
}
task_cb cb;
void *data;
};
定義很簡(jiǎn)單,一個(gè)工作函數(shù)和對(duì)應(yīng)的上下文。接著看 add_task。
void NodeWatchDog::WatchDogWorker::add_task(task_cb cb, void * data) {
task_manager.add_task(cb, data);
uv_async_send(¬ify_async);
}
add_task 直接調(diào)用 task_manager 的 add_task 函數(shù)插入任務(wù),因?yàn)樗WC了線程安全。接著通過(guò) Libuv 提供的 async 線程間通信機(jī)制通知另一個(gè)線程有新任務(wù)。如果是在 Node.js 里的話,還需要通過(guò)另一種方式進(jìn)行通知,這里就不具體展開(kāi)。
此處,我們就實(shí)現(xiàn)了在子線程里定時(shí)執(zhí)行任務(wù),并實(shí)現(xiàn)主線程和子線程互相提交任務(wù)的能力。最終再實(shí)現(xiàn)一個(gè) WatchDogManager 用于管理多個(gè) worker。
4.WatchDogManager
class WatchDogManager
{
public:
WatchDogManager(uv_loop_t *loop);
~WatchDogManager() {};
void start();
void stop();
void add_task(task_cb cb, void *data);
void handle_task();
void add_worker(WatchDogWorker* worker);
WatchDogWorker* get_worker(int index) { return workers[index]; };
private:
uv_async_t notify_async;
TaskManager task_manager;
std::vectorworkers;
};
通過(guò) WatchDogManager 的類定義,我們大概就知道它的功能。首先看一下 start 方法。
void NodeWatchDog::WatchDogManager::start(){
std::vector::iterator iter;
for(iter = workers.begin(); workers.end() != iter; iter++)
{
(*iter)->start();
}
}
start 函數(shù)就是啟動(dòng)多個(gè) worker,剛才已經(jīng)介紹過(guò),worker 會(huì)創(chuàng)建一個(gè)子線程定時(shí)執(zhí)行任務(wù)。其他方法就沒(méi)有太多邏輯,就不一一介紹。
5.使用
接下來(lái)看看使用方式。
#include "src/watch_dog_manager.h"
#include "uv.h"
#include "stdio.h"
using namespace NodeWatchDog;int main() {
setbuf(stdout, NULL);
uv_loop_t loop;
uv_loop_init(&loop);
WatchDogWorker *worker = new WatchDogWorker();
struct watch_dog_ctx* ctx = new watch_dog_ctx;
ctx->work = [](void *ctx) {
printf("worker task execute in thread id => %ld\n", (long)pthread_self());
};
ctx->poll_interval = 1000;
worker->add_watchdog(ctx);
worker->start();
uv_idle_t idle;
uv_idle_init(&loop, &idle);
uv_idle_start(&idle, [](uv_idle_t *) {});
printf("main thread id => %ld\n", (long)pthread_self());
uv_run(&loop, UV_RUN_DEFAULT);
delete worker;
delete ctx;
return 0;
}
新建一個(gè) watchdog 和 worker,并把 watchdog 插入到 worker 中,最后啟動(dòng) worker。同時(shí)主線程也進(jìn)入自己的 loop。執(zhí)行輸出如下。
main thread id => 4338490880
worker task execute in thread id => 123145480077312
worker task execute in thread id => 123145480077312
我們看到,printf 分別執(zhí)行在不同的線程中。接下來(lái)再看一下復(fù)雜的場(chǎng)景。
#include "src/watch_dog_manager.h"
#include "uv.h"
#include "stdio.h"
using namespace NodeWatchDog;
int main() {
setbuf(stdout, NULL);
uv_loop_t loop;
uv_loop_init(&loop);
WatchDogManager *manager = new WatchDogManager(&loop);
WatchDogWorker *worker = new WatchDogWorker();
struct watch_dog_ctx* ctx = new watch_dog_ctx;
ctx->data = (void *)manager;
ctx->work = [](void *ctx) {
struct watch_dog_ctx* watchdog_ctx = (struct watch_dog_ctx*)ctx;
WatchDogManager* manager = (WatchDogManager*)watchdog_ctx->data;
// 提交任務(wù)給主線程
manager->add_task([](void *data) {
printf("manager task execute in thread id => %ld\n", (long)pthread_self());
WatchDogManager* manager = (WatchDogManager*)data;
// 提交任務(wù)給某個(gè)子線程
manager->get_worker(0)->add_task([](void *data) {
printf("worker task execute in thread id => %ld\n", (long)pthread_self());
}, nullptr);
}, manager);
};
ctx->poll_interval = 1000;
worker->add_watchdog(ctx);
manager->add_worker(worker);
manager->start();
uv_idle_t idle;
uv_idle_init(&loop, &idle);
uv_idle_start(&idle, [](uv_idle_t *) {
//
});
printf("main thread id => %ld\n", (long)pthread_self());
uv_run(&loop, UV_RUN_DEFAULT);
delete manager;
delete worker;
delete ctx;
return 0;
}
上面的例子中,當(dāng)在子線程中執(zhí)行回調(diào)時(shí),通過(guò) manager->add_task 往主線程提及一個(gè)任務(wù)。然后在主線程執(zhí)行這個(gè)任務(wù)時(shí),首先輸出當(dāng)前線程 id,再通過(guò) manager->get_worker(0)->add_task 給子線程提交一個(gè)任務(wù)。比如我們希望定時(shí)收集主線程的 CPU 數(shù)據(jù),那么就可以給子線程插入一個(gè) watchdog,然后在 watchdog 回調(diào)里給主線程提交一個(gè)任務(wù),當(dāng)主線程執(zhí)行這個(gè)任務(wù)的時(shí)候,我們就可以拿到主線程的 CPU 數(shù)據(jù)。當(dāng)然還有更復(fù)雜的場(chǎng)景,比如獲取 CPU Profile 時(shí),主線程和子線程會(huì)進(jìn)行多次交互。最后再看一個(gè)多個(gè) worker 線程的例子。
WatchDogManager *manager = new WatchDogManager(&loop);
WatchDogWorker *worker1 = new WatchDogWorker();
WatchDogWorker *worker2 = new WatchDogWorker();
struct watch_dog_ctx* ctx = new watch_dog_ctx;
ctx->work = [](void *ctx) {
printf("manager task execute in thread id => %ld\n", (long)pthread_self());
};
ctx->poll_interval = 1000;
worker1->add_watchdog(ctx);
worker2->add_watchdog(ctx);
manager->add_worker(worker1);
manager->add_worker(worker2);
manager->start();
以上代碼輸出如下。
main thread id => 4469550592
manager task execute in thread id => 123145483321344
manager task execute in thread id => 123145491722240
manager task execute in thread id => 123145483321344
manager task execute in thread id => 123145491722240
我們看到存在多個(gè)子線程,并且成功執(zhí)行了任務(wù)。
后記:?jiǎn)尉€程使得編碼變得簡(jiǎn)單,但是也導(dǎo)致了一些限制,這時(shí)候我們就需要使用額外的子線程來(lái)解決單線程存在的限制。引入多線程后,就需要解決多線程互斥和通信問(wèn)題。以上代碼只是介紹了一個(gè)大致的思路,還有地方需要完善,如果有想法的同學(xué)也可以交流。
倉(cāng)庫(kù):https://github.com/theanarkh/uv-watchdog
網(wǎng)頁(yè)名稱:聊聊使用Libuv實(shí)現(xiàn)的Watchdog,你學(xué)會(huì)了嗎?
網(wǎng)站地址:http://m.fisionsoft.com.cn/article/cdegocp.html


咨詢
建站咨詢
