more code

This commit is contained in:
Hongze Cheng 2023-06-13 17:16:06 +08:00
parent 42a09cc9df
commit e01d598da9
3 changed files with 63 additions and 56 deletions

View File

@ -49,7 +49,8 @@ int32_t vnodeEncodeConfig(const void* pObj, SJson* pJson);
int32_t vnodeDecodeConfig(const SJson* pJson, void* pObj); int32_t vnodeDecodeConfig(const SJson* pJson, void* pObj);
// vnodeModule.c // vnodeModule.c
int32_t vnodeScheduleTask(int32_t (*execute)(void*), void* arg); int vnodeScheduleTask(int (*execute)(void*), void* arg);
int vnodeScheduleTaskEx(int tpid, int (*execute)(void*), void* arg);
// vnodeBufPool.c // vnodeBufPool.c
typedef struct SVBufPoolNode SVBufPoolNode; typedef struct SVBufPoolNode SVBufPoolNode;

View File

@ -16,6 +16,7 @@
#include "inc/tsdbFS.h" #include "inc/tsdbFS.h"
extern int vnodeScheduleTask(int (*execute)(void *), void *arg); extern int vnodeScheduleTask(int (*execute)(void *), void *arg);
extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg);
#define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT #define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT
#define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1) #define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1)
@ -612,7 +613,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
if (fobj->f->stt->nseg < fs->tsdb->pVnode->config.sttTrigger) continue; if (fobj->f->stt->nseg < fs->tsdb->pVnode->config.sttTrigger) continue;
code = vnodeScheduleTask(tsdbMerge, fs->tsdb); code = vnodeScheduleTaskEx(1, tsdbMerge, fs->tsdb);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
fs->mergeTaskOn = true; fs->mergeTaskOn = true;

View File

