diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 9aeb14cd60..114051f02b 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -14,6 +14,8 @@ set( "src/vnd/vnodeSnapshot.c" "src/vnd/vnodeRetention.c" "src/vnd/vnodeInitApi.c" + "src/vnd/vnodeAsync.c" + "src/vnd/vnodeHash.c" # meta "src/meta/metaOpen.c" diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ca9d22a987..88362239f5 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -309,7 +309,12 @@ int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STs void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive); // tsdbMerge.c ============================================================================================== -int32_t tsdbSchedMerge(STsdb *tsdb, int32_t fid); +typedef struct { + STsdb *tsdb; + int32_t fid; +} SMergeArg; + +int32_t tsdbMerge(void *arg); // tsdbDiskData ============================================================================================== int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder); diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 55b62dfe48..4036200d73 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -48,9 +48,32 @@ int32_t vnodeCheckCfg(const SVnodeCfg*); int32_t vnodeEncodeConfig(const void* pObj, SJson* pJson); int32_t vnodeDecodeConfig(const SJson* pJson, void* pObj); +// vnodeAsync.c +typedef struct SVAsync SVAsync; + +typedef enum { + EVA_PRIORITY_HIGH = 0, + EVA_PRIORITY_NORMAL, + EVA_PRIORITY_LOW, +} EVAPriority; + +#define VNODE_ASYNC_VALID_CHANNEL_ID(channelId) ((channelId) > 0) +#define VNODE_ASYNC_VALID_TASK_ID(taskId) ((taskId) > 0) + +int32_t vnodeAsyncInit(SVAsync** async, char* label); +int32_t vnodeAsyncDestroy(SVAsync** async); +int32_t vnodeAChannelInit(SVAsync* async, int64_t* channelId); +int32_t vnodeAChannelDestroy(SVAsync* async, int64_t channelId, bool waitRunning); +int32_t vnodeAsync(SVAsync* async, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*), void* arg, + int64_t* taskId); +int32_t vnodeAsyncC(SVAsync* async, int64_t channelId, EVAPriority priority, int32_t (*execute)(void*), + void (*complete)(void*), void* arg, int64_t* taskId); +int32_t vnodeAWait(SVAsync* async, int64_t taskId); +int32_t vnodeACancel(SVAsync* async, int64_t taskId); +int32_t vnodeAsyncSetWorkers(SVAsync* async, int32_t numWorkers); + // vnodeModule.c -int vnodeScheduleTask(int (*execute)(void*), void* arg); -int vnodeScheduleTaskEx(int tpid, int (*execute)(void*), void* arg); +extern SVAsync* vnodeAsyncHandle[2]; // vnodeBufPool.c typedef struct SVBufPoolNode SVBufPoolNode; @@ -110,7 +133,7 @@ int32_t vnodeAsyncCommit(SVnode* pVnode); bool vnodeShouldRollback(SVnode* pVnode); // vnodeSync.c -int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion); +int32_t vnodeSyncOpen(SVnode* pVnode, char* path, int32_t vnodeVersion); int32_t vnodeSyncStart(SVnode* pVnode); void vnodeSyncPreClose(SVnode* pVnode); void vnodeSyncPostClose(SVnode* pVnode); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index fde5f04401..8a4cbb5fd0 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -213,7 +213,7 @@ int32_t tsdbBegin(STsdb* pTsdb); // int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo); int32_t tsdbCacheCommit(STsdb* pTsdb); int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); -int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync); +int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync); // int32_t tsdbFinishCommit(STsdb* pTsdb); // int32_t tsdbRollbackCommit(STsdb* pTsdb); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); @@ -452,13 +452,16 @@ struct SVnode { SVBufPool* recycleTail; SVBufPool* onRecycle; + // commit variables + int64_t commitChannel; + int64_t commitTask; + SMeta* pMeta; SSma* pSma; STsdb* pTsdb; SWal* pWal; STQ* pTq; SSink* pSink; - tsem_t canCommit; int64_t sync; TdThreadMutex lock; bool blocked; @@ -498,18 +501,18 @@ struct SSma { void* pRSmaEnv; }; -#define SMA_CFG(s) (&(s)->pVnode->config) -#define SMA_TSDB_CFG(s) (&(s)->pVnode->config.tsdbCfg) -#define SMA_RETENTION(s) ((SRetention*)&(s)->pVnode->config.tsdbCfg.retentions) -#define SMA_LOCKED(s) ((s)->locked) -#define SMA_META(s) ((s)->pVnode->pMeta) -#define SMA_VID(s) TD_VID((s)->pVnode) -#define SMA_TFS(s) ((s)->pVnode->pTfs) -#define SMA_TSMA_ENV(s) ((s)->pTSmaEnv) -#define SMA_RSMA_ENV(s) ((s)->pRSmaEnv) -#define SMA_RSMA_TSDB0(s) ((s)->pVnode->pTsdb) -#define SMA_RSMA_TSDB1(s) ((s)->pRSmaTsdb[TSDB_RETENTION_L0]) -#define SMA_RSMA_TSDB2(s) ((s)->pRSmaTsdb[TSDB_RETENTION_L1]) +#define SMA_CFG(s) (&(s)->pVnode->config) +#define SMA_TSDB_CFG(s) (&(s)->pVnode->config.tsdbCfg) +#define SMA_RETENTION(s) ((SRetention*)&(s)->pVnode->config.tsdbCfg.retentions) +#define SMA_LOCKED(s) ((s)->locked) +#define SMA_META(s) ((s)->pVnode->pMeta) +#define SMA_VID(s) TD_VID((s)->pVnode) +#define SMA_TFS(s) ((s)->pVnode->pTfs) +#define SMA_TSMA_ENV(s) ((s)->pTSmaEnv) +#define SMA_RSMA_ENV(s) ((s)->pRSmaEnv) +#define SMA_RSMA_TSDB0(s) ((s)->pVnode->pTsdb) +#define SMA_RSMA_TSDB1(s) ((s)->pRSmaTsdb[TSDB_RETENTION_L0]) +#define SMA_RSMA_TSDB2(s) ((s)->pRSmaTsdb[TSDB_RETENTION_L1]) #define SMA_RSMA_GET_TSDB(pVnode, level) ((level == 0) ? pVnode->pTsdb : pVnode->pSma->pRSmaTsdb[level - 1]) // sma diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 89bed6b42f..add8da52e0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -20,8 +20,6 @@ #define BLOCK_COMMIT_FACTOR 3 -extern int vnodeScheduleTask(int (*execute)(void *), void *arg); -extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg); extern void remove_file(const char *fname, bool last_level); #define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT @@ -651,7 +649,6 @@ _exit: static int32_t close_file_system(STFileSystem *fs) { TARRAY2_CLEAR(fs->fSetArr, tsdbTFileSetClear); TARRAY2_CLEAR(fs->fSetArrTmp, tsdbTFileSetClear); - // TODO return 0; } @@ -748,36 +745,31 @@ _exit: return code; } -static void tsdbDoWaitBgTask(STFileSystem *fs, STFSBgTask *task) { - task->numWait++; - taosThreadCondWait(task->done, &fs->tsdb->mutex); - task->numWait--; +int32_t tsdbFSCancelAllBgTask(STFileSystem *fs) { + TARRAY2(int64_t) channelArr = {0}; - if (task->numWait == 0) { - taosThreadCondDestroy(task->done); - if (task->destroy) { - task->destroy(task->arg); + // collect all open channels + taosThreadMutexLock(&fs->tsdb->mutex); + STFileSet *fset; + TARRAY2_FOREACH(fs->fSetArr, fset) { + if (VNODE_ASYNC_VALID_CHANNEL_ID(fset->bgTaskChannel)) { + TARRAY2_APPEND(&channelArr, fset->bgTaskChannel); + fset->bgTaskChannel = 0; } - taosMemoryFree(task); } -} + taosThreadMutexUnlock(&fs->tsdb->mutex); -static void tsdbDoDoneBgTask(STFileSystem *fs, STFSBgTask *task) { - if (task->numWait > 0) { - taosThreadCondBroadcast(task->done); - } else { - taosThreadCondDestroy(task->done); - if (task->destroy) { - task->destroy(task->arg); - } - taosMemoryFree(task); - } + // destroy all channels + int64_t channel; + TARRAY2_FOREACH(&channelArr, channel) { vnodeAChannelDestroy(vnodeAsyncHandle[1], channel, true); } + TARRAY2_DESTROY(&channelArr, NULL); + return 0; } int32_t tsdbCloseFS(STFileSystem **fs) { if (fs[0] == NULL) return 0; - tsdbFSDisableBgTask(fs[0]); + tsdbFSCancelAllBgTask(*fs); close_file_system(fs[0]); destroy_fs(fs); return 0; @@ -910,7 +902,20 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { } if (!skipMerge) { - code = tsdbSchedMerge(fs->tsdb, fset->fid); + code = tsdbTFileSetOpenChannel(fset); + TSDB_CHECK_CODE(code, lino, _exit); + + SMergeArg *arg = taosMemoryMalloc(sizeof(*arg)); + if (arg == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + arg->tsdb = fs->tsdb; + arg->fid = fset->fid; + + code = vnodeAsyncC(vnodeAsyncHandle[1], fset->bgTaskChannel, EVA_PRIORITY_HIGH, tsdbMerge, taosMemoryFree, + arg, NULL); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -939,7 +944,11 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { } } - if (tsdbTFileSetIsEmpty(fset) && fset->bgTaskRunning == NULL) { + if (tsdbTFileSetIsEmpty(fset)) { + if (VNODE_ASYNC_VALID_CHANNEL_ID(fset->bgTaskChannel)) { + vnodeAChannelDestroy(vnodeAsyncHandle[1], fset->bgTaskChannel, false); + fset->bgTaskChannel = 0; + } TARRAY2_REMOVE(fs->fSetArr, i, tsdbTFileSetClear); } else { i++; @@ -1179,136 +1188,4 @@ _out: pHash = NULL; } return code; -} - -const char *gFSBgTaskName[] = {NULL, "MERGE", "RETENTION", "COMPACT"}; - -static int32_t tsdbFSRunBgTask(void *arg) { - STFSBgTask *task = (STFSBgTask *)arg; - STFileSystem *fs = task->fs; - - task->launchTime = taosGetTimestampMs(); - task->run(task->arg); - task->finishTime = taosGetTimestampMs(); - - tsdbDebug("vgId:%d bg task:%s task id:%" PRId64 " finished, schedule time:%" PRId64 " launch time:%" PRId64 - " finish time:%" PRId64, - TD_VID(fs->tsdb->pVnode), gFSBgTaskName[task->type], task->taskid, task->scheduleTime, task->launchTime, - task->finishTime); - - taosThreadMutexLock(&fs->tsdb->mutex); - - STFileSet *fset = NULL; - tsdbFSGetFSet(fs, task->fid, &fset); - ASSERT(fset != NULL && fset->bgTaskRunning == task); - - // free last - tsdbDoDoneBgTask(fs, task); - fset->bgTaskRunning = NULL; - - // schedule next - if (fset->bgTaskNum > 0) { - if (fs->stop) { - while (fset->bgTaskNum > 0) { - STFSBgTask *nextTask = fset->bgTaskQueue->next; - nextTask->prev->next = nextTask->next; - nextTask->next->prev = nextTask->prev; - fset->bgTaskNum--; - tsdbDoDoneBgTask(fs, nextTask); - } - } else { - // pop task from head - fset->bgTaskRunning = fset->bgTaskQueue->next; - fset->bgTaskRunning->prev->next = fset->bgTaskRunning->next; - fset->bgTaskRunning->next->prev = fset->bgTaskRunning->prev; - fset->bgTaskNum--; - vnodeScheduleTaskEx(1, tsdbFSRunBgTask, fset->bgTaskRunning); - } - } - - taosThreadMutexUnlock(&fs->tsdb->mutex); - return 0; -} - -// IMPORTANT: the caller must hold the fs->tsdb->mutex -int32_t tsdbFSScheduleBgTask(STFileSystem *fs, int32_t fid, EFSBgTaskT type, int32_t (*run)(void *), - void (*destroy)(void *), void *arg, int64_t *taskid) { - if (fs->stop) { - if (destroy) { - destroy(arg); - } - return 0; - } - - STFileSet *fset; - tsdbFSGetFSet(fs, fid, &fset); - - ASSERT(fset != NULL); - - for (STFSBgTask *task = fset->bgTaskQueue->next; task != fset->bgTaskQueue; task = task->next) { - if (task->type == type) { - if (destroy) { - destroy(arg); - } - return 0; - } - } - - // do schedule task - STFSBgTask *task = taosMemoryCalloc(1, sizeof(STFSBgTask)); - if (task == NULL) return TSDB_CODE_OUT_OF_MEMORY; - taosThreadCondInit(task->done, NULL); - - task->fs = fs; - task->fid = fid; - task->type = type; - task->run = run; - task->destroy = destroy; - task->arg = arg; - task->scheduleTime = taosGetTimestampMs(); - task->taskid = ++fs->taskid; - - if (fset->bgTaskRunning == NULL && fset->bgTaskNum == 0) { - // launch task directly - fset->bgTaskRunning = task; - vnodeScheduleTaskEx(1, tsdbFSRunBgTask, task); - } else { - // add to the queue tail - fset->bgTaskNum++; - task->next = fset->bgTaskQueue; - task->prev = fset->bgTaskQueue->prev; - task->prev->next = task; - task->next->prev = task; - } - - if (taskid) *taskid = task->taskid; - return 0; -} - -int32_t tsdbFSDisableBgTask(STFileSystem *fs) { - taosThreadMutexLock(&fs->tsdb->mutex); - for (;;) { - fs->stop = true; - bool done = true; - - STFileSet *fset; - TARRAY2_FOREACH(fs->fSetArr, fset) { - if (fset->bgTaskRunning) { - tsdbDoWaitBgTask(fs, fset->bgTaskRunning); - done = false; - break; - } - } - - if (done) break; - } - taosThreadMutexUnlock(&fs->tsdb->mutex); - return 0; -} - -int32_t tsdbFSEnableBgTask(STFileSystem *fs) { - taosThreadMutexLock(&fs->tsdb->mutex); - fs->stop = false; - taosThreadMutexUnlock(&fs->tsdb->mutex); - return 0; -} +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.h b/source/dnode/vnode/src/tsdb/tsdbFS2.h index a3a8e2f575..74453126cf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.h @@ -55,11 +55,6 @@ int64_t tsdbFSAllocEid(STFileSystem *fs); int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype); int32_t tsdbFSEditCommit(STFileSystem *fs); int32_t tsdbFSEditAbort(STFileSystem *fs); -// background task -int32_t tsdbFSScheduleBgTask(STFileSystem *fs, int32_t fid, EFSBgTaskT type, int32_t (*run)(void *), - void (*destroy)(void *), void *arg, int64_t *taskid); -int32_t tsdbFSDisableBgTask(STFileSystem *fs); -int32_t tsdbFSEnableBgTask(STFileSystem *fs); // other int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset); int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid); diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.c b/source/dnode/vnode/src/tsdb/tsdbFSet2.c index 61bedcb996..025671ff3d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.c @@ -14,6 +14,7 @@ */ #include "tsdbFSet2.h" +#include "vnd.h" int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl) { if (!(lvl[0] = taosMemoryMalloc(sizeof(SSttLvl)))) return TSDB_CODE_OUT_OF_MEMORY; @@ -451,10 +452,7 @@ int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset) { TARRAY2_INIT(fset[0]->lvlArr); // background task queue - fset[0]->bgTaskNum = 0; - fset[0]->bgTaskQueue->next = fset[0]->bgTaskQueue; - fset[0]->bgTaskQueue->prev = fset[0]->bgTaskQueue; - fset[0]->bgTaskRunning = NULL; + fset[0]->bgTaskChannel = 0; // block commit variables taosThreadCondInit(&fset[0]->canCommit, NULL); @@ -650,3 +648,8 @@ bool tsdbTFileSetIsEmpty(const STFileSet *fset) { } return TARRAY2_SIZE(fset->lvlArr) == 0; } + +int32_t tsdbTFileSetOpenChannel(STFileSet *fset) { + if (VNODE_ASYNC_VALID_CHANNEL_ID(fset->bgTaskChannel)) return 0; + return vnodeAChannelInit(vnodeAsyncHandle[1], &fset->bgTaskChannel); +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.h b/source/dnode/vnode/src/tsdb/tsdbFSet2.h index 34f174ade7..32028db352 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.h @@ -22,14 +22,12 @@ extern "C" { #endif -typedef struct STFileSet STFileSet; -typedef struct STFileOp STFileOp; -typedef struct SSttLvl SSttLvl; +typedef struct STFileOp STFileOp; +typedef struct SSttLvl SSttLvl; typedef TARRAY2(STFileObj *) TFileObjArray; typedef TARRAY2(SSttLvl *) TSttLvlArray; typedef TARRAY2(STFileOp) TFileOpArray; typedef struct STFileSystem STFileSystem; -typedef struct STFSBgTask STFSBgTask; typedef enum { TSDB_FOP_NONE = 0, @@ -72,33 +70,8 @@ bool tsdbTFileSetIsEmpty(const STFileSet *fset); // stt int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl); int32_t tsdbSttLvlClear(SSttLvl **lvl); - -typedef enum { - TSDB_BG_TASK_MERGER = 1, - TSDB_BG_TASK_RETENTION, - TSDB_BG_TASK_COMPACT, -} EFSBgTaskT; - -struct STFSBgTask { - STFileSystem *fs; - int32_t fid; - - EFSBgTaskT type; - int32_t (*run)(void *arg); - void (*destroy)(void *arg); - void *arg; - - TdThreadCond done[1]; - int32_t numWait; - - int64_t taskid; - int64_t scheduleTime; - int64_t launchTime; - int64_t finishTime; - - struct STFSBgTask *prev; - struct STFSBgTask *next; -}; +// open channel +int32_t tsdbTFileSetOpenChannel(STFileSet *fset); struct STFileOp { tsdb_fop_t optype; @@ -118,10 +91,8 @@ struct STFileSet { STFileObj *farr[TSDB_FTYPE_MAX]; // file array TSttLvlArray lvlArr[1]; // level array - // background task queue - int32_t bgTaskNum; - STFSBgTask bgTaskQueue[1]; - STFSBgTask *bgTaskRunning; + // background task channel + int64_t bgTaskChannel; // block commit variables TdThreadCond canCommit; diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/tsdbMerge.c index 6d968d0828..b47b951b2b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/tsdbMerge.c @@ -17,11 +17,6 @@ #define TSDB_MAX_LEVEL 2 // means max level is 3 -typedef struct { - STsdb *tsdb; - int32_t fid; -} SMergeArg; - typedef struct { STsdb *tsdb; int32_t fid; @@ -528,7 +523,7 @@ static int32_t tsdbMergeGetFSet(SMerger *merger) { return 0; } -static int32_t tsdbMerge(void *arg) { +int32_t tsdbMerge(void *arg) { int32_t code = 0; int32_t lino = 0; SMergeArg *mergeArg = (SMergeArg *)arg; @@ -597,18 +592,3 @@ _exit: tsdbTFileSetClear(&merger->fset); return code; } - -int32_t tsdbSchedMerge(STsdb *tsdb, int32_t fid) { - SMergeArg *arg = taosMemoryMalloc(sizeof(*arg)); - if (arg == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - arg->tsdb = tsdb; - arg->fid = fid; - - int32_t code = tsdbFSScheduleBgTask(tsdb->pFS, fid, TSDB_BG_TASK_MERGER, tsdbMerge, taosMemoryFree, arg, NULL); - if (code) taosMemoryFree(arg); - - return code; -} diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index f6888ba9cb..d8f1ad7c6c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -249,7 +249,7 @@ _exit: if (code) { TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); } else { - tsdbInfo("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__); + tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__); } return code; } @@ -279,7 +279,7 @@ _exit: if (code) { TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); } else { - tsdbInfo("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__); + tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__); } tsdbFSDestroyCopySnapshot(&rtner->fsetArr); return code; @@ -391,32 +391,6 @@ _exit: static void tsdbFreeRtnArg(void *arg) { taosMemoryFree(arg); } -static int32_t tsdbDoRetentionSync(void *arg) { - int32_t code = 0; - int32_t lino = 0; - SRTNer rtner[1] = {0}; - - code = tsdbDoRetentionBegin(arg, rtner); - TSDB_CHECK_CODE(code, lino, _exit); - - STFileSet *fset; - TARRAY2_FOREACH(rtner->fsetArr, fset) { - code = tsdbDoRetentionOnFileSet(rtner, fset); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbDoRetentionEnd(rtner); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); - } - tsem_post(&((SRtnArg *)arg)->tsdb->pVnode->canCommit); - tsdbFreeRtnArg(arg); - return code; -} - static int32_t tsdbDoRetentionAsync(void *arg) { int32_t code = 0; int32_t lino = 0; @@ -454,49 +428,41 @@ _exit: int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) { int32_t code = 0; - if (sync) { // sync retention + taosThreadMutexLock(&tsdb->mutex); + + STFileSet *fset; + TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) { + code = tsdbTFileSetOpenChannel(fset); + if (code) { + taosThreadMutexUnlock(&tsdb->mutex); + return code; + } + SRtnArg *arg = taosMemoryMalloc(sizeof(*arg)); if (arg == NULL) { + taosThreadMutexUnlock(&tsdb->mutex); return TSDB_CODE_OUT_OF_MEMORY; } arg->tsdb = tsdb; arg->now = now; - arg->fid = INT32_MAX; + arg->fid = fset->fid; - tsem_wait(&tsdb->pVnode->canCommit); - code = vnodeScheduleTask(tsdbDoRetentionSync, arg); + if (sync) { + code = vnodeAsyncC(vnodeAsyncHandle[0], tsdb->pVnode->commitChannel, EVA_PRIORITY_LOW, tsdbDoRetentionAsync, + tsdbFreeRtnArg, arg, NULL); + } else { + code = vnodeAsyncC(vnodeAsyncHandle[1], fset->bgTaskChannel, EVA_PRIORITY_LOW, tsdbDoRetentionAsync, + tsdbFreeRtnArg, arg, NULL); + } if (code) { - tsem_post(&tsdb->pVnode->canCommit); - taosMemoryFree(arg); + tsdbFreeRtnArg(arg); + taosThreadMutexUnlock(&tsdb->mutex); return code; } - } else { // async retention - taosThreadMutexLock(&tsdb->mutex); - - STFileSet *fset; - TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) { - SRtnArg *arg = taosMemoryMalloc(sizeof(*arg)); - if (arg == NULL) { - taosThreadMutexUnlock(&tsdb->mutex); - return TSDB_CODE_OUT_OF_MEMORY; - } - - arg->tsdb = tsdb; - arg->now = now; - arg->fid = fset->fid; - - code = tsdbFSScheduleBgTask(tsdb->pFS, fset->fid, TSDB_BG_TASK_RETENTION, tsdbDoRetentionAsync, tsdbFreeRtnArg, - arg, NULL); - if (code) { - tsdbFreeRtnArg(arg); - taosThreadMutexUnlock(&tsdb->mutex); - return code; - } - } - - taosThreadMutexUnlock(&tsdb->mutex); } + taosThreadMutexUnlock(&tsdb->mutex); + return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index a9da0fbcec..104c9b2f35 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -1032,9 +1032,6 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRang int32_t code = 0; int32_t lino = 0; - // disable background tasks - tsdbFSDisableBgTask(pTsdb->pFS); - // start to write writer[0] = taosMemoryCalloc(1, sizeof(*writer[0])); if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; @@ -1107,7 +1104,6 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) { taosThreadMutexUnlock(&writer[0]->tsdb->mutex); } - tsdbFSEnableBgTask(tsdb->pFS); tsdbIterMergerClose(&writer[0]->ctx->tombIterMerger); tsdbIterMergerClose(&writer[0]->ctx->dataIterMerger); @@ -1595,3 +1591,6 @@ _out: return code; } + +extern int32_t tsdbFSCancelAllBgTask(STFileSystem* fs); +int32_t tsdbCancelAllBgTask(STsdb* tsdb) { return tsdbFSCancelAllBgTask(tsdb->pFS); } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeAsync.c b/source/dnode/vnode/src/vnd/vnodeAsync.c new file mode 100644 index 0000000000..c95d2324aa --- /dev/null +++ b/source/dnode/vnode/src/vnd/vnodeAsync.c @@ -0,0 +1,719 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "vnd.h" +#include "vnodeHash.h" + +typedef struct SVATask SVATask; +typedef struct SVAChannel SVAChannel; + +#define VNODE_ASYNC_DEFAULT_WORKERS 4 +#define VNODE_ASYNC_MAX_WORKERS 256 + +// priority + +#define EVA_PRIORITY_MAX (EVA_PRIORITY_LOW + 1) + +// worker +typedef enum { + EVA_WORKER_STATE_UINIT = 0, + EVA_WORKER_STATE_ACTIVE, + EVA_WORKER_STATE_IDLE, + EVA_WORKER_STATE_STOP, +} EVWorkerState; + +typedef struct { + SVAsync *async; + int32_t workerId; + EVWorkerState state; + TdThread thread; + SVATask *runningTask; +} SVWorker; + +// task +typedef enum { + EVA_TASK_STATE_WAITTING = 0, + EVA_TASK_STATE_RUNNING, +} EVATaskState; + +struct SVATask { + int64_t taskId; + EVAPriority priority; + int32_t priorScore; + SVAChannel *channel; + int32_t (*execute)(void *); + void (*complete)(void *); + void *arg; + EVATaskState state; + + // wait + int32_t numWait; + TdThreadCond waitCond; + + // queue + struct SVATask *prev; + struct SVATask *next; +}; + +#define VATASK_PIORITY(task_) ((task_)->priority - ((task_)->priorScore / 4)) + +// async channel +typedef enum { + EVA_CHANNEL_STATE_OPEN = 0, + EVA_CHANNEL_STATE_CLOSE, +} EVAChannelState; + +struct SVAChannel { + int64_t channelId; + EVAChannelState state; + SVATask queue[EVA_PRIORITY_MAX]; + SVATask *scheduled; + + SVAChannel *prev; + SVAChannel *next; +}; + +// async handle +struct SVAsync { + const char *label; + + TdThreadMutex mutex; + TdThreadCond hasTask; + bool stop; + + // worker + int32_t numWorkers; + int32_t numLaunchWorkers; + int32_t numIdleWorkers; + SVWorker workers[VNODE_ASYNC_MAX_WORKERS]; + + // channel + int64_t nextChannelId; + int32_t numChannels; + SVAChannel chList; + SVHashTable *channelTable; + + // task + int64_t nextTaskId; + int32_t numTasks; + SVATask queue[EVA_PRIORITY_MAX]; + SVHashTable *taskTable; +}; + +static int32_t vnodeAsyncTaskDone(SVAsync *async, SVATask *task) { + int32_t ret; + + if (task->channel != NULL && task->channel->scheduled == task) { + task->channel->scheduled = NULL; + if (task->channel->state == EVA_CHANNEL_STATE_CLOSE) { + taosMemoryFree(task->channel); + } else { + for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) { + SVATask *nextTask = task->channel->queue[i].next; + if (nextTask != &task->channel->queue[i]) { + if (task->channel->scheduled == NULL) { + task->channel->scheduled = nextTask; + nextTask->next->prev = nextTask->prev; + nextTask->prev->next = nextTask->next; + } else { + nextTask->priorScore++; + int32_t newPriority = VATASK_PIORITY(nextTask); + if (newPriority != i) { + // remove from current priority queue + nextTask->prev->next = nextTask->next; + nextTask->next->prev = nextTask->prev; + // add to new priority queue + nextTask->next = &task->channel->queue[newPriority]; + nextTask->prev = task->channel->queue[newPriority].prev; + nextTask->next->prev = nextTask; + nextTask->prev->next = nextTask; + } + } + } + } + + if (task->channel->scheduled != NULL) { + int32_t priority = VATASK_PIORITY(task->channel->scheduled); + task->channel->scheduled->next = &async->queue[priority]; + task->channel->scheduled->prev = async->queue[priority].prev; + task->channel->scheduled->next->prev = task->channel->scheduled; + task->channel->scheduled->prev->next = task->channel->scheduled; + } + } + } + + ret = vHashDrop(async->taskTable, task); + if (ret != 0) { + ASSERT(0); + } + async->numTasks--; + + // call complete callback + if (task->complete) { + task->complete(task->arg); + } + + if (task->numWait == 0) { + taosThreadCondDestroy(&task->waitCond); + taosMemoryFree(task); + } else if (task->numWait == 1) { + taosThreadCondSignal(&task->waitCond); + } else { + taosThreadCondBroadcast(&task->waitCond); + } + return 0; +} + +static int32_t vnodeAsyncCancelAllTasks(SVAsync *async) { + for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) { + while (async->queue[i].next != &async->queue[i]) { + SVATask *task = async->queue[i].next; + task->prev->next = task->next; + task->next->prev = task->prev; + vnodeAsyncTaskDone(async, task); + } + } + return 0; +} + +static void *vnodeAsyncLoop(void *arg) { + SVWorker *worker = (SVWorker *)arg; + SVAsync *async = worker->async; + + setThreadName(async->label); + + for (;;) { + taosThreadMutexLock(&async->mutex); + + // finish last running task + if (worker->runningTask != NULL) { + vnodeAsyncTaskDone(async, worker->runningTask); + worker->runningTask = NULL; + } + + for (;;) { + if (async->stop || worker->workerId >= async->numWorkers) { + if (async->stop) { // cancel all tasks + vnodeAsyncCancelAllTasks(async); + } + worker->state = EVA_WORKER_STATE_STOP; + async->numLaunchWorkers--; + taosThreadMutexUnlock(&async->mutex); + return NULL; + } + + for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) { + SVATask *task = async->queue[i].next; + if (task != &async->queue[i]) { + if (worker->runningTask == NULL) { + worker->runningTask = task; + task->prev->next = task->next; + task->next->prev = task->prev; + } else { // promote priority + task->priorScore++; + int32_t priority = VATASK_PIORITY(task); + if (priority != i) { + // remove from current priority queue + task->prev->next = task->next; + task->next->prev = task->prev; + // add to new priority queue + task->next = &async->queue[priority]; + task->prev = async->queue[priority].prev; + task->next->prev = task; + task->prev->next = task; + } + } + } + } + + if (worker->runningTask == NULL) { + worker->state = EVA_WORKER_STATE_IDLE; + async->numIdleWorkers++; + taosThreadCondWait(&async->hasTask, &async->mutex); + async->numIdleWorkers--; + worker->state = EVA_WORKER_STATE_ACTIVE; + } else { + worker->runningTask->state = EVA_TASK_STATE_RUNNING; + break; + } + } + + taosThreadMutexUnlock(&async->mutex); + + // do run the task + worker->runningTask->execute(worker->runningTask->arg); + } + + return NULL; +} + +static uint32_t vnodeAsyncTaskHash(const void *obj) { + SVATask *task = (SVATask *)obj; + return MurmurHash3_32((const char *)(&task->taskId), sizeof(task->taskId)); +} + +static int32_t vnodeAsyncTaskCompare(const void *obj1, const void *obj2) { + SVATask *task1 = (SVATask *)obj1; + SVATask *task2 = (SVATask *)obj2; + if (task1->taskId < task2->taskId) { + return -1; + } else if (task1->taskId > task2->taskId) { + return 1; + } + return 0; +} + +static uint32_t vnodeAsyncChannelHash(const void *obj) { + SVAChannel *channel = (SVAChannel *)obj; + return MurmurHash3_32((const char *)(&channel->channelId), sizeof(channel->channelId)); +} + +static int32_t vnodeAsyncChannelCompare(const void *obj1, const void *obj2) { + SVAChannel *channel1 = (SVAChannel *)obj1; + SVAChannel *channel2 = (SVAChannel *)obj2; + if (channel1->channelId < channel2->channelId) { + return -1; + } else if (channel1->channelId > channel2->channelId) { + return 1; + } + return 0; +} + +int32_t vnodeAsyncInit(SVAsync **async, char *label) { + int32_t ret; + + if (async == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + if (label == NULL) { + label = "anonymous"; + } + + (*async) = (SVAsync *)taosMemoryCalloc(1, sizeof(SVAsync) + strlen(label) + 1); + if ((*async) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + strcpy((char *)((*async) + 1), label); + (*async)->label = (const char *)((*async) + 1); + + taosThreadMutexInit(&(*async)->mutex, NULL); + taosThreadCondInit(&(*async)->hasTask, NULL); + (*async)->stop = false; + + // worker + (*async)->numWorkers = VNODE_ASYNC_DEFAULT_WORKERS; + (*async)->numLaunchWorkers = 0; + (*async)->numIdleWorkers = 0; + for (int32_t i = 0; i < VNODE_ASYNC_MAX_WORKERS; i++) { + (*async)->workers[i].async = (*async); + (*async)->workers[i].workerId = i; + (*async)->workers[i].state = EVA_WORKER_STATE_UINIT; + (*async)->workers[i].runningTask = NULL; + } + + // channel + (*async)->nextChannelId = 0; + (*async)->numChannels = 0; + (*async)->chList.prev = &(*async)->chList; + (*async)->chList.next = &(*async)->chList; + ret = vHashInit(&(*async)->channelTable, vnodeAsyncChannelHash, vnodeAsyncChannelCompare); + if (ret != 0) { + taosThreadMutexDestroy(&(*async)->mutex); + taosThreadCondDestroy(&(*async)->hasTask); + taosMemoryFree(*async); + return ret; + } + + // task + (*async)->nextTaskId = 0; + (*async)->numTasks = 0; + for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) { + (*async)->queue[i].next = &(*async)->queue[i]; + (*async)->queue[i].prev = &(*async)->queue[i]; + } + ret = vHashInit(&(*async)->taskTable, vnodeAsyncTaskHash, vnodeAsyncTaskCompare); + if (ret != 0) { + vHashDestroy(&(*async)->channelTable); + taosThreadMutexDestroy(&(*async)->mutex); + taosThreadCondDestroy(&(*async)->hasTask); + taosMemoryFree(*async); + return ret; + } + + return 0; +} + +int32_t vnodeAsyncDestroy(SVAsync **async) { + if ((*async) == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + // set stop and broadcast + taosThreadMutexLock(&(*async)->mutex); + (*async)->stop = true; + taosThreadCondBroadcast(&(*async)->hasTask); + taosThreadMutexUnlock(&(*async)->mutex); + + // join all workers + for (int32_t i = 0; i < VNODE_ASYNC_MAX_WORKERS; i++) { + taosThreadMutexLock(&(*async)->mutex); + EVWorkerState state = (*async)->workers[i].state; + taosThreadMutexUnlock(&(*async)->mutex); + + if (state == EVA_WORKER_STATE_UINIT) { + continue; + } + + taosThreadJoin((*async)->workers[i].thread, NULL); + ASSERT((*async)->workers[i].state == EVA_WORKER_STATE_STOP); + (*async)->workers[i].state = EVA_WORKER_STATE_UINIT; + } + + // close all channels + for (SVAChannel *channel = (*async)->chList.next; channel != &(*async)->chList; channel = (*async)->chList.next) { + channel->next->prev = channel->prev; + channel->prev->next = channel->next; + + int32_t ret = vHashDrop((*async)->channelTable, channel); + if (ret) { + ASSERT(0); + } + (*async)->numChannels--; + taosMemoryFree(channel); + } + + ASSERT((*async)->numLaunchWorkers == 0); + ASSERT((*async)->numIdleWorkers == 0); + ASSERT((*async)->numChannels == 0); + ASSERT((*async)->numTasks == 0); + + taosThreadMutexDestroy(&(*async)->mutex); + taosThreadCondDestroy(&(*async)->hasTask); + + vHashDestroy(&(*async)->channelTable); + vHashDestroy(&(*async)->taskTable); + taosMemoryFree(*async); + *async = NULL; + + return 0; +} + +static int32_t vnodeAsyncLaunchWorker(SVAsync *async) { + for (int32_t i = 0; i < async->numWorkers; i++) { + ASSERT(async->workers[i].state != EVA_WORKER_STATE_IDLE); + if (async->workers[i].state == EVA_WORKER_STATE_ACTIVE) { + continue; + } else if (async->workers[i].state == EVA_WORKER_STATE_STOP) { + taosThreadJoin(async->workers[i].thread, NULL); + async->workers[i].state = EVA_WORKER_STATE_UINIT; + } + + taosThreadCreate(&async->workers[i].thread, NULL, vnodeAsyncLoop, &async->workers[i]); + async->workers[i].state = EVA_WORKER_STATE_ACTIVE; + async->numLaunchWorkers++; + break; + } + return 0; +} + +int32_t vnodeAsync(SVAsync *async, EVAPriority priority, int32_t (*execute)(void *), void (*complete)(void *), + void *arg, int64_t *taskId) { + return vnodeAsyncC(async, 0, priority, execute, complete, arg, taskId); +} + +int32_t vnodeAsyncC(SVAsync *async, int64_t channelId, EVAPriority priority, int32_t (*execute)(void *), + void (*complete)(void *), void *arg, int64_t *taskId) { + if (async == NULL || execute == NULL || channelId < 0) { + return TSDB_CODE_INVALID_PARA; + } + + int64_t id; + + // create task object + SVATask *task = (SVATask *)taosMemoryCalloc(1, sizeof(SVATask)); + if (task == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + task->priority = priority; + task->priorScore = 0; + task->execute = execute; + task->complete = complete; + task->arg = arg; + task->state = EVA_TASK_STATE_WAITTING; + task->numWait = 0; + taosThreadCondInit(&task->waitCond, NULL); + + // schedule task + taosThreadMutexLock(&async->mutex); + + if (channelId == 0) { + task->channel = NULL; + } else { + SVAChannel channel = {.channelId = channelId}; + vHashGet(async->channelTable, &channel, (void **)&task->channel); + if (task->channel == NULL) { + taosThreadMutexUnlock(&async->mutex); + taosThreadCondDestroy(&task->waitCond); + taosMemoryFree(task); + return TSDB_CODE_INVALID_PARA; + } + } + + task->taskId = id = ++async->nextTaskId; + + // add task to hash table + int32_t ret = vHashPut(async->taskTable, task); + if (ret != 0) { + taosThreadMutexUnlock(&async->mutex); + taosThreadCondDestroy(&task->waitCond); + taosMemoryFree(task); + return ret; + } + + async->numTasks++; + + // add task to queue + if (task->channel == NULL || task->channel->scheduled == NULL) { + // add task to async->queue + if (task->channel) { + task->channel->scheduled = task; + } + + task->next = &async->queue[priority]; + task->prev = async->queue[priority].prev; + task->next->prev = task; + task->prev->next = task; + + // signal worker or launch new worker + if (async->numIdleWorkers > 0) { + taosThreadCondSignal(&(async->hasTask)); + } else if (async->numLaunchWorkers < async->numWorkers) { + vnodeAsyncLaunchWorker(async); + } + } else if (task->channel->scheduled->state == EVA_TASK_STATE_RUNNING || + priority >= VATASK_PIORITY(task->channel->scheduled)) { + // add task to task->channel->queue + task->next = &task->channel->queue[priority]; + task->prev = task->channel->queue[priority].prev; + task->next->prev = task; + task->prev->next = task; + } else { + // remove task->channel->scheduled from queue + task->channel->scheduled->prev->next = task->channel->scheduled->next; + task->channel->scheduled->next->prev = task->channel->scheduled->prev; + + // promote priority and add task->channel->scheduled to task->channel->queue + task->channel->scheduled->priorScore++; + int32_t newPriority = VATASK_PIORITY(task->channel->scheduled); + task->channel->scheduled->next = &task->channel->queue[newPriority]; + task->channel->scheduled->prev = task->channel->queue[newPriority].prev; + task->channel->scheduled->next->prev = task->channel->scheduled; + task->channel->scheduled->prev->next = task->channel->scheduled; + + // add task to queue + task->channel->scheduled = task; + task->next = &async->queue[priority]; + task->prev = async->queue[priority].prev; + task->next->prev = task; + task->prev->next = task; + } + + taosThreadMutexUnlock(&async->mutex); + + if (taskId != NULL) { + *taskId = id; + } + + return 0; +} + +int32_t vnodeAWait(SVAsync *async, int64_t taskId) { + if (async == NULL || taskId <= 0) { + return TSDB_CODE_INVALID_PARA; + } + + SVATask *task = NULL; + SVATask task2 = {.taskId = taskId}; + + taosThreadMutexLock(&async->mutex); + + vHashGet(async->taskTable, &task2, (void **)&task); + if (task) { + task->numWait++; + taosThreadCondWait(&task->waitCond, &async->mutex); + task->numWait--; + + if (task->numWait == 0) { + taosThreadCondDestroy(&task->waitCond); + taosMemoryFree(task); + } + } + + taosThreadMutexUnlock(&async->mutex); + + return 0; +} + +int32_t vnodeACancel(SVAsync *async, int64_t taskId) { + if (async == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + int32_t ret = 0; + SVATask *task = NULL; + SVATask task2 = {.taskId = taskId}; + + taosThreadMutexLock(&async->mutex); + + vHashGet(async->taskTable, &task2, (void **)&task); + if (task) { + if (task->state == EVA_TASK_STATE_WAITTING) { + // remove from queue + task->next->prev = task->prev; + task->prev->next = task->next; + vnodeAsyncTaskDone(async, task); + } else { + ret = 0; // task is running, should return code TSDB_CODE_BUSY ?? + } + } + + taosThreadMutexUnlock(&async->mutex); + + return ret; +} + +int32_t vnodeAsyncSetWorkers(SVAsync *async, int32_t numWorkers) { + if (async == NULL || numWorkers <= 0 || numWorkers > VNODE_ASYNC_MAX_WORKERS) { + return TSDB_CODE_INVALID_PARA; + } + + taosThreadMutexLock(&async->mutex); + async->numWorkers = numWorkers; + if (async->numIdleWorkers > 0) { + taosThreadCondBroadcast(&async->hasTask); + } + taosThreadMutexUnlock(&async->mutex); + + return 0; +} + +int32_t vnodeAChannelInit(SVAsync *async, int64_t *channelId) { + if (async == NULL || channelId == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + // create channel object + SVAChannel *channel = (SVAChannel *)taosMemoryMalloc(sizeof(SVAChannel)); + if (channel == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + channel->state = EVA_CHANNEL_STATE_OPEN; + for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) { + channel->queue[i].next = &channel->queue[i]; + channel->queue[i].prev = &channel->queue[i]; + } + channel->scheduled = NULL; + + // register channel + taosThreadMutexLock(&async->mutex); + + channel->channelId = *channelId = ++async->nextChannelId; + + // add to hash table + int32_t ret = vHashPut(async->channelTable, channel); + if (ret != 0) { + taosThreadMutexUnlock(&async->mutex); + taosMemoryFree(channel); + return ret; + } + + // add to list + channel->next = &async->chList; + channel->prev = async->chList.prev; + channel->next->prev = channel; + channel->prev->next = channel; + + async->numChannels++; + + taosThreadMutexUnlock(&async->mutex); + + return 0; +} + +int32_t vnodeAChannelDestroy(SVAsync *async, int64_t channelId, bool waitRunning) { + if (async == NULL || channelId <= 0) { + return TSDB_CODE_INVALID_PARA; + } + + SVAChannel *channel = NULL; + SVAChannel channel2 = {.channelId = channelId}; + + taosThreadMutexLock(&async->mutex); + + vHashGet(async->channelTable, &channel2, (void **)&channel); + if (channel) { + // unregister channel + channel->next->prev = channel->prev; + channel->prev->next = channel->next; + vHashDrop(async->channelTable, channel); + async->numChannels--; + + // cancel all waiting tasks + for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) { + while (channel->queue[i].next != &channel->queue[i]) { + SVATask *task = channel->queue[i].next; + task->prev->next = task->next; + task->next->prev = task->prev; + vnodeAsyncTaskDone(async, task); + } + } + + // cancel or wait the scheduled task + if (channel->scheduled == NULL || channel->scheduled->state == EVA_TASK_STATE_WAITTING) { + if (channel->scheduled) { + channel->scheduled->prev->next = channel->scheduled->next; + channel->scheduled->next->prev = channel->scheduled->prev; + vnodeAsyncTaskDone(async, channel->scheduled); + } + taosMemoryFree(channel); + } else { + if (waitRunning) { + // wait task + SVATask *task = channel->scheduled; + task->numWait++; + taosThreadCondWait(&task->waitCond, &async->mutex); + task->numWait--; + if (task->numWait == 0) { + taosThreadCondDestroy(&task->waitCond); + taosMemoryFree(task); + } + + taosMemoryFree(channel); + } else { + channel->state = EVA_CHANNEL_STATE_CLOSE; + } + } + } else { + taosThreadMutexUnlock(&async->mutex); + return TSDB_CODE_INVALID_PARA; + } + + taosThreadMutexUnlock(&async->mutex); + + return 0; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index a178e1f772..c8cd167393 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -203,10 +203,8 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { // free info binary taosMemoryFree(data); - vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d", - pInfo->config.vgId, fname, - pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex, - pInfo->config.syncCfg.changeVersion); + vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d", pInfo->config.vgId, fname, + pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex, pInfo->config.syncCfg.changeVersion); return 0; @@ -289,9 +287,10 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { char dir[TSDB_FILENAME_LEN] = {0}; int64_t lastCommitted = pInfo->info.state.committed; - tsem_wait(&pVnode->canCommit); + // wait last commit task + vnodeAWait(vnodeAsyncHandle[0], pVnode->commitTask); - if(syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg) != 0) goto _exit; + if (syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg) != 0) goto _exit; pVnode->state.commitTerm = pVnode->state.applyTerm; @@ -379,12 +378,11 @@ static int32_t vnodeCommitTask(void *arg) { vnodeReturnBufPool(pVnode); _exit: - // end commit - tsem_post(&pVnode->canCommit); - taosMemoryFree(pInfo); return code; } +static void vnodeCompleteCommit(void *arg) { taosMemoryFree(arg); } + int vnodeAsyncCommit(SVnode *pVnode) { int32_t code = 0; @@ -401,14 +399,14 @@ int vnodeAsyncCommit(SVnode *pVnode) { } // schedule the task - code = vnodeScheduleTask(vnodeCommitTask, pInfo); + code = vnodeAsyncC(vnodeAsyncHandle[0], pVnode->commitChannel, EVA_PRIORITY_HIGH, vnodeCommitTask, + vnodeCompleteCommit, pInfo, &pVnode->commitTask); _exit: if (code) { if (NULL != pInfo) { taosMemoryFree(pInfo); } - tsem_post(&pVnode->canCommit); vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), pVnode->state.commitID); } else { @@ -420,8 +418,7 @@ _exit: int vnodeSyncCommit(SVnode *pVnode) { vnodeAsyncCommit(pVnode); - tsem_wait(&pVnode->canCommit); - tsem_post(&pVnode->canCommit); + vnodeAWait(vnodeAsyncHandle[0], pVnode->commitTask); return 0; } @@ -501,7 +498,7 @@ _exit: } bool vnodeShouldRollback(SVnode *pVnode) { - char tFName[TSDB_FILENAME_LEN] = {0}; + char tFName[TSDB_FILENAME_LEN] = {0}; int32_t offset = 0; vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, tFName, TSDB_FILENAME_LEN); @@ -512,7 +509,7 @@ bool vnodeShouldRollback(SVnode *pVnode) { } void vnodeRollback(SVnode *pVnode) { - char tFName[TSDB_FILENAME_LEN] = {0}; + char tFName[TSDB_FILENAME_LEN] = {0}; int32_t offset = 0; vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, tFName, TSDB_FILENAME_LEN); diff --git a/source/dnode/vnode/src/vnd/vnodeHash.c b/source/dnode/vnode/src/vnd/vnodeHash.c new file mode 100644 index 0000000000..33602f6581 --- /dev/null +++ b/source/dnode/vnode/src/vnd/vnodeHash.c @@ -0,0 +1,162 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "vnodeHash.h" + +#define VNODE_HASH_DEFAULT_NUM_BUCKETS 1024 + +typedef struct SVHashEntry SVHashEntry; + +struct SVHashEntry { + SVHashEntry* next; + void* obj; +}; + +struct SVHashTable { + uint32_t (*hash)(const void*); + int32_t (*compare)(const void*, const void*); + int32_t numEntries; + uint32_t numBuckets; + SVHashEntry** buckets; +}; + +static int32_t vHashRehash(SVHashTable* ht, uint32_t newNumBuckets) { + SVHashEntry** newBuckets = (SVHashEntry**)taosMemoryCalloc(newNumBuckets, sizeof(SVHashEntry*)); + if (newBuckets == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < ht->numBuckets; i++) { + SVHashEntry* entry = ht->buckets[i]; + while (entry != NULL) { + SVHashEntry* next = entry->next; + uint32_t bucketIndex = ht->hash(entry->obj) % newNumBuckets; + entry->next = newBuckets[bucketIndex]; + newBuckets[bucketIndex] = entry; + entry = next; + } + } + + taosMemoryFree(ht->buckets); + ht->buckets = newBuckets; + ht->numBuckets = newNumBuckets; + + return 0; +} + +int32_t vHashInit(SVHashTable** ht, uint32_t (*hash)(const void*), int32_t (*compare)(const void*, const void*)) { + if (ht == NULL || hash == NULL || compare == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + (*ht) = (SVHashTable*)taosMemoryMalloc(sizeof(SVHashTable)); + if (*ht == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + (*ht)->hash = hash; + (*ht)->compare = compare; + (*ht)->numEntries = 0; + (*ht)->numBuckets = VNODE_HASH_DEFAULT_NUM_BUCKETS; + (*ht)->buckets = (SVHashEntry**)taosMemoryCalloc((*ht)->numBuckets, sizeof(SVHashEntry*)); + if ((*ht)->buckets == NULL) { + taosMemoryFree(*ht); + return TSDB_CODE_OUT_OF_MEMORY; + } + + return 0; +} + +int32_t vHashDestroy(SVHashTable** ht) { + if (ht == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + if (*ht) { + ASSERT((*ht)->numEntries == 0); + taosMemoryFree((*ht)->buckets); + taosMemoryFree(*ht); + (*ht) = NULL; + } + return 0; +} + +int32_t vHashPut(SVHashTable* ht, void* obj) { + if (ht == NULL || obj == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + uint32_t bucketIndex = ht->hash(obj) % ht->numBuckets; + for (SVHashEntry* entry = ht->buckets[bucketIndex]; entry != NULL; entry = entry->next) { + if (ht->compare(entry->obj, obj) == 0) { + return TSDB_CODE_DUP_KEY; + } + } + + if (ht->numEntries >= ht->numBuckets) { + vHashRehash(ht, ht->numBuckets * 2); + bucketIndex = ht->hash(obj) % ht->numBuckets; + } + + SVHashEntry* entry = (SVHashEntry*)taosMemoryMalloc(sizeof(SVHashEntry)); + if (entry == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + entry->obj = obj; + entry->next = ht->buckets[bucketIndex]; + ht->buckets[bucketIndex] = entry; + ht->numEntries++; + + return 0; +} + +int32_t vHashGet(SVHashTable* ht, const void* obj, void** retObj) { + if (ht == NULL || obj == NULL || retObj == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + uint32_t bucketIndex = ht->hash(obj) % ht->numBuckets; + for (SVHashEntry* entry = ht->buckets[bucketIndex]; entry != NULL; entry = entry->next) { + if (ht->compare(entry->obj, obj) == 0) { + *retObj = entry->obj; + return 0; + } + } + + *retObj = NULL; + return TSDB_CODE_NOT_FOUND; +} + +int32_t vHashDrop(SVHashTable* ht, const void* obj) { + if (ht == NULL || obj == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + uint32_t bucketIndex = ht->hash(obj) % ht->numBuckets; + for (SVHashEntry** entry = &ht->buckets[bucketIndex]; *entry != NULL; entry = &(*entry)->next) { + if (ht->compare((*entry)->obj, obj) == 0) { + SVHashEntry* tmp = *entry; + *entry = (*entry)->next; + taosMemoryFree(tmp); + ht->numEntries--; + if (ht->numBuckets > VNODE_HASH_DEFAULT_NUM_BUCKETS && ht->numEntries < ht->numBuckets / 4) { + vHashRehash(ht, ht->numBuckets / 2); + } + return 0; + } + } + + return TSDB_CODE_NOT_FOUND; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeHash.h b/source/dnode/vnode/src/vnd/vnodeHash.h new file mode 100644 index 0000000000..86f6f9ac87 --- /dev/null +++ b/source/dnode/vnode/src/vnd/vnodeHash.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _VNODE_HAS_H_ +#define _VNODE_HAS_H_ + +#include "vnd.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SVHashTable SVHashTable; + +int32_t vHashInit(SVHashTable** ht, uint32_t (*hash)(const void*), int32_t (*compare)(const void*, const void*)); +int32_t vHashDestroy(SVHashTable** ht); +int32_t vHashPut(SVHashTable* ht, void* obj); +int32_t vHashGet(SVHashTable* ht, const void* obj, void** retObj); +int32_t vHashDrop(SVHashTable* ht, const void* obj); + +#ifdef __cplusplus +} +#endif + +#endif /*_VNODE_HAS_H_*/ \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c index df08fb8a2b..4e3cee42c6 100644 --- a/source/dnode/vnode/src/vnd/vnodeModule.c +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -16,65 +16,25 @@ #include "cos.h" #include "vnd.h" -typedef struct SVnodeTask SVnodeTask; -struct SVnodeTask { - SVnodeTask* next; - SVnodeTask* prev; - int (*execute)(void*); - void* arg; -}; +static volatile int32_t VINIT = 0; -typedef struct { - int nthreads; - TdThread* threads; - TdThreadMutex mutex; - TdThreadCond hasTask; - SVnodeTask queue; -} SVnodeThreadPool; - -struct SVnodeGlobal { - int8_t init; - int8_t stop; - SVnodeThreadPool tp[2]; -}; - -struct SVnodeGlobal vnodeGlobal; - -static void* loop(void* arg); +SVAsync* vnodeAsyncHandle[2]; int vnodeInit(int nthreads) { - int8_t init; - int ret; + int32_t init; - init = atomic_val_compare_exchange_8(&(vnodeGlobal.init), 0, 1); + init = atomic_val_compare_exchange_32(&VINIT, 0, 1); if (init) { return 0; } - vnodeGlobal.stop = 0; - for (int32_t i = 0; i < ARRAY_SIZE(vnodeGlobal.tp); i++) { - taosThreadMutexInit(&vnodeGlobal.tp[i].mutex, NULL); - taosThreadCondInit(&vnodeGlobal.tp[i].hasTask, NULL); + // vnode-commit + vnodeAsyncInit(&vnodeAsyncHandle[0], "vnode-commit"); + vnodeAsyncSetWorkers(vnodeAsyncHandle[0], nthreads); - taosThreadMutexLock(&vnodeGlobal.tp[i].mutex); - - vnodeGlobal.tp[i].queue.next = &vnodeGlobal.tp[i].queue; - vnodeGlobal.tp[i].queue.prev = &vnodeGlobal.tp[i].queue; - - taosThreadMutexUnlock(&(vnodeGlobal.tp[i].mutex)); - - vnodeGlobal.tp[i].nthreads = nthreads; - vnodeGlobal.tp[i].threads = taosMemoryCalloc(nthreads, sizeof(TdThread)); - if (vnodeGlobal.tp[i].threads == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - vError("failed to init vnode module since:%s", tstrerror(terrno)); - return -1; - } - - for (int j = 0; j < nthreads; j++) { - taosThreadCreate(&(vnodeGlobal.tp[i].threads[j]), NULL, loop, &vnodeGlobal.tp[i]); - } - } + // vnode-merge + vnodeAsyncInit(&vnodeAsyncHandle[1], "vnode-merge"); + vnodeAsyncSetWorkers(vnodeAsyncHandle[1], nthreads); if (walInit() < 0) { return -1; @@ -90,99 +50,15 @@ int vnodeInit(int nthreads) { } void vnodeCleanup() { - int8_t init; - - init = atomic_val_compare_exchange_8(&(vnodeGlobal.init), 1, 0); + int32_t init = atomic_val_compare_exchange_32(&VINIT, 1, 0); if (init == 0) return; // set stop - vnodeGlobal.stop = 1; - for (int32_t i = 0; i < ARRAY_SIZE(vnodeGlobal.tp); i++) { - taosThreadMutexLock(&(vnodeGlobal.tp[i].mutex)); - taosThreadCondBroadcast(&(vnodeGlobal.tp[i].hasTask)); - taosThreadMutexUnlock(&(vnodeGlobal.tp[i].mutex)); - - // wait for threads - for (int j = 0; j < vnodeGlobal.tp[i].nthreads; j++) { - taosThreadJoin(vnodeGlobal.tp[i].threads[j], NULL); - } - - // clear source - taosMemoryFreeClear(vnodeGlobal.tp[i].threads); - taosThreadCondDestroy(&(vnodeGlobal.tp[i].hasTask)); - taosThreadMutexDestroy(&(vnodeGlobal.tp[i].mutex)); - } + vnodeAsyncDestroy(&vnodeAsyncHandle[0]); + vnodeAsyncDestroy(&vnodeAsyncHandle[1]); walCleanUp(); tqCleanUp(); smaCleanUp(); s3CleanUp(); } - -int vnodeScheduleTaskEx(int tpid, int (*execute)(void*), void* arg) { - SVnodeTask* pTask; - - ASSERT(!vnodeGlobal.stop); - - pTask = taosMemoryMalloc(sizeof(*pTask)); - if (pTask == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - pTask->execute = execute; - pTask->arg = arg; - - taosThreadMutexLock(&(vnodeGlobal.tp[tpid].mutex)); - pTask->next = &vnodeGlobal.tp[tpid].queue; - pTask->prev = vnodeGlobal.tp[tpid].queue.prev; - vnodeGlobal.tp[tpid].queue.prev->next = pTask; - vnodeGlobal.tp[tpid].queue.prev = pTask; - taosThreadCondSignal(&(vnodeGlobal.tp[tpid].hasTask)); - taosThreadMutexUnlock(&(vnodeGlobal.tp[tpid].mutex)); - - return 0; -} - -int vnodeScheduleTask(int (*execute)(void*), void* arg) { return vnodeScheduleTaskEx(0, execute, arg); } - -/* ------------------------ STATIC METHODS ------------------------ */ -static void* loop(void* arg) { - SVnodeThreadPool* tp = (SVnodeThreadPool*)arg; - SVnodeTask* pTask; - int ret; - - if (tp == &vnodeGlobal.tp[0]) { - setThreadName("vnode-commit"); - } else if (tp == &vnodeGlobal.tp[1]) { - setThreadName("vnode-merge"); - } - - for (;;) { - taosThreadMutexLock(&(tp->mutex)); - for (;;) { - pTask = tp->queue.next; - if (pTask == &tp->queue) { - // no task - if (vnodeGlobal.stop) { - taosThreadMutexUnlock(&(tp->mutex)); - return NULL; - } else { - taosThreadCondWait(&(tp->hasTask), &(tp->mutex)); - } - } else { - // has task - pTask->prev->next = pTask->next; - pTask->next->prev = pTask->prev; - break; - } - } - - taosThreadMutexUnlock(&(tp->mutex)); - - pTask->execute(pTask->arg); - taosMemoryFree(pTask); - } - - return NULL; -} diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index ff79e83d72..946ce9d278 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -129,8 +129,8 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t } pCfg->changeVersion = pReq->changeVersion; - vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d", - pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex, pCfg->changeVersion); + vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d", pReq->vgId, + pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex, pCfg->changeVersion); info.config.syncCfg = *pCfg; ret = vnodeSaveInfo(dir, &info); @@ -396,10 +396,14 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC pVnode->blocked = false; tsem_init(&pVnode->syncSem, 0, 0); - tsem_init(&(pVnode->canCommit), 0, 1); taosThreadMutexInit(&pVnode->mutex, NULL); taosThreadCondInit(&pVnode->poolNotEmpty, NULL); + if (vnodeAChannelInit(vnodeAsyncHandle[0], &pVnode->commitChannel) != 0) { + vError("vgId:%d, failed to init commit channel", TD_VID(pVnode)); + goto _err; + } + int8_t rollback = vnodeShouldRollback(pVnode); // open buffer pool @@ -487,7 +491,6 @@ _err: if (pVnode->pMeta) metaClose(&pVnode->pMeta); if (pVnode->freeList) vnodeCloseBufPool(pVnode); - tsem_destroy(&(pVnode->canCommit)); taosMemoryFree(pVnode); return NULL; } @@ -501,7 +504,8 @@ void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); } void vnodeClose(SVnode *pVnode) { if (pVnode) { - tsem_wait(&pVnode->canCommit); + vnodeAWait(vnodeAsyncHandle[0], pVnode->commitTask); + vnodeAChannelDestroy(vnodeAsyncHandle[0], pVnode->commitChannel, true); vnodeSyncClose(pVnode); vnodeQueryClose(pVnode); tqClose(pVnode->pTq); @@ -510,10 +514,8 @@ void vnodeClose(SVnode *pVnode) { smaClose(pVnode->pSma); if (pVnode->pMeta) metaClose(&pVnode->pMeta); vnodeCloseBufPool(pVnode); - tsem_post(&pVnode->canCommit); // destroy handle - tsem_destroy(&(pVnode->canCommit)); tsem_destroy(&pVnode->syncSem); taosThreadCondDestroy(&pVnode->poolNotEmpty); taosThreadMutexDestroy(&pVnode->mutex); diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 91244e321f..f2ef11e9ed 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#include "vnd.h" #include "tsdb.h" +#include "vnd.h" // SVSnapReader ======================================================== struct SVSnapReader { @@ -32,11 +32,11 @@ struct SVSnapReader { TSnapRangeArray *pRanges; STsdbSnapReader *pTsdbReader; // tq - int8_t tqHandleDone; - STqSnapReader *pTqSnapReader; - int8_t tqOffsetDone; - STqOffsetReader *pTqOffsetReader; - int8_t tqCheckInfoDone; + int8_t tqHandleDone; + STqSnapReader *pTqSnapReader; + int8_t tqOffsetDone; + STqOffsetReader *pTqOffsetReader; + int8_t tqCheckInfoDone; STqCheckInfoReader *pTqCheckInfoReader; // stream int8_t streamTaskDone; @@ -458,8 +458,8 @@ struct SVSnapWriter { TSnapRangeArray *pRanges; STsdbSnapWriter *pTsdbSnapWriter; // tq - STqSnapWriter *pTqSnapWriter; - STqOffsetWriter *pTqOffsetWriter; + STqSnapWriter *pTqSnapWriter; + STqOffsetWriter *pTqOffsetWriter; STqCheckInfoWriter *pTqCheckInfoWriter; // stream SStreamTaskWriter *pStreamTaskWriter; @@ -519,6 +519,8 @@ _out: return code; } +extern int32_t tsdbCancelAllBgTask(STsdb *tsdb); + int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter) { int32_t code = 0; SVSnapWriter *pWriter = NULL; @@ -526,8 +528,8 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter int64_t ever = pParam->end; // commit memory data - vnodeAsyncCommit(pVnode); - tsem_wait(&pVnode->canCommit); + vnodeSyncCommit(pVnode); + tsdbCancelAllBgTask(pVnode->pTsdb); // alloc pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter)); @@ -657,7 +659,6 @@ _exit: vInfo("vgId:%d, vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback); taosMemoryFree(pWriter); } - tsem_post(&pVnode->canCommit); return code; }