Merge pull request #23864 from taosdata/feat/TD-27461-3.0

feat: background task priority
Hongze Cheng 2023-11-30 13:51:57 +08:00 committed by GitHub
commit 0bfa86f188
18 changed files with 1093 additions and 475 deletions

View File

@ -14,6 +14,8 @@ set(
"src/vnd/vnodeSnapshot.c"
"src/vnd/vnodeRetention.c"
"src/vnd/vnodeInitApi.c"
"src/vnd/vnodeAsync.c"
"src/vnd/vnodeHash.c"
# meta
"src/meta/metaOpen.c"

View File

@ -309,7 +309,12 @@ int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STs
void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive);
// tsdbMerge.c ==============================================================================================
int32_t tsdbSchedMerge(STsdb *tsdb, int32_t fid);
typedef struct {
STsdb *tsdb;
int32_t fid;
} SMergeArg;
int32_t tsdbMerge(void *arg);
// tsdbDiskData ==============================================================================================
int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder);

View File

@ -48,9 +48,32 @@ int32_t vnodeCheckCfg(const SVnodeCfg*);
int32_t vnodeEncodeConfig(const void* pObj, SJson* pJson);
int32_t vnodeDecodeConfig(const SJson* pJson, void* pObj);
// vnodeAsync.c
typedef struct SVAsync SVAsync;
typedef enum {
EVA_PRIORITY_HIGH = 0,
EVA_PRIORITY_NORMAL,
EVA_PRIORITY_LOW,
} EVAPriority;
#define VNODE_ASYNC_VALID_CHANNEL_ID(channelId) ((channelId) > 0)
#define VNODE_ASYNC_VALID_TASK_ID(taskId) ((taskId) > 0)
int32_t vnodeAsyncInit(SVAsync** async, char* label);
int32_t vnodeAsyncDestroy(SVAsync** async);
int32_t vnodeAChannelInit(SVAsync* async, int64_t* channelId);
int32_t vnodeAChannelDestroy(SVAsync* async, int64_t channelId, bool waitRunning);
int32_t vnodeAsync(SVAsync* async, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*), void* arg,
int64_t* taskId);
int32_t vnodeAsyncC(SVAsync* async, int64_t channelId, EVAPriority priority, int32_t (*execute)(void*),
void (*complete)(void*), void* arg, int64_t* taskId);
int32_t vnodeAWait(SVAsync* async, int64_t taskId);
int32_t vnodeACancel(SVAsync* async, int64_t taskId);
int32_t vnodeAsyncSetWorkers(SVAsync* async, int32_t numWorkers);
// vnodeModule.c
int vnodeScheduleTask(int (*execute)(void*), void* arg);
int vnodeScheduleTaskEx(int tpid, int (*execute)(void*), void* arg);
extern SVAsync* vnodeAsyncHandle[2];
// vnodeBufPool.c
typedef struct SVBufPoolNode SVBufPoolNode;
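
Taken together, these declarations are the whole surface of the new async layer: one SVAsync handle owns a worker pool, channels serialize the tasks scheduled on them, and every task carries an EVAPriority. A minimal usage sketch against just these declarations (demoTask, the "demo" label, and the worker count are illustrative):

static int32_t demoTask(void *arg) {
  // runs on one of the async workers
  return 0;
}

void demo(void) {
  SVAsync *async = NULL;
  int64_t  channelId = 0, taskId = 0;
  vnodeAsyncInit(&async, "demo");
  vnodeAsyncSetWorkers(async, 2);
  vnodeAChannelInit(async, &channelId);
  // tasks on the same channel execute one at a time, in priority order
  vnodeAsyncC(async, channelId, EVA_PRIORITY_NORMAL, demoTask, NULL, NULL, &taskId);
  vnodeAWait(async, taskId);                     // block until demoTask completes
  vnodeAChannelDestroy(async, channelId, true);  // wait for any running task
  vnodeAsyncDestroy(&async);
}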

View File

@ -452,13 +452,16 @@ struct SVnode {
SVBufPool* recycleTail;
SVBufPool* onRecycle;
// commit variables
int64_t commitChannel;
int64_t commitTask;
SMeta* pMeta;
SSma* pSma;
STsdb* pTsdb;
SWal* pWal;
STQ* pTq;
SSink* pSink;
tsem_t canCommit;
int64_t sync;
TdThreadMutex lock;
bool blocked;

View File

@ -20,8 +20,6 @@
#define BLOCK_COMMIT_FACTOR 3
extern int vnodeScheduleTask(int (*execute)(void *), void *arg);
extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg);
extern void remove_file(const char *fname, bool last_level);
#define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT
@ -651,7 +649,6 @@ _exit:
static int32_t close_file_system(STFileSystem *fs) {
TARRAY2_CLEAR(fs->fSetArr, tsdbTFileSetClear);
TARRAY2_CLEAR(fs->fSetArrTmp, tsdbTFileSetClear);
// TODO
return 0;
}
@ -748,36 +745,31 @@ _exit:
return code;
}
static void tsdbDoWaitBgTask(STFileSystem *fs, STFSBgTask *task) {
task->numWait++;
taosThreadCondWait(task->done, &fs->tsdb->mutex);
task->numWait--;
int32_t tsdbFSCancelAllBgTask(STFileSystem *fs) {
TARRAY2(int64_t) channelArr = {0};
if (task->numWait == 0) {
taosThreadCondDestroy(task->done);
if (task->destroy) {
task->destroy(task->arg);
}
taosMemoryFree(task);
// collect all open channels
taosThreadMutexLock(&fs->tsdb->mutex);
STFileSet *fset;
TARRAY2_FOREACH(fs->fSetArr, fset) {
if (VNODE_ASYNC_VALID_CHANNEL_ID(fset->bgTaskChannel)) {
TARRAY2_APPEND(&channelArr, fset->bgTaskChannel);
fset->bgTaskChannel = 0;
}
}
taosThreadMutexUnlock(&fs->tsdb->mutex);
static void tsdbDoDoneBgTask(STFileSystem *fs, STFSBgTask *task) {
if (task->numWait > 0) {
taosThreadCondBroadcast(task->done);
} else {
taosThreadCondDestroy(task->done);
if (task->destroy) {
task->destroy(task->arg);
}
taosMemoryFree(task);
}
// destroy all channels
int64_t channel;
TARRAY2_FOREACH(&channelArr, channel) { vnodeAChannelDestroy(vnodeAsyncHandle[1], channel, true); }
TARRAY2_DESTROY(&channelArr, NULL);
return 0;
}
int32_t tsdbCloseFS(STFileSystem **fs) {
if (fs[0] == NULL) return 0;
tsdbFSDisableBgTask(fs[0]);
tsdbFSCancelAllBgTask(*fs);
close_file_system(fs[0]);
destroy_fs(fs);
return 0;
@ -910,7 +902,20 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
}
if (!skipMerge) {
code = tsdbSchedMerge(fs->tsdb, fset->fid);
code = tsdbTFileSetOpenChannel(fset);
TSDB_CHECK_CODE(code, lino, _exit);
SMergeArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
arg->tsdb = fs->tsdb;
arg->fid = fset->fid;
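// schedule the merge on this file set's channel; taosMemoryFree is the
// complete callback, so arg is released when the task finishes or is cancelled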
code = vnodeAsyncC(vnodeAsyncHandle[1], fset->bgTaskChannel, EVA_PRIORITY_HIGH, tsdbMerge, taosMemoryFree,
arg, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
@ -939,7 +944,11 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
}
}
if (tsdbTFileSetIsEmpty(fset) && fset->bgTaskRunning == NULL) {
if (tsdbTFileSetIsEmpty(fset)) {
if (VNODE_ASYNC_VALID_CHANNEL_ID(fset->bgTaskChannel)) {
vnodeAChannelDestroy(vnodeAsyncHandle[1], fset->bgTaskChannel, false);
fset->bgTaskChannel = 0;
}
TARRAY2_REMOVE(fs->fSetArr, i, tsdbTFileSetClear);
} else {
i++;
@ -1180,135 +1189,3 @@ _out:
}
return code;
}
const char *gFSBgTaskName[] = {NULL, "MERGE", "RETENTION", "COMPACT"};
static int32_t tsdbFSRunBgTask(void *arg) {
STFSBgTask *task = (STFSBgTask *)arg;
STFileSystem *fs = task->fs;
task->launchTime = taosGetTimestampMs();
task->run(task->arg);
task->finishTime = taosGetTimestampMs();
tsdbDebug("vgId:%d bg task:%s task id:%" PRId64 " finished, schedule time:%" PRId64 " launch time:%" PRId64
" finish time:%" PRId64,
TD_VID(fs->tsdb->pVnode), gFSBgTaskName[task->type], task->taskid, task->scheduleTime, task->launchTime,
task->finishTime);
taosThreadMutexLock(&fs->tsdb->mutex);
STFileSet *fset = NULL;
tsdbFSGetFSet(fs, task->fid, &fset);
ASSERT(fset != NULL && fset->bgTaskRunning == task);
// free last
tsdbDoDoneBgTask(fs, task);
fset->bgTaskRunning = NULL;
// schedule next
if (fset->bgTaskNum > 0) {
if (fs->stop) {
while (fset->bgTaskNum > 0) {
STFSBgTask *nextTask = fset->bgTaskQueue->next;
nextTask->prev->next = nextTask->next;
nextTask->next->prev = nextTask->prev;
fset->bgTaskNum--;
tsdbDoDoneBgTask(fs, nextTask);
}
} else {
// pop task from head
fset->bgTaskRunning = fset->bgTaskQueue->next;
fset->bgTaskRunning->prev->next = fset->bgTaskRunning->next;
fset->bgTaskRunning->next->prev = fset->bgTaskRunning->prev;
fset->bgTaskNum--;
vnodeScheduleTaskEx(1, tsdbFSRunBgTask, fset->bgTaskRunning);
}
}
taosThreadMutexUnlock(&fs->tsdb->mutex);
return 0;
}
// IMPORTANT: the caller must hold the fs->tsdb->mutex
int32_t tsdbFSScheduleBgTask(STFileSystem *fs, int32_t fid, EFSBgTaskT type, int32_t (*run)(void *),
void (*destroy)(void *), void *arg, int64_t *taskid) {
if (fs->stop) {
if (destroy) {
destroy(arg);
}
return 0;
}
STFileSet *fset;
tsdbFSGetFSet(fs, fid, &fset);
ASSERT(fset != NULL);
for (STFSBgTask *task = fset->bgTaskQueue->next; task != fset->bgTaskQueue; task = task->next) {
if (task->type == type) {
if (destroy) {
destroy(arg);
}
return 0;
}
}
// do schedule task
STFSBgTask *task = taosMemoryCalloc(1, sizeof(STFSBgTask));
if (task == NULL) return TSDB_CODE_OUT_OF_MEMORY;
taosThreadCondInit(task->done, NULL);
task->fs = fs;
task->fid = fid;
task->type = type;
task->run = run;
task->destroy = destroy;
task->arg = arg;
task->scheduleTime = taosGetTimestampMs();
task->taskid = ++fs->taskid;
if (fset->bgTaskRunning == NULL && fset->bgTaskNum == 0) {
// launch task directly
fset->bgTaskRunning = task;
vnodeScheduleTaskEx(1, tsdbFSRunBgTask, task);
} else {
// add to the queue tail
fset->bgTaskNum++;
task->next = fset->bgTaskQueue;
task->prev = fset->bgTaskQueue->prev;
task->prev->next = task;
task->next->prev = task;
}
if (taskid) *taskid = task->taskid;
return 0;
}
int32_t tsdbFSDisableBgTask(STFileSystem *fs) {
taosThreadMutexLock(&fs->tsdb->mutex);
for (;;) {
fs->stop = true;
bool done = true;
STFileSet *fset;
TARRAY2_FOREACH(fs->fSetArr, fset) {
if (fset->bgTaskRunning) {
tsdbDoWaitBgTask(fs, fset->bgTaskRunning);
done = false;
break;
}
}
if (done) break;
}
taosThreadMutexUnlock(&fs->tsdb->mutex);
return 0;
}
int32_t tsdbFSEnableBgTask(STFileSystem *fs) {
taosThreadMutexLock(&fs->tsdb->mutex);
fs->stop = false;
taosThreadMutexUnlock(&fs->tsdb->mutex);
return 0;
}

View File

@ -55,11 +55,6 @@ int64_t tsdbFSAllocEid(STFileSystem *fs);
int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype);
int32_t tsdbFSEditCommit(STFileSystem *fs);
int32_t tsdbFSEditAbort(STFileSystem *fs);
// background task
int32_t tsdbFSScheduleBgTask(STFileSystem *fs, int32_t fid, EFSBgTaskT type, int32_t (*run)(void *),
void (*destroy)(void *), void *arg, int64_t *taskid);
int32_t tsdbFSDisableBgTask(STFileSystem *fs);
int32_t tsdbFSEnableBgTask(STFileSystem *fs);
// other
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset);
int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid);

View File

@ -14,6 +14,7 @@
*/
#include "tsdbFSet2.h"
#include "vnd.h"
int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl) {
if (!(lvl[0] = taosMemoryMalloc(sizeof(SSttLvl)))) return TSDB_CODE_OUT_OF_MEMORY;
@ -451,10 +452,7 @@ int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset) {
TARRAY2_INIT(fset[0]->lvlArr);
// background task queue
fset[0]->bgTaskNum = 0;
fset[0]->bgTaskQueue->next = fset[0]->bgTaskQueue;
fset[0]->bgTaskQueue->prev = fset[0]->bgTaskQueue;
fset[0]->bgTaskRunning = NULL;
fset[0]->bgTaskChannel = 0;
// block commit variables
taosThreadCondInit(&fset[0]->canCommit, NULL);
@ -650,3 +648,8 @@ bool tsdbTFileSetIsEmpty(const STFileSet *fset) {
}
return TARRAY2_SIZE(fset->lvlArr) == 0;
}
int32_t tsdbTFileSetOpenChannel(STFileSet *fset) {
if (VNODE_ASYNC_VALID_CHANNEL_ID(fset->bgTaskChannel)) return 0;
return vnodeAChannelInit(vnodeAsyncHandle[1], &fset->bgTaskChannel);
}
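
With this, each file set lazily opens its own background-task channel on first use: tsdbFSEditCommit and tsdbRetention (elsewhere in this diff) call tsdbTFileSetOpenChannel before scheduling work, and the channel is destroyed when the file set becomes empty or when all background tasks are cancelled.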

View File

@ -22,14 +22,12 @@
extern "C" {
#endif
typedef struct STFileSet STFileSet;
typedef struct STFileOp STFileOp;
typedef struct SSttLvl SSttLvl;
typedef TARRAY2(STFileObj *) TFileObjArray;
typedef TARRAY2(SSttLvl *) TSttLvlArray;
typedef TARRAY2(STFileOp) TFileOpArray;
typedef struct STFileSystem STFileSystem;
typedef struct STFSBgTask STFSBgTask;
typedef enum {
TSDB_FOP_NONE = 0,
@ -72,33 +70,8 @@ bool tsdbTFileSetIsEmpty(const STFileSet *fset);
// stt
int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl);
int32_t tsdbSttLvlClear(SSttLvl **lvl);
typedef enum {
TSDB_BG_TASK_MERGER = 1,
TSDB_BG_TASK_RETENTION,
TSDB_BG_TASK_COMPACT,
} EFSBgTaskT;
struct STFSBgTask {
STFileSystem *fs;
int32_t fid;
EFSBgTaskT type;
int32_t (*run)(void *arg);
void (*destroy)(void *arg);
void *arg;
TdThreadCond done[1];
int32_t numWait;
int64_t taskid;
int64_t scheduleTime;
int64_t launchTime;
int64_t finishTime;
struct STFSBgTask *prev;
struct STFSBgTask *next;
};
// open channel
int32_t tsdbTFileSetOpenChannel(STFileSet *fset);
struct STFileOp {
tsdb_fop_t optype;
@ -118,10 +91,8 @@ struct STFileSet {
STFileObj *farr[TSDB_FTYPE_MAX]; // file array
TSttLvlArray lvlArr[1]; // level array
// background task queue
int32_t bgTaskNum;
STFSBgTask bgTaskQueue[1];
STFSBgTask *bgTaskRunning;
// background task channel
int64_t bgTaskChannel;
// block commit variables
TdThreadCond canCommit;

View File

@ -17,11 +17,6 @@
#define TSDB_MAX_LEVEL 2 // means max level is 3
typedef struct {
STsdb *tsdb;
int32_t fid;
} SMergeArg;
typedef struct {
STsdb *tsdb;
int32_t fid;
@ -528,7 +523,7 @@ static int32_t tsdbMergeGetFSet(SMerger *merger) {
return 0;
}
static int32_t tsdbMerge(void *arg) {
int32_t tsdbMerge(void *arg) {
int32_t code = 0;
int32_t lino = 0;
SMergeArg *mergeArg = (SMergeArg *)arg;
@ -597,18 +592,3 @@ _exit:
tsdbTFileSetClear(&merger->fset);
return code;
}
int32_t tsdbSchedMerge(STsdb *tsdb, int32_t fid) {
SMergeArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
arg->tsdb = tsdb;
arg->fid = fid;
int32_t code = tsdbFSScheduleBgTask(tsdb->pFS, fid, TSDB_BG_TASK_MERGER, tsdbMerge, taosMemoryFree, arg, NULL);
if (code) taosMemoryFree(arg);
return code;
}

View File

@ -249,7 +249,7 @@ _exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
} else {
tsdbInfo("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
}
return code;
}
@ -279,7 +279,7 @@ _exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
} else {
tsdbInfo("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
}
tsdbFSDestroyCopySnapshot(&rtner->fsetArr);
return code;
@ -391,32 +391,6 @@ _exit:
static void tsdbFreeRtnArg(void *arg) { taosMemoryFree(arg); }
static int32_t tsdbDoRetentionSync(void *arg) {
int32_t code = 0;
int32_t lino = 0;
SRTNer rtner[1] = {0};
code = tsdbDoRetentionBegin(arg, rtner);
TSDB_CHECK_CODE(code, lino, _exit);
STFileSet *fset;
TARRAY2_FOREACH(rtner->fsetArr, fset) {
code = tsdbDoRetentionOnFileSet(rtner, fset);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbDoRetentionEnd(rtner);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
}
tsem_post(&((SRtnArg *)arg)->tsdb->pVnode->canCommit);
tsdbFreeRtnArg(arg);
return code;
}
static int32_t tsdbDoRetentionAsync(void *arg) {
int32_t code = 0;
int32_t lino = 0;
@ -454,28 +428,16 @@ _exit:
int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) {
int32_t code = 0;
if (sync) { // sync retention
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
arg->tsdb = tsdb;
arg->now = now;
arg->fid = INT32_MAX;
tsem_wait(&tsdb->pVnode->canCommit);
code = vnodeScheduleTask(tsdbDoRetentionSync, arg);
if (code) {
tsem_post(&tsdb->pVnode->canCommit);
taosMemoryFree(arg);
return code;
}
} else { // async retention
taosThreadMutexLock(&tsdb->mutex);
STFileSet *fset;
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
code = tsdbTFileSetOpenChannel(fset);
if (code) {
taosThreadMutexUnlock(&tsdb->mutex);
return code;
}
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) {
taosThreadMutexUnlock(&tsdb->mutex);
@ -486,8 +448,13 @@ int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) {
arg->now = now;
arg->fid = fset->fid;
code = tsdbFSScheduleBgTask(tsdb->pFS, fset->fid, TSDB_BG_TASK_RETENTION, tsdbDoRetentionAsync, tsdbFreeRtnArg,
arg, NULL);
if (sync) {
code = vnodeAsyncC(vnodeAsyncHandle[0], tsdb->pVnode->commitChannel, EVA_PRIORITY_LOW, tsdbDoRetentionAsync,
tsdbFreeRtnArg, arg, NULL);
} else {
code = vnodeAsyncC(vnodeAsyncHandle[1], fset->bgTaskChannel, EVA_PRIORITY_LOW, tsdbDoRetentionAsync,
tsdbFreeRtnArg, arg, NULL);
}
if (code) {
tsdbFreeRtnArg(arg);
taosThreadMutexUnlock(&tsdb->mutex);
@ -496,7 +463,6 @@ int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) {
}
taosThreadMutexUnlock(&tsdb->mutex);
}
return code;
}

View File

@ -1032,9 +1032,6 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRang
int32_t code = 0;
int32_t lino = 0;
// disable background tasks
tsdbFSDisableBgTask(pTsdb->pFS);
// start to write
writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
@ -1107,7 +1104,6 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) {
taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
}
tsdbFSEnableBgTask(tsdb->pFS);
tsdbIterMergerClose(&writer[0]->ctx->tombIterMerger);
tsdbIterMergerClose(&writer[0]->ctx->dataIterMerger);
@ -1595,3 +1591,6 @@ _out:
return code;
}
extern int32_t tsdbFSCancelAllBgTask(STFileSystem* fs);
int32_t tsdbCancelAllBgTask(STsdb* tsdb) { return tsdbFSCancelAllBgTask(tsdb->pFS); }

View File

@ -0,0 +1,719 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnd.h"
#include "vnodeHash.h"
typedef struct SVATask SVATask;
typedef struct SVAChannel SVAChannel;
#define VNODE_ASYNC_DEFAULT_WORKERS 4
#define VNODE_ASYNC_MAX_WORKERS 256
// priority
#define EVA_PRIORITY_MAX (EVA_PRIORITY_LOW + 1)
// worker
typedef enum {
EVA_WORKER_STATE_UINIT = 0,
EVA_WORKER_STATE_ACTIVE,
EVA_WORKER_STATE_IDLE,
EVA_WORKER_STATE_STOP,
} EVWorkerState;
typedef struct {
SVAsync *async;
int32_t workerId;
EVWorkerState state;
TdThread thread;
SVATask *runningTask;
} SVWorker;
// task
typedef enum {
EVA_TASK_STATE_WAITTING = 0,
EVA_TASK_STATE_RUNNING,
} EVATaskState;
struct SVATask {
int64_t taskId;
EVAPriority priority;
int32_t priorScore;
SVAChannel *channel;
int32_t (*execute)(void *);
void (*complete)(void *);
void *arg;
EVATaskState state;
// wait
int32_t numWait;
TdThreadCond waitCond;
// queue
struct SVATask *prev;
struct SVATask *next;
};
#define VATASK_PIORITY(task_) ((task_)->priority - ((task_)->priorScore / 4))
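// Aging: each time a waiting task is passed over, its priorScore is bumped,
// and every 4 bumps raise its effective priority one level, e.g. a LOW (2)
// task skipped four times is queued as NORMAL (1).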
// async channel
typedef enum {
EVA_CHANNEL_STATE_OPEN = 0,
EVA_CHANNEL_STATE_CLOSE,
} EVAChannelState;
struct SVAChannel {
int64_t channelId;
EVAChannelState state;
SVATask queue[EVA_PRIORITY_MAX];
SVATask *scheduled;
SVAChannel *prev;
SVAChannel *next;
};
// async handle
struct SVAsync {
const char *label;
TdThreadMutex mutex;
TdThreadCond hasTask;
bool stop;
// worker
int32_t numWorkers;
int32_t numLaunchWorkers;
int32_t numIdleWorkers;
SVWorker workers[VNODE_ASYNC_MAX_WORKERS];
// channel
int64_t nextChannelId;
int32_t numChannels;
SVAChannel chList;
SVHashTable *channelTable;
// task
int64_t nextTaskId;
int32_t numTasks;
SVATask queue[EVA_PRIORITY_MAX];
SVHashTable *taskTable;
};
static int32_t vnodeAsyncTaskDone(SVAsync *async, SVATask *task) {
int32_t ret;
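// If this task was its channel's scheduled task, promote the channel's next
// waiting task: the head of the highest non-empty channel queue becomes the
// scheduled task and moves to the global queue, while the heads of the
// remaining channel queues age.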
if (task->channel != NULL && task->channel->scheduled == task) {
task->channel->scheduled = NULL;
if (task->channel->state == EVA_CHANNEL_STATE_CLOSE) {
taosMemoryFree(task->channel);
} else {
for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
SVATask *nextTask = task->channel->queue[i].next;
if (nextTask != &task->channel->queue[i]) {
if (task->channel->scheduled == NULL) {
task->channel->scheduled = nextTask;
nextTask->next->prev = nextTask->prev;
nextTask->prev->next = nextTask->next;
} else {
nextTask->priorScore++;
int32_t newPriority = VATASK_PIORITY(nextTask);
if (newPriority != i) {
// remove from current priority queue
nextTask->prev->next = nextTask->next;
nextTask->next->prev = nextTask->prev;
// add to new priority queue
nextTask->next = &task->channel->queue[newPriority];
nextTask->prev = task->channel->queue[newPriority].prev;
nextTask->next->prev = nextTask;
nextTask->prev->next = nextTask;
}
}
}
}
if (task->channel->scheduled != NULL) {
int32_t priority = VATASK_PIORITY(task->channel->scheduled);
task->channel->scheduled->next = &async->queue[priority];
task->channel->scheduled->prev = async->queue[priority].prev;
task->channel->scheduled->next->prev = task->channel->scheduled;
task->channel->scheduled->prev->next = task->channel->scheduled;
}
}
}
ret = vHashDrop(async->taskTable, task);
if (ret != 0) {
ASSERT(0);
}
async->numTasks--;
// call complete callback
if (task->complete) {
task->complete(task->arg);
}
if (task->numWait == 0) {
taosThreadCondDestroy(&task->waitCond);
taosMemoryFree(task);
} else if (task->numWait == 1) {
taosThreadCondSignal(&task->waitCond);
} else {
taosThreadCondBroadcast(&task->waitCond);
}
return 0;
}
static int32_t vnodeAsyncCancelAllTasks(SVAsync *async) {
for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
while (async->queue[i].next != &async->queue[i]) {
SVATask *task = async->queue[i].next;
task->prev->next = task->next;
task->next->prev = task->prev;
vnodeAsyncTaskDone(async, task);
}
}
return 0;
}
static void *vnodeAsyncLoop(void *arg) {
SVWorker *worker = (SVWorker *)arg;
SVAsync *async = worker->async;
setThreadName(async->label);
for (;;) {
taosThreadMutexLock(&async->mutex);
// finish last running task
if (worker->runningTask != NULL) {
vnodeAsyncTaskDone(async, worker->runningTask);
worker->runningTask = NULL;
}
for (;;) {
if (async->stop || worker->workerId >= async->numWorkers) {
if (async->stop) { // cancel all tasks
vnodeAsyncCancelAllTasks(async);
}
worker->state = EVA_WORKER_STATE_STOP;
async->numLaunchWorkers--;
taosThreadMutexUnlock(&async->mutex);
return NULL;
}
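// Scan the global queues from highest (0) to lowest priority: claim the
// first waiting task, and bump priorScore on the heads of the queues behind it.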
for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
SVATask *task = async->queue[i].next;
if (task != &async->queue[i]) {
if (worker->runningTask == NULL) {
worker->runningTask = task;
task->prev->next = task->next;
task->next->prev = task->prev;
} else { // promote priority
task->priorScore++;
int32_t priority = VATASK_PIORITY(task);
if (priority != i) {
// remove from current priority queue
task->prev->next = task->next;
task->next->prev = task->prev;
// add to new priority queue
task->next = &async->queue[priority];
task->prev = async->queue[priority].prev;
task->next->prev = task;
task->prev->next = task;
}
}
}
}
if (worker->runningTask == NULL) {
worker->state = EVA_WORKER_STATE_IDLE;
async->numIdleWorkers++;
taosThreadCondWait(&async->hasTask, &async->mutex);
async->numIdleWorkers--;
worker->state = EVA_WORKER_STATE_ACTIVE;
} else {
worker->runningTask->state = EVA_TASK_STATE_RUNNING;
break;
}
}
taosThreadMutexUnlock(&async->mutex);
// do run the task
worker->runningTask->execute(worker->runningTask->arg);
}
return NULL;
}
static uint32_t vnodeAsyncTaskHash(const void *obj) {
SVATask *task = (SVATask *)obj;
return MurmurHash3_32((const char *)(&task->taskId), sizeof(task->taskId));
}
static int32_t vnodeAsyncTaskCompare(const void *obj1, const void *obj2) {
SVATask *task1 = (SVATask *)obj1;
SVATask *task2 = (SVATask *)obj2;
if (task1->taskId < task2->taskId) {
return -1;
} else if (task1->taskId > task2->taskId) {
return 1;
}
return 0;
}
static uint32_t vnodeAsyncChannelHash(const void *obj) {
SVAChannel *channel = (SVAChannel *)obj;
return MurmurHash3_32((const char *)(&channel->channelId), sizeof(channel->channelId));
}
static int32_t vnodeAsyncChannelCompare(const void *obj1, const void *obj2) {
SVAChannel *channel1 = (SVAChannel *)obj1;
SVAChannel *channel2 = (SVAChannel *)obj2;
if (channel1->channelId < channel2->channelId) {
return -1;
} else if (channel1->channelId > channel2->channelId) {
return 1;
}
return 0;
}
int32_t vnodeAsyncInit(SVAsync **async, char *label) {
int32_t ret;
if (async == NULL) {
return TSDB_CODE_INVALID_PARA;
}
if (label == NULL) {
label = "anonymous";
}
(*async) = (SVAsync *)taosMemoryCalloc(1, sizeof(SVAsync) + strlen(label) + 1);
if ((*async) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy((char *)((*async) + 1), label);
(*async)->label = (const char *)((*async) + 1);
taosThreadMutexInit(&(*async)->mutex, NULL);
taosThreadCondInit(&(*async)->hasTask, NULL);
(*async)->stop = false;
// worker
(*async)->numWorkers = VNODE_ASYNC_DEFAULT_WORKERS;
(*async)->numLaunchWorkers = 0;
(*async)->numIdleWorkers = 0;
for (int32_t i = 0; i < VNODE_ASYNC_MAX_WORKERS; i++) {
(*async)->workers[i].async = (*async);
(*async)->workers[i].workerId = i;
(*async)->workers[i].state = EVA_WORKER_STATE_UINIT;
(*async)->workers[i].runningTask = NULL;
}
// channel
(*async)->nextChannelId = 0;
(*async)->numChannels = 0;
(*async)->chList.prev = &(*async)->chList;
(*async)->chList.next = &(*async)->chList;
ret = vHashInit(&(*async)->channelTable, vnodeAsyncChannelHash, vnodeAsyncChannelCompare);
if (ret != 0) {
taosThreadMutexDestroy(&(*async)->mutex);
taosThreadCondDestroy(&(*async)->hasTask);
taosMemoryFree(*async);
return ret;
}
// task
(*async)->nextTaskId = 0;
(*async)->numTasks = 0;
for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
(*async)->queue[i].next = &(*async)->queue[i];
(*async)->queue[i].prev = &(*async)->queue[i];
}
ret = vHashInit(&(*async)->taskTable, vnodeAsyncTaskHash, vnodeAsyncTaskCompare);
if (ret != 0) {
vHashDestroy(&(*async)->channelTable);
taosThreadMutexDestroy(&(*async)->mutex);
taosThreadCondDestroy(&(*async)->hasTask);
taosMemoryFree(*async);
return ret;
}
return 0;
}
int32_t vnodeAsyncDestroy(SVAsync **async) {
if ((*async) == NULL) {
return TSDB_CODE_INVALID_PARA;
}
// set stop and broadcast
taosThreadMutexLock(&(*async)->mutex);
(*async)->stop = true;
taosThreadCondBroadcast(&(*async)->hasTask);
taosThreadMutexUnlock(&(*async)->mutex);
// join all workers
for (int32_t i = 0; i < VNODE_ASYNC_MAX_WORKERS; i++) {
taosThreadMutexLock(&(*async)->mutex);
EVWorkerState state = (*async)->workers[i].state;
taosThreadMutexUnlock(&(*async)->mutex);
if (state == EVA_WORKER_STATE_UINIT) {
continue;
}
taosThreadJoin((*async)->workers[i].thread, NULL);
ASSERT((*async)->workers[i].state == EVA_WORKER_STATE_STOP);
(*async)->workers[i].state = EVA_WORKER_STATE_UINIT;
}
// close all channels
for (SVAChannel *channel = (*async)->chList.next; channel != &(*async)->chList; channel = (*async)->chList.next) {
channel->next->prev = channel->prev;
channel->prev->next = channel->next;
int32_t ret = vHashDrop((*async)->channelTable, channel);
if (ret) {
ASSERT(0);
}
(*async)->numChannels--;
taosMemoryFree(channel);
}
ASSERT((*async)->numLaunchWorkers == 0);
ASSERT((*async)->numIdleWorkers == 0);
ASSERT((*async)->numChannels == 0);
ASSERT((*async)->numTasks == 0);
taosThreadMutexDestroy(&(*async)->mutex);
taosThreadCondDestroy(&(*async)->hasTask);
vHashDestroy(&(*async)->channelTable);
vHashDestroy(&(*async)->taskTable);
taosMemoryFree(*async);
*async = NULL;
return 0;
}
static int32_t vnodeAsyncLaunchWorker(SVAsync *async) {
for (int32_t i = 0; i < async->numWorkers; i++) {
ASSERT(async->workers[i].state != EVA_WORKER_STATE_IDLE);
if (async->workers[i].state == EVA_WORKER_STATE_ACTIVE) {
continue;
} else if (async->workers[i].state == EVA_WORKER_STATE_STOP) {
taosThreadJoin(async->workers[i].thread, NULL);
async->workers[i].state = EVA_WORKER_STATE_UINIT;
}
taosThreadCreate(&async->workers[i].thread, NULL, vnodeAsyncLoop, &async->workers[i]);
async->workers[i].state = EVA_WORKER_STATE_ACTIVE;
async->numLaunchWorkers++;
break;
}
return 0;
}
int32_t vnodeAsync(SVAsync *async, EVAPriority priority, int32_t (*execute)(void *), void (*complete)(void *),
void *arg, int64_t *taskId) {
return vnodeAsyncC(async, 0, priority, execute, complete, arg, taskId);
}
int32_t vnodeAsyncC(SVAsync *async, int64_t channelId, EVAPriority priority, int32_t (*execute)(void *),
void (*complete)(void *), void *arg, int64_t *taskId) {
if (async == NULL || execute == NULL || channelId < 0) {
return TSDB_CODE_INVALID_PARA;
}
int64_t id;
// create task object
SVATask *task = (SVATask *)taosMemoryCalloc(1, sizeof(SVATask));
if (task == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
task->priority = priority;
task->priorScore = 0;
task->execute = execute;
task->complete = complete;
task->arg = arg;
task->state = EVA_TASK_STATE_WAITTING;
task->numWait = 0;
taosThreadCondInit(&task->waitCond, NULL);
// schedule task
taosThreadMutexLock(&async->mutex);
if (channelId == 0) {
task->channel = NULL;
} else {
SVAChannel channel = {.channelId = channelId};
vHashGet(async->channelTable, &channel, (void **)&task->channel);
if (task->channel == NULL) {
taosThreadMutexUnlock(&async->mutex);
taosThreadCondDestroy(&task->waitCond);
taosMemoryFree(task);
return TSDB_CODE_INVALID_PARA;
}
}
task->taskId = id = ++async->nextTaskId;
// add task to hash table
int32_t ret = vHashPut(async->taskTable, task);
if (ret != 0) {
taosThreadMutexUnlock(&async->mutex);
taosThreadCondDestroy(&task->waitCond);
taosMemoryFree(task);
return ret;
}
async->numTasks++;
// add task to queue
if (task->channel == NULL || task->channel->scheduled == NULL) {
// add task to async->queue
if (task->channel) {
task->channel->scheduled = task;
}
task->next = &async->queue[priority];
task->prev = async->queue[priority].prev;
task->next->prev = task;
task->prev->next = task;
// signal worker or launch new worker
if (async->numIdleWorkers > 0) {
taosThreadCondSignal(&(async->hasTask));
} else if (async->numLaunchWorkers < async->numWorkers) {
vnodeAsyncLaunchWorker(async);
}
} else if (task->channel->scheduled->state == EVA_TASK_STATE_RUNNING ||
priority >= VATASK_PIORITY(task->channel->scheduled)) {
// add task to task->channel->queue
task->next = &task->channel->queue[priority];
task->prev = task->channel->queue[priority].prev;
task->next->prev = task;
task->prev->next = task;
} else {
// remove task->channel->scheduled from queue
task->channel->scheduled->prev->next = task->channel->scheduled->next;
task->channel->scheduled->next->prev = task->channel->scheduled->prev;
// promote priority and add task->channel->scheduled to task->channel->queue
task->channel->scheduled->priorScore++;
int32_t newPriority = VATASK_PIORITY(task->channel->scheduled);
task->channel->scheduled->next = &task->channel->queue[newPriority];
task->channel->scheduled->prev = task->channel->queue[newPriority].prev;
task->channel->scheduled->next->prev = task->channel->scheduled;
task->channel->scheduled->prev->next = task->channel->scheduled;
// add task to queue
task->channel->scheduled = task;
task->next = &async->queue[priority];
task->prev = async->queue[priority].prev;
task->next->prev = task;
task->prev->next = task;
}
taosThreadMutexUnlock(&async->mutex);
if (taskId != NULL) {
*taskId = id;
}
return 0;
}
int32_t vnodeAWait(SVAsync *async, int64_t taskId) {
if (async == NULL || taskId <= 0) {
return TSDB_CODE_INVALID_PARA;
}
SVATask *task = NULL;
SVATask task2 = {.taskId = taskId};
taosThreadMutexLock(&async->mutex);
vHashGet(async->taskTable, &task2, (void **)&task);
if (task) {
task->numWait++;
taosThreadCondWait(&task->waitCond, &async->mutex);
task->numWait--;
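// the wait returns once the task has completed; the last waiter frees it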
if (task->numWait == 0) {
taosThreadCondDestroy(&task->waitCond);
taosMemoryFree(task);
}
}
taosThreadMutexUnlock(&async->mutex);
return 0;
}
int32_t vnodeACancel(SVAsync *async, int64_t taskId) {
if (async == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t ret = 0;
SVATask *task = NULL;
SVATask task2 = {.taskId = taskId};
taosThreadMutexLock(&async->mutex);
vHashGet(async->taskTable, &task2, (void **)&task);
if (task) {
if (task->state == EVA_TASK_STATE_WAITTING) {
// remove from queue
task->next->prev = task->prev;
task->prev->next = task->next;
vnodeAsyncTaskDone(async, task);
} else {
ret = 0; // task is running, should return code TSDB_CODE_BUSY ??
}
}
taosThreadMutexUnlock(&async->mutex);
return ret;
}
int32_t vnodeAsyncSetWorkers(SVAsync *async, int32_t numWorkers) {
if (async == NULL || numWorkers <= 0 || numWorkers > VNODE_ASYNC_MAX_WORKERS) {
return TSDB_CODE_INVALID_PARA;
}
taosThreadMutexLock(&async->mutex);
async->numWorkers = numWorkers;
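// workers whose workerId is now >= numWorkers stop on their next pass
// through vnodeAsyncLoop; the broadcast below wakes idle ones so they exit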
if (async->numIdleWorkers > 0) {
taosThreadCondBroadcast(&async->hasTask);
}
taosThreadMutexUnlock(&async->mutex);
return 0;
}
int32_t vnodeAChannelInit(SVAsync *async, int64_t *channelId) {
if (async == NULL || channelId == NULL) {
return TSDB_CODE_INVALID_PARA;
}
// create channel object
SVAChannel *channel = (SVAChannel *)taosMemoryMalloc(sizeof(SVAChannel));
if (channel == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
channel->state = EVA_CHANNEL_STATE_OPEN;
for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
channel->queue[i].next = &channel->queue[i];
channel->queue[i].prev = &channel->queue[i];
}
channel->scheduled = NULL;
// register channel
taosThreadMutexLock(&async->mutex);
channel->channelId = *channelId = ++async->nextChannelId;
// add to hash table
int32_t ret = vHashPut(async->channelTable, channel);
if (ret != 0) {
taosThreadMutexUnlock(&async->mutex);
taosMemoryFree(channel);
return ret;
}
// add to list
channel->next = &async->chList;
channel->prev = async->chList.prev;
channel->next->prev = channel;
channel->prev->next = channel;
async->numChannels++;
taosThreadMutexUnlock(&async->mutex);
return 0;
}
int32_t vnodeAChannelDestroy(SVAsync *async, int64_t channelId, bool waitRunning) {
if (async == NULL || channelId <= 0) {
return TSDB_CODE_INVALID_PARA;
}
SVAChannel *channel = NULL;
SVAChannel channel2 = {.channelId = channelId};
taosThreadMutexLock(&async->mutex);
vHashGet(async->channelTable, &channel2, (void **)&channel);
if (channel) {
// unregister channel
channel->next->prev = channel->prev;
channel->prev->next = channel->next;
vHashDrop(async->channelTable, channel);
async->numChannels--;
// cancel all waiting tasks
for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
while (channel->queue[i].next != &channel->queue[i]) {
SVATask *task = channel->queue[i].next;
task->prev->next = task->next;
task->next->prev = task->prev;
vnodeAsyncTaskDone(async, task);
}
}
// cancel or wait the scheduled task
if (channel->scheduled == NULL || channel->scheduled->state == EVA_TASK_STATE_WAITTING) {
if (channel->scheduled) {
channel->scheduled->prev->next = channel->scheduled->next;
channel->scheduled->next->prev = channel->scheduled->prev;
vnodeAsyncTaskDone(async, channel->scheduled);
}
taosMemoryFree(channel);
} else {
if (waitRunning) {
// wait task
SVATask *task = channel->scheduled;
task->numWait++;
taosThreadCondWait(&task->waitCond, &async->mutex);
task->numWait--;
if (task->numWait == 0) {
taosThreadCondDestroy(&task->waitCond);
taosMemoryFree(task);
}
taosMemoryFree(channel);
} else {
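// deferred close: vnodeAsyncTaskDone() frees the channel once the
// running task finishes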
channel->state = EVA_CHANNEL_STATE_CLOSE;
}
}
} else {
taosThreadMutexUnlock(&async->mutex);
return TSDB_CODE_INVALID_PARA;
}
taosThreadMutexUnlock(&async->mutex);
return 0;
}

View File

@ -203,10 +203,8 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
// free info binary
taosMemoryFree(data);
vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d",
pInfo->config.vgId, fname,
pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex,
pInfo->config.syncCfg.changeVersion);
vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d", pInfo->config.vgId, fname,
pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex, pInfo->config.syncCfg.changeVersion);
return 0;
@ -289,7 +287,8 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
char dir[TSDB_FILENAME_LEN] = {0};
int64_t lastCommitted = pInfo->info.state.committed;
tsem_wait(&pVnode->canCommit);
// wait last commit task
vnodeAWait(vnodeAsyncHandle[0], pVnode->commitTask);
if (syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg) != 0) goto _exit;
@ -379,12 +378,11 @@ static int32_t vnodeCommitTask(void *arg) {
vnodeReturnBufPool(pVnode);
_exit:
// end commit
tsem_post(&pVnode->canCommit);
taosMemoryFree(pInfo);
return code;
}
static void vnodeCompleteCommit(void *arg) { taosMemoryFree(arg); }
int vnodeAsyncCommit(SVnode *pVnode) {
int32_t code = 0;
@ -401,14 +399,14 @@ int vnodeAsyncCommit(SVnode *pVnode) {
}
// schedule the task
code = vnodeScheduleTask(vnodeCommitTask, pInfo);
code = vnodeAsyncC(vnodeAsyncHandle[0], pVnode->commitChannel, EVA_PRIORITY_HIGH, vnodeCommitTask,
vnodeCompleteCommit, pInfo, &pVnode->commitTask);
_exit:
if (code) {
if (NULL != pInfo) {
taosMemoryFree(pInfo);
}
tsem_post(&pVnode->canCommit);
vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code),
pVnode->state.commitID);
} else {
@ -420,8 +418,7 @@ _exit:
int vnodeSyncCommit(SVnode *pVnode) {
vnodeAsyncCommit(pVnode);
tsem_wait(&pVnode->canCommit);
tsem_post(&pVnode->canCommit);
vnodeAWait(vnodeAsyncHandle[0], pVnode->commitTask);
return 0;
}

View File

@ -0,0 +1,162 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeHash.h"
#define VNODE_HASH_DEFAULT_NUM_BUCKETS 1024
typedef struct SVHashEntry SVHashEntry;
struct SVHashEntry {
SVHashEntry* next;
void* obj;
};
struct SVHashTable {
uint32_t (*hash)(const void*);
int32_t (*compare)(const void*, const void*);
int32_t numEntries;
uint32_t numBuckets;
SVHashEntry** buckets;
};
static int32_t vHashRehash(SVHashTable* ht, uint32_t newNumBuckets) {
SVHashEntry** newBuckets = (SVHashEntry**)taosMemoryCalloc(newNumBuckets, sizeof(SVHashEntry*));
if (newBuckets == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < ht->numBuckets; i++) {
SVHashEntry* entry = ht->buckets[i];
while (entry != NULL) {
SVHashEntry* next = entry->next;
uint32_t bucketIndex = ht->hash(entry->obj) % newNumBuckets;
entry->next = newBuckets[bucketIndex];
newBuckets[bucketIndex] = entry;
entry = next;
}
}
taosMemoryFree(ht->buckets);
ht->buckets = newBuckets;
ht->numBuckets = newNumBuckets;
return 0;
}
int32_t vHashInit(SVHashTable** ht, uint32_t (*hash)(const void*), int32_t (*compare)(const void*, const void*)) {
if (ht == NULL || hash == NULL || compare == NULL) {
return TSDB_CODE_INVALID_PARA;
}
(*ht) = (SVHashTable*)taosMemoryMalloc(sizeof(SVHashTable));
if (*ht == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ht)->hash = hash;
(*ht)->compare = compare;
(*ht)->numEntries = 0;
(*ht)->numBuckets = VNODE_HASH_DEFAULT_NUM_BUCKETS;
(*ht)->buckets = (SVHashEntry**)taosMemoryCalloc((*ht)->numBuckets, sizeof(SVHashEntry*));
if ((*ht)->buckets == NULL) {
taosMemoryFree(*ht);
return TSDB_CODE_OUT_OF_MEMORY;
}
return 0;
}
int32_t vHashDestroy(SVHashTable** ht) {
if (ht == NULL) {
return TSDB_CODE_INVALID_PARA;
}
if (*ht) {
ASSERT((*ht)->numEntries == 0);
taosMemoryFree((*ht)->buckets);
taosMemoryFree(*ht);
(*ht) = NULL;
}
return 0;
}
int32_t vHashPut(SVHashTable* ht, void* obj) {
if (ht == NULL || obj == NULL) {
return TSDB_CODE_INVALID_PARA;
}
uint32_t bucketIndex = ht->hash(obj) % ht->numBuckets;
for (SVHashEntry* entry = ht->buckets[bucketIndex]; entry != NULL; entry = entry->next) {
if (ht->compare(entry->obj, obj) == 0) {
return TSDB_CODE_DUP_KEY;
}
}
if (ht->numEntries >= ht->numBuckets) {
vHashRehash(ht, ht->numBuckets * 2);
bucketIndex = ht->hash(obj) % ht->numBuckets;
}
SVHashEntry* entry = (SVHashEntry*)taosMemoryMalloc(sizeof(SVHashEntry));
if (entry == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
entry->obj = obj;
entry->next = ht->buckets[bucketIndex];
ht->buckets[bucketIndex] = entry;
ht->numEntries++;
return 0;
}
int32_t vHashGet(SVHashTable* ht, const void* obj, void** retObj) {
if (ht == NULL || obj == NULL || retObj == NULL) {
return TSDB_CODE_INVALID_PARA;
}
uint32_t bucketIndex = ht->hash(obj) % ht->numBuckets;
for (SVHashEntry* entry = ht->buckets[bucketIndex]; entry != NULL; entry = entry->next) {
if (ht->compare(entry->obj, obj) == 0) {
*retObj = entry->obj;
return 0;
}
}
*retObj = NULL;
return TSDB_CODE_NOT_FOUND;
}
int32_t vHashDrop(SVHashTable* ht, const void* obj) {
if (ht == NULL || obj == NULL) {
return TSDB_CODE_INVALID_PARA;
}
uint32_t bucketIndex = ht->hash(obj) % ht->numBuckets;
for (SVHashEntry** entry = &ht->buckets[bucketIndex]; *entry != NULL; entry = &(*entry)->next) {
if (ht->compare((*entry)->obj, obj) == 0) {
SVHashEntry* tmp = *entry;
*entry = (*entry)->next;
taosMemoryFree(tmp);
ht->numEntries--;
if (ht->numBuckets > VNODE_HASH_DEFAULT_NUM_BUCKETS && ht->numEntries < ht->numBuckets / 4) {
vHashRehash(ht, ht->numBuckets / 2);
}
return 0;
}
}
return TSDB_CODE_NOT_FOUND;
}
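
vnodeHash.c is a small separate-chaining hash table over caller-owned pointers: it doubles the bucket array once entries outnumber buckets and halves it when occupancy drops below a quarter (never below the default 1024 buckets). A minimal usage sketch, with SDemoObj and its hash/compare callbacks as illustrative stand-ins:

typedef struct { int64_t id; } SDemoObj;

static uint32_t demoHash(const void *p) {
  const SDemoObj *obj = p;
  return MurmurHash3_32((const char *)&obj->id, sizeof(obj->id));
}

static int32_t demoCompare(const void *p1, const void *p2) {
  const SDemoObj *obj1 = p1, *obj2 = p2;
  if (obj1->id < obj2->id) return -1;
  if (obj1->id > obj2->id) return 1;
  return 0;
}

void demo(void) {
  SVHashTable *ht = NULL;
  SDemoObj     obj = {.id = 42};
  SDemoObj    *found = NULL;
  vHashInit(&ht, demoHash, demoCompare);
  vHashPut(ht, &obj);                                     // duplicate keys fail with TSDB_CODE_DUP_KEY
  vHashGet(ht, &(SDemoObj){.id = 42}, (void **)&found);   // found == &obj
  vHashDrop(ht, &obj);                                    // table must be empty before destroy
  vHashDestroy(&ht);
}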

View File

@ -0,0 +1,37 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _VNODE_HAS_H_
#define _VNODE_HAS_H_
#include "vnd.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SVHashTable SVHashTable;
int32_t vHashInit(SVHashTable** ht, uint32_t (*hash)(const void*), int32_t (*compare)(const void*, const void*));
int32_t vHashDestroy(SVHashTable** ht);
int32_t vHashPut(SVHashTable* ht, void* obj);
int32_t vHashGet(SVHashTable* ht, const void* obj, void** retObj);
int32_t vHashDrop(SVHashTable* ht, const void* obj);
#ifdef __cplusplus
}
#endif
#endif /*_VNODE_HAS_H_*/

View File

@ -16,65 +16,25 @@
#include "cos.h"
#include "vnd.h"
typedef struct SVnodeTask SVnodeTask;
struct SVnodeTask {
SVnodeTask* next;
SVnodeTask* prev;
int (*execute)(void*);
void* arg;
};
static volatile int32_t VINIT = 0;
typedef struct {
int nthreads;
TdThread* threads;
TdThreadMutex mutex;
TdThreadCond hasTask;
SVnodeTask queue;
} SVnodeThreadPool;
struct SVnodeGlobal {
int8_t init;
int8_t stop;
SVnodeThreadPool tp[2];
};
struct SVnodeGlobal vnodeGlobal;
static void* loop(void* arg);
SVAsync* vnodeAsyncHandle[2];
int vnodeInit(int nthreads) {
int8_t init;
int ret;
int32_t init;
init = atomic_val_compare_exchange_8(&(vnodeGlobal.init), 0, 1);
init = atomic_val_compare_exchange_32(&VINIT, 0, 1);
if (init) {
return 0;
}
vnodeGlobal.stop = 0;
for (int32_t i = 0; i < ARRAY_SIZE(vnodeGlobal.tp); i++) {
taosThreadMutexInit(&vnodeGlobal.tp[i].mutex, NULL);
taosThreadCondInit(&vnodeGlobal.tp[i].hasTask, NULL);
// vnode-commit
vnodeAsyncInit(&vnodeAsyncHandle[0], "vnode-commit");
vnodeAsyncSetWorkers(vnodeAsyncHandle[0], nthreads);
taosThreadMutexLock(&vnodeGlobal.tp[i].mutex);
vnodeGlobal.tp[i].queue.next = &vnodeGlobal.tp[i].queue;
vnodeGlobal.tp[i].queue.prev = &vnodeGlobal.tp[i].queue;
taosThreadMutexUnlock(&(vnodeGlobal.tp[i].mutex));
vnodeGlobal.tp[i].nthreads = nthreads;
vnodeGlobal.tp[i].threads = taosMemoryCalloc(nthreads, sizeof(TdThread));
if (vnodeGlobal.tp[i].threads == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
vError("failed to init vnode module since:%s", tstrerror(terrno));
return -1;
}
for (int j = 0; j < nthreads; j++) {
taosThreadCreate(&(vnodeGlobal.tp[i].threads[j]), NULL, loop, &vnodeGlobal.tp[i]);
}
}
// vnode-merge
vnodeAsyncInit(&vnodeAsyncHandle[1], "vnode-merge");
vnodeAsyncSetWorkers(vnodeAsyncHandle[1], nthreads);
if (walInit() < 0) {
return -1;
@ -90,99 +50,15 @@ int vnodeInit(int nthreads) {
}
void vnodeCleanup() {
int8_t init;
init = atomic_val_compare_exchange_8(&(vnodeGlobal.init), 1, 0);
int32_t init = atomic_val_compare_exchange_32(&VINIT, 1, 0);
if (init == 0) return;
// set stop
vnodeGlobal.stop = 1;
for (int32_t i = 0; i < ARRAY_SIZE(vnodeGlobal.tp); i++) {
taosThreadMutexLock(&(vnodeGlobal.tp[i].mutex));
taosThreadCondBroadcast(&(vnodeGlobal.tp[i].hasTask));
taosThreadMutexUnlock(&(vnodeGlobal.tp[i].mutex));
// wait for threads
for (int j = 0; j < vnodeGlobal.tp[i].nthreads; j++) {
taosThreadJoin(vnodeGlobal.tp[i].threads[j], NULL);
}
// clear source
taosMemoryFreeClear(vnodeGlobal.tp[i].threads);
taosThreadCondDestroy(&(vnodeGlobal.tp[i].hasTask));
taosThreadMutexDestroy(&(vnodeGlobal.tp[i].mutex));
}
vnodeAsyncDestroy(&vnodeAsyncHandle[0]);
vnodeAsyncDestroy(&vnodeAsyncHandle[1]);
walCleanUp();
tqCleanUp();
smaCleanUp();
s3CleanUp();
}
int vnodeScheduleTaskEx(int tpid, int (*execute)(void*), void* arg) {
SVnodeTask* pTask;
ASSERT(!vnodeGlobal.stop);
pTask = taosMemoryMalloc(sizeof(*pTask));
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pTask->execute = execute;
pTask->arg = arg;
taosThreadMutexLock(&(vnodeGlobal.tp[tpid].mutex));
pTask->next = &vnodeGlobal.tp[tpid].queue;
pTask->prev = vnodeGlobal.tp[tpid].queue.prev;
vnodeGlobal.tp[tpid].queue.prev->next = pTask;
vnodeGlobal.tp[tpid].queue.prev = pTask;
taosThreadCondSignal(&(vnodeGlobal.tp[tpid].hasTask));
taosThreadMutexUnlock(&(vnodeGlobal.tp[tpid].mutex));
return 0;
}
int vnodeScheduleTask(int (*execute)(void*), void* arg) { return vnodeScheduleTaskEx(0, execute, arg); }
/* ------------------------ STATIC METHODS ------------------------ */
static void* loop(void* arg) {
SVnodeThreadPool* tp = (SVnodeThreadPool*)arg;
SVnodeTask* pTask;
int ret;
if (tp == &vnodeGlobal.tp[0]) {
setThreadName("vnode-commit");
} else if (tp == &vnodeGlobal.tp[1]) {
setThreadName("vnode-merge");
}
for (;;) {
taosThreadMutexLock(&(tp->mutex));
for (;;) {
pTask = tp->queue.next;
if (pTask == &tp->queue) {
// no task
if (vnodeGlobal.stop) {
taosThreadMutexUnlock(&(tp->mutex));
return NULL;
} else {
taosThreadCondWait(&(tp->hasTask), &(tp->mutex));
}
} else {
// has task
pTask->prev->next = pTask->next;
pTask->next->prev = pTask->prev;
break;
}
}
taosThreadMutexUnlock(&(tp->mutex));
pTask->execute(pTask->arg);
taosMemoryFree(pTask);
}
return NULL;
}

View File

@ -129,8 +129,8 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t
}
pCfg->changeVersion = pReq->changeVersion;
vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d",
pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex, pCfg->changeVersion);
vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d", pReq->vgId,
pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex, pCfg->changeVersion);
info.config.syncCfg = *pCfg;
ret = vnodeSaveInfo(dir, &info);
@ -396,10 +396,14 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
pVnode->blocked = false;
tsem_init(&pVnode->syncSem, 0, 0);
tsem_init(&(pVnode->canCommit), 0, 1);
taosThreadMutexInit(&pVnode->mutex, NULL);
taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
if (vnodeAChannelInit(vnodeAsyncHandle[0], &pVnode->commitChannel) != 0) {
vError("vgId:%d, failed to init commit channel", TD_VID(pVnode));
goto _err;
}
int8_t rollback = vnodeShouldRollback(pVnode);
// open buffer pool
@ -487,7 +491,6 @@ _err:
if (pVnode->pMeta) metaClose(&pVnode->pMeta);
if (pVnode->freeList) vnodeCloseBufPool(pVnode);
tsem_destroy(&(pVnode->canCommit));
taosMemoryFree(pVnode);
return NULL;
}
@ -501,7 +504,8 @@ void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); }
void vnodeClose(SVnode *pVnode) {
if (pVnode) {
tsem_wait(&pVnode->canCommit);
vnodeAWait(vnodeAsyncHandle[0], pVnode->commitTask);
vnodeAChannelDestroy(vnodeAsyncHandle[0], pVnode->commitChannel, true);
vnodeSyncClose(pVnode);
vnodeQueryClose(pVnode);
tqClose(pVnode->pTq);
@ -510,10 +514,8 @@ void vnodeClose(SVnode *pVnode) {
smaClose(pVnode->pSma);
if (pVnode->pMeta) metaClose(&pVnode->pMeta);
vnodeCloseBufPool(pVnode);
tsem_post(&pVnode->canCommit);
// destroy handle
tsem_destroy(&(pVnode->canCommit));
tsem_destroy(&pVnode->syncSem);
taosThreadCondDestroy(&pVnode->poolNotEmpty);
taosThreadMutexDestroy(&pVnode->mutex);

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnd.h"
#include "tsdb.h"
#include "vnd.h"
// SVSnapReader ========================================================
struct SVSnapReader {
@ -519,6 +519,8 @@ _out:
return code;
}
extern int32_t tsdbCancelAllBgTask(STsdb *tsdb);
int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter) {
int32_t code = 0;
SVSnapWriter *pWriter = NULL;
@ -526,8 +528,8 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter
int64_t ever = pParam->end;
// commit memory data
vnodeAsyncCommit(pVnode);
tsem_wait(&pVnode->canCommit);
vnodeSyncCommit(pVnode);
tsdbCancelAllBgTask(pVnode->pTsdb);
// alloc
pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter));
@ -657,7 +659,6 @@ _exit:
vInfo("vgId:%d, vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback);
taosMemoryFree(pWriter);
}
tsem_post(&pVnode->canCommit);
return code;
}