@ -23,26 +23,24 @@ struct SVnodeTask {
void* arg; void* arg;
}; };
struct SVnodeGlobal { typedef struct {
int8_t init;
int8_t stop;
int nthreads; int nthreads;
TdThread* threads; TdThread* threads;
TdThreadMutex mutex; TdThreadMutex mutex;
TdThreadCond hasTask; TdThreadCond hasTask;
SVnodeTask queue; SVnodeTask queue;
} SVnodeThreadPool;
struct SVnodeGlobal {
int8_t init;
int8_t stop;
SVnodeThreadPool tp[2];
}; };
struct SVnodeGlobal vnodeGlobal; struct SVnodeGlobal vnodeGlobal;
static void* loop(void* arg); static void* loop(void* arg);
static tsem_t canCommit = {0};
static void vnodeInitCommit() { tsem_init(&canCommit, 0, 4); };
void vnode_wait_commit() { tsem_wait(&canCommit); }
void vnode_done_commit() { tsem_wait(&canCommit); }
int vnodeInit(int nthreads) { int vnodeInit(int nthreads) {
int8_t init; int8_t init;
int ret; int ret;
@ -51,28 +49,30 @@ int vnodeInit(int nthreads) {
if (init) { if (init) {
return 0; return 0;
} }
taosThreadMutexInit(&vnodeGlobal.mutex, NULL);
taosThreadCondInit(&vnodeGlobal.hasTask, NULL);
taosThreadMutexLock(&vnodeGlobal.mutex);
vnodeGlobal.stop = 0; vnodeGlobal.stop = 0;
vnodeGlobal.queue.next = &vnodeGlobal.queue;
vnodeGlobal.queue.prev = &vnodeGlobal.queue;
taosThreadMutexUnlock(&(vnodeGlobal.mutex)); for (int32_t i = 0; i < ARRAY_SIZE(vnodeGlobal.tp); i++) {
taosThreadMutexInit(&vnodeGlobal.tp[i].mutex, NULL);
taosThreadCondInit(&vnodeGlobal.tp[i].hasTask, NULL);
vnodeGlobal.nthreads = nthreads; taosThreadMutexLock(&vnodeGlobal.tp[i].mutex);
vnodeGlobal.threads = taosMemoryCalloc(nthreads, sizeof(TdThread));
if (vnodeGlobal.threads == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
vError("failed to init vnode module since:%s", tstrerror(terrno));
return -1;
}
for (int i = 0; i < nthreads; i++) { vnodeGlobal.tp[i].queue.next = &vnodeGlobal.tp[i].queue;
taosThreadCreate(&(vnodeGlobal.threads[i]), NULL, loop, NULL); vnodeGlobal.tp[i].queue.prev = &vnodeGlobal.tp[i].queue;
taosThreadMutexUnlock(&(vnodeGlobal.tp[i].mutex));
vnodeGlobal.tp[i].nthreads = nthreads;
vnodeGlobal.tp[i].threads = taosMemoryCalloc(nthreads, sizeof(TdThread));
if (vnodeGlobal.tp[i].threads == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
vError("failed to init vnode module since:%s", tstrerror(terrno));
return -1;
}
for (int j = 0; j < nthreads; j++) {
taosThreadCreate(&(vnodeGlobal.tp[i].threads[j]), NULL, loop, &vnodeGlobal.tp[i]);
}
} }
if (walInit() < 0) { if (walInit() < 0) {
@ -92,27 +92,29 @@ void vnodeCleanup() {
if (init == 0) return; if (init == 0) return;
// set stop // set stop
taosThreadMutexLock(&(vnodeGlobal.mutex));
vnodeGlobal.stop = 1; vnodeGlobal.stop = 1;
taosThreadCondBroadcast(&(vnodeGlobal.hasTask)); for (int32_t i = 0; i < ARRAY_SIZE(vnodeGlobal.tp); i++) {
taosThreadMutexUnlock(&(vnodeGlobal.mutex)); taosThreadMutexLock(&(vnodeGlobal.tp[i].mutex));
taosThreadCondBroadcast(&(vnodeGlobal.tp[i].hasTask));
taosThreadMutexUnlock(&(vnodeGlobal.tp[i].mutex));
// wait for threads // wait for threads
for (int i = 0; i < vnodeGlobal.nthreads; i++) { for (int j = 0; j < vnodeGlobal.tp[i].nthreads; j++) {
taosThreadJoin(vnodeGlobal.threads[i], NULL); taosThreadJoin(vnodeGlobal.tp[i].threads[j], NULL);
}
// clear source
taosMemoryFreeClear(vnodeGlobal.tp[i].threads);
taosThreadCondDestroy(&(vnodeGlobal.tp[i].hasTask));
taosThreadMutexDestroy(&(vnodeGlobal.tp[i].mutex));
} }
// clear source
taosMemoryFreeClear(vnodeGlobal.threads);
taosThreadCondDestroy(&(vnodeGlobal.hasTask));
taosThreadMutexDestroy(&(vnodeGlobal.mutex));
walCleanUp(); walCleanUp();
tqCleanUp(); tqCleanUp();
smaCleanUp(); smaCleanUp();
} }
int vnodeScheduleTask(int (*execute)(void*), void* arg) { int vnodeScheduleTaskEx(int tpid, int (*execute)(void*), void* arg) {
SVnodeTask* pTask; SVnodeTask* pTask;
ASSERT(!vnodeGlobal.stop); ASSERT(!vnodeGlobal.stop);
@ -126,35 +128,38 @@ int vnodeScheduleTask(int (*execute)(void*), void* arg) {
pTask->execute = execute; pTask->execute = execute;
pTask->arg = arg; pTask->arg = arg;
taosThreadMutexLock(&(vnodeGlobal.mutex)); taosThreadMutexLock(&(vnodeGlobal.tp[tpid].mutex));
pTask->next = &vnodeGlobal.queue; pTask->next = &vnodeGlobal.tp[tpid].queue;
pTask->prev = vnodeGlobal.queue.prev; pTask->prev = vnodeGlobal.tp[tpid].queue.prev;
vnodeGlobal.queue.prev->next = pTask; vnodeGlobal.tp[tpid].queue.prev->next = pTask;
vnodeGlobal.queue.prev = pTask; vnodeGlobal.tp[tpid].queue.prev = pTask;
taosThreadCondSignal(&(vnodeGlobal.hasTask)); taosThreadCondSignal(&(vnodeGlobal.tp[tpid].hasTask));
taosThreadMutexUnlock(&(vnodeGlobal.mutex)); taosThreadMutexUnlock(&(vnodeGlobal.tp[tpid].mutex));
return 0; return 0;
} }
int vnodeScheduleTask(int (*execute)(void*), void* arg) { return vnodeScheduleTaskEx(0, execute, arg); }
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static void* loop(void* arg) { static void* loop(void* arg) {
SVnodeTask* pTask; SVnodeThreadPool* tp = (SVnodeThreadPool*)arg;
int ret; SVnodeTask* pTask;
int ret;
setThreadName("vnode-commit"); setThreadName("vnode-commit");
for (;;) { for (;;) {
taosThreadMutexLock(&(vnodeGlobal.mutex)); taosThreadMutexLock(&(tp->mutex));
for (;;) { for (;;) {
pTask = vnodeGlobal.queue.next; pTask = tp->queue.next;
if (pTask == &vnodeGlobal.queue) { if (pTask == &tp->queue) {
// no task // no task
if (vnodeGlobal.stop) { if (vnodeGlobal.stop) {
taosThreadMutexUnlock(&(vnodeGlobal.mutex)); taosThreadMutexUnlock(&(tp->mutex));
return NULL; return NULL;
} else { } else {
taosThreadCondWait(&(vnodeGlobal.hasTask), &(vnodeGlobal.mutex)); taosThreadCondWait(&(tp->hasTask), &(tp->mutex));
} }
} else { } else {
// has task // has task
@ -164,7 +169,7 @@ static void* loop(void* arg) {
} }
} }
taosThreadMutexUnlock(&(vnodeGlobal.mutex)); taosThreadMutexUnlock(&(tp->mutex));
pTask->execute(pTask->arg); pTask->execute(pTask->arg);
taosMemoryFree(pTask); taosMemoryFree(pTask);