diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index c85316f810..84e1996f2c 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -355,6 +355,8 @@ typedef struct { int flush_count; } SCacheFlushState; +typedef struct SCompMonitor SCompMonitor; + struct STsdb { char * path; SVnode * pVnode; @@ -375,8 +377,11 @@ struct STsdb { TdThreadMutex pgMutex; struct STFileSystem *pFS; // new SRocksCache rCache; - // compact monitor - struct SCompMonitor *pCompMonitor; + SCompMonitor *pCompMonitor; + struct { + SVHashTable *ht; + SArray *arr; + } *commitInfo; }; struct TSDBKEY { diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 304716744c..7bdc8e1c33 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -49,31 +49,21 @@ int32_t vnodeEncodeConfig(const void* pObj, SJson* pJson); int32_t vnodeDecodeConfig(const SJson* pJson, void* pObj); // vnodeAsync.c -typedef struct SVAsync SVAsync; - typedef enum { EVA_PRIORITY_HIGH = 0, EVA_PRIORITY_NORMAL, EVA_PRIORITY_LOW, } EVAPriority; -#define VNODE_ASYNC_VALID_CHANNEL_ID(channelId) ((channelId) > 0) -#define VNODE_ASYNC_VALID_TASK_ID(taskId) ((taskId) > 0) - -int32_t vnodeAsyncInit(SVAsync** async, char* label); -int32_t vnodeAsyncDestroy(SVAsync** async); -int32_t vnodeAChannelInit(SVAsync* async, int64_t* channelId); -int32_t vnodeAChannelDestroy(SVAsync* async, int64_t channelId, bool waitRunning); -int32_t vnodeAsync(SVAsync* async, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*), void* arg, - int64_t* taskId); -int32_t vnodeAsyncC(SVAsync* async, int64_t channelId, EVAPriority priority, int32_t (*execute)(void*), - void (*complete)(void*), void* arg, int64_t* taskId); -int32_t vnodeAWait(SVAsync* async, int64_t taskId); -int32_t vnodeACancel(SVAsync* async, int64_t taskId); -int32_t vnodeAsyncSetWorkers(SVAsync* async, int32_t numWorkers); - -// vnodeModule.c -extern SVAsync* vnodeAsyncHandle[2]; +int32_t vnodeAsyncOpen(int32_t numOfThreads); +int32_t vnodeAsyncClose(); +int32_t vnodeAChannelInit(int64_t async, SVAChannelID* channelID); +int32_t vnodeAChannelDestroy(SVAChannelID* channelID, bool waitRunning); +int32_t vnodeAsync(SVAChannelID* channelID, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*), + void* arg, SVATaskID* taskID); +int32_t vnodeAWait(SVATaskID* taskID); +int32_t vnodeACancel(SVATaskID* taskID); +int32_t vnodeAsyncSetWorkers(int64_t async, int32_t numWorkers); // vnodeBufPool.c typedef struct SVBufPoolNode SVBufPoolNode; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index f3bdc98994..59599fdae6 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -237,9 +237,6 @@ int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, bool hasPrimayKey); int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type); int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey); -int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); -int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync); -int32_t tsdbS3Migrate(STsdb* tsdb, int64_t now, int32_t sync); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp); int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows); @@ -472,6 +469,16 @@ typedef struct SVMonitorObj { taos_counter_t* insertCounter; } SVMonitorObj; +typedef struct { + int64_t async; + int64_t id; +} SVAChannelID; + +typedef struct { + int64_t async; + int64_t id; +} SVATaskID; + struct SVnode { char* path; SVnodeCfg config; @@ -493,8 +500,8 @@ struct SVnode { SVBufPool* onRecycle; // commit variables - int64_t commitChannel; - int64_t commitTask; + SVAChannelID commitChannel; + SVATaskID commitTask; SMeta* pMeta; SSma* pSma; @@ -600,6 +607,24 @@ struct SCompactInfo { void initStorageAPI(SStorageAPI* pAPI); +// a simple hash table impl +typedef struct SVHashTable SVHashTable; + +struct SVHashTable { + uint32_t (*hash)(const void*); + int32_t (*compare)(const void*, const void*); + int32_t numEntries; + uint32_t numBuckets; + struct SVHashEntry** buckets; +}; + +#define vHashNumEntries(ht) ((ht)->numEntries) +int32_t vHashInit(SVHashTable** ht, uint32_t (*hash)(const void*), int32_t (*compare)(const void*, const void*)); +int32_t vHashDestroy(SVHashTable** ht); +int32_t vHashPut(SVHashTable* ht, void* obj); +int32_t vHashGet(SVHashTable* ht, const void* obj, void** retObj); +int32_t vHashDrop(SVHashTable* ht, const void* obj); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index b8d0e30d30..6babcf3c80 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -622,8 +622,8 @@ int32_t smaRetention(SSma *pSma, int64_t now) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pSma->pRSmaTsdb[i]) { - code = tsdbRetention(pSma->pRSmaTsdb[i], now, pSma->pVnode->config.sttTrigger == 1); - if (code) goto _end; + // code = tsdbRetention(pSma->pRSmaTsdb[i], now, pSma->pVnode->config.sttTrigger == 1); + // if (code) goto _end; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 96c0f2a4ba..1a08f2dc82 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -17,13 +17,13 @@ // extern dependencies typedef struct { - STsdb *tsdb; - TFileSetArray *fsetArr; - TFileOpArray fopArray[1]; - - // SSkmInfo skmTb[1]; - // SSkmInfo skmRow[1]; + int32_t fid; + bool hasDataToCommit; + STFileSet *fset; +} SFileSetCommitInfo; +typedef struct { + STsdb *tsdb; int32_t minutes; int8_t precision; int32_t minRow; @@ -32,34 +32,34 @@ typedef struct { int32_t sttTrigger; int32_t szPage; int64_t compactVersion; + int64_t cid; + int64_t now; struct { - int64_t cid; - int64_t now; - TSKEY nextKey; - int32_t fid; - int32_t expLevel; - SDiskID did; - TSKEY minKey; - TSKEY maxKey; - STFileSet *fset; - TABLEID tbid[1]; - bool hasTSData; - bool skipTsRow; - SHashObj *pColCmprObj; + SFileSetCommitInfo *info; + + int32_t expLevel; + SDiskID did; + TSKEY minKey; + TSKEY maxKey; + TABLEID tbid[1]; + bool hasTSData; + + bool skipTsRow; + SHashObj *pColCmprObj; } ctx[1]; // reader TSttFileReaderArray sttReaderArray[1]; - // iter TTsdbIterArray dataIterArray[1]; SIterMerger *dataIterMerger; TTsdbIterArray tombIterArray[1]; SIterMerger *tombIterMerger; - // writer SFSetWriter *writer; + + TFileOpArray fopArray[1]; } SCommitter2; static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) { @@ -74,8 +74,8 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) { .maxRow = committer->maxRow, .szPage = committer->szPage, .cmprAlg = committer->cmprAlg, - .fid = committer->ctx->fid, - .cid = committer->ctx->cid, + .fid = committer->ctx->info->fid, + .cid = committer->cid, .did = committer->ctx->did, .level = 0, }; @@ -83,11 +83,11 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) { if (committer->sttTrigger == 1) { config.toSttOnly = false; - if (committer->ctx->fset) { + if (committer->ctx->info->fset) { for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ftype++) { - if (committer->ctx->fset->farr[ftype] != NULL) { + if (committer->ctx->info->fset->farr[ftype] != NULL) { config.files[ftype].exist = true; - config.files[ftype].file = committer->ctx->fset->farr[ftype]->f[0]; + config.files[ftype].file = committer->ctx->info->fset->farr[ftype]->f[0]; } } } @@ -117,7 +117,6 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) { committer->ctx->tbid->suid = 0; committer->ctx->tbid->uid = 0; - for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->dataIterMerger)) != NULL;) { if (row->uid != committer->ctx->tbid->uid) { committer->ctx->tbid->suid = row->suid; @@ -132,7 +131,6 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) { int64_t ts = TSDBROW_TS(&row->row); if (ts > committer->ctx->maxKey) { - committer->ctx->nextKey = TMIN(committer->ctx->nextKey, ts); code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid); TSDB_CHECK_CODE(code, lino, _exit); continue; @@ -152,7 +150,8 @@ _exit: if (code) { TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); } else { - tsdbDebug("vgId:%d fid:%d commit %" PRId64 " rows", TD_VID(committer->tsdb->pVnode), committer->ctx->fid, numOfRow); + tsdbDebug("vgId:%d fid:%d commit %" PRId64 " rows", TD_VID(committer->tsdb->pVnode), committer->ctx->info->fid, + numOfRow); } return code; } @@ -168,7 +167,7 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) { } // do not need to write tomb data if there is no ts data - bool skip = (committer->ctx->fset == NULL && !committer->ctx->hasTSData); + bool skip = (committer->ctx->info->fset == NULL && !committer->ctx->hasTSData); committer->ctx->tbid->suid = 0; committer->ctx->tbid->uid = 0; @@ -187,12 +186,8 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) { if (record->ekey < committer->ctx->minKey) { // do nothing } else if (record->skey > committer->ctx->maxKey) { - committer->ctx->nextKey = TMIN(record->skey, committer->ctx->nextKey); + // committer->ctx->nextKey = TMIN(record->skey, committer->ctx->nextKey); } else { - if (record->ekey > committer->ctx->maxKey) { - committer->ctx->nextKey = TMIN(committer->ctx->nextKey, committer->ctx->maxKey + 1); - } - record->skey = TMAX(record->skey, committer->ctx->minKey); record->ekey = TMIN(record->ekey, committer->ctx->maxKey); @@ -211,8 +206,8 @@ _exit: if (code) { TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); } else { - tsdbDebug("vgId:%d fid:%d commit %" PRId64 " tomb records", TD_VID(committer->tsdb->pVnode), committer->ctx->fid, - numRecord); + tsdbDebug("vgId:%d fid:%d commit %" PRId64 " tomb records", TD_VID(committer->tsdb->pVnode), + committer->ctx->info->fid, numRecord); } return code; } @@ -223,15 +218,15 @@ static int32_t tsdbCommitOpenReader(SCommitter2 *committer) { ASSERT(TARRAY2_SIZE(committer->sttReaderArray) == 0); - if (committer->ctx->fset == NULL // - || committer->sttTrigger > 1 // - || TARRAY2_SIZE(committer->ctx->fset->lvlArr) == 0 // + if (committer->ctx->info->fset == NULL // + || committer->sttTrigger > 1 // + || TARRAY2_SIZE(committer->ctx->info->fset->lvlArr) == 0 // ) { return 0; } SSttLvl *lvl; - TARRAY2_FOREACH(committer->ctx->fset->lvlArr, lvl) { + TARRAY2_FOREACH(committer->ctx->info->fset->lvlArr, lvl) { STFileObj *fobj = NULL; TARRAY2_FOREACH(lvl->fobjArr, fobj) { SSttFileReader *sttReader; @@ -289,7 +284,7 @@ static int32_t tsdbCommitOpenIter(SCommitter2 *committer) { config.from->version = VERSION_MIN; config.from->key = (SRowKey){ .ts = committer->ctx->minKey, - .numOfPKs = 0, // TODO: support multiple primary keys + .numOfPKs = 0, }; code = tsdbIterOpen(&config, &iter); @@ -359,22 +354,15 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { int32_t lino = 0; STsdb *tsdb = committer->tsdb; - int32_t fid = tsdbKeyFid(committer->ctx->nextKey, committer->minutes, committer->precision); - // check if can commit - tsdbFSCheckCommit(tsdb, fid); + tsdbFSCheckCommit(tsdb, committer->ctx->info->fid); - committer->ctx->fid = fid; - committer->ctx->expLevel = tsdbFidLevel(committer->ctx->fid, &tsdb->keepCfg, committer->ctx->now); - tsdbFidKeyRange(committer->ctx->fid, committer->minutes, committer->precision, &committer->ctx->minKey, + committer->ctx->expLevel = tsdbFidLevel(committer->ctx->info->fid, &tsdb->keepCfg, committer->now); + tsdbFidKeyRange(committer->ctx->info->fid, committer->minutes, committer->precision, &committer->ctx->minKey, &committer->ctx->maxKey); code = tfsAllocDisk(committer->tsdb->pVnode->pTfs, committer->ctx->expLevel, &committer->ctx->did); TSDB_CHECK_CODE(code, lino, _exit); tfsMkdirRecurAt(committer->tsdb->pVnode->pTfs, committer->tsdb->path, committer->ctx->did); - STFileSet fset = {.fid = committer->ctx->fid}; - committer->ctx->fset = &fset; - STFileSet **fsetPtr = TARRAY2_SEARCH(committer->fsetArr, &committer->ctx->fset, tsdbTFileSetCmprFn, TD_EQ); - committer->ctx->fset = (fsetPtr == NULL) ? NULL : *fsetPtr; committer->ctx->tbid->suid = 0; committer->ctx->tbid->uid = 0; @@ -391,15 +379,13 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { code = tsdbCommitOpenWriter(committer); TSDB_CHECK_CODE(code, lino, _exit); - // reset nextKey - committer->ctx->nextKey = TSKEY_MAX; - _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } else { tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", TD_VID(tsdb->pVnode), - __func__, committer->ctx->fid, committer->ctx->minKey, committer->ctx->maxKey, committer->ctx->expLevel); + __func__, committer->ctx->info->fid, committer->ctx->minKey, committer->ctx->maxKey, + committer->ctx->expLevel); } return code; } @@ -421,7 +407,7 @@ _exit: if (code) { TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); } else { - tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->fid); + tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->info->fid); } return code; } @@ -449,7 +435,201 @@ _exit: if (code) { TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); } else { - tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->fid); + tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->info->fid); + } + return code; +} + +static int32_t tFileSetCommitInfoCompare(const void *arg1, const void *arg2) { + SFileSetCommitInfo *info1 = (SFileSetCommitInfo *)arg1; + SFileSetCommitInfo *info2 = (SFileSetCommitInfo *)arg2; + + if (info1->fid < info2->fid) { + return -1; + } else if (info1->fid > info2->fid) { + return 1; + } else { + return 0; + } +} + +static int32_t tFileSetCommitInfoPCompare(const void *arg1, const void *arg2) { + return tFileSetCommitInfoCompare(*(SFileSetCommitInfo **)arg1, *(SFileSetCommitInfo **)arg2); +} + +static uint32_t tFileSetCommitInfoHash(const void *arg) { + SFileSetCommitInfo *info = (SFileSetCommitInfo *)arg; + return MurmurHash3_32((const char *)&info->fid, sizeof(info->fid)); +} + +static int32_t tsdbCommitInfoDestroy(STsdb *pTsdb) { + int32_t code = 0; + int32_t lino = 0; + + if (pTsdb->commitInfo) { + for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) { + SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i); + vHashDrop(pTsdb->commitInfo->ht, info); + tsdbTFileSetClear(&info->fset); + taosMemoryFree(info); + } + + vHashDestroy(&pTsdb->commitInfo->ht); + taosArrayDestroy(pTsdb->commitInfo->arr); + pTsdb->commitInfo->arr = NULL; + taosMemoryFreeClear(pTsdb->commitInfo); + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t tsdbCommitInfoInit(STsdb *pTsdb) { + int32_t code = 0; + int32_t lino = 0; + + pTsdb->commitInfo = taosMemoryCalloc(1, sizeof(*pTsdb->commitInfo)); + if (pTsdb->commitInfo == NULL) { + TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit); + } + + code = vHashInit(&pTsdb->commitInfo->ht, tFileSetCommitInfoHash, tFileSetCommitInfoCompare); + TSDB_CHECK_CODE(code, lino, _exit); + + pTsdb->commitInfo->arr = taosArrayInit(0, sizeof(SFileSetCommitInfo *)); + if (pTsdb->commitInfo->arr == NULL) { + TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit); + } + +_exit: + if (code) { + tsdbCommitInfoDestroy(pTsdb); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t tsdbCommitInfoAdd(STsdb *tsdb, const SFileSetCommitInfo *info) { + int32_t code = 0; + int32_t lino = 0; + + SFileSetCommitInfo *tinfo; + + vHashGet(tsdb->commitInfo->ht, info, (void **)&tinfo); + if (tinfo) { + if (info->hasDataToCommit && !tinfo->hasDataToCommit) { + tinfo->hasDataToCommit = true; + } + } else { + if ((tinfo = taosMemoryCalloc(1, sizeof(*tinfo))) == NULL) { + TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit); + } + tinfo->fid = info->fid; + tinfo->hasDataToCommit = info->hasDataToCommit; + if (info->fset) { + code = tsdbTFileSetInitCopy(tsdb, info->fset, &tinfo->fset); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = vHashPut(tsdb->commitInfo->ht, tinfo); + TSDB_CHECK_CODE(code, lino, _exit); + + if ((taosArrayPush(tsdb->commitInfo->arr, &tinfo)) == NULL) { + TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit); + } + taosArraySort(tsdb->commitInfo->arr, tFileSetCommitInfoPCompare); + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(tsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t tsdbCommitInfoBuild(STsdb *tsdb) { + int32_t code = 0; + int32_t lino = 0; + + STFileSet *fset = NULL; + SRBTreeIter iter; + + code = tsdbCommitInfoInit(tsdb); + TSDB_CHECK_CODE(code, lino, _exit); + + taosThreadMutexLock(&tsdb->mutex); + TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) { + SFileSetCommitInfo info = { + .fid = fset->fid, + .hasDataToCommit = false, + .fset = fset, + }; + if ((code = tsdbCommitInfoAdd(tsdb, &info))) { + taosThreadMutexUnlock(&tsdb->mutex); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + taosThreadMutexUnlock(&tsdb->mutex); + + iter = tRBTreeIterCreate(tsdb->imem->tbDataTree, 1); + for (SRBTreeNode *node = tRBTreeIterNext(&iter); node; node = tRBTreeIterNext(&iter)) { + STbData *pTbData = TCONTAINER_OF(node, STbData, rbtn); + + // scan time-series data + STsdbRowKey from = { + .key.ts = INT64_MIN, + .key.numOfPKs = 0, + .version = INT64_MIN, + }; + for (;;) { + int64_t minKey, maxKey; + STbDataIter tbDataIter = {0}; + TSDBROW *row; + int32_t fid; + + tsdbTbDataIterOpen(pTbData, &from, 0, &tbDataIter); + if ((row = tsdbTbDataIterGet(&tbDataIter)) == NULL) { + break; + } + + fid = tsdbKeyFid(TSDBROW_TS(row), tsdb->keepCfg.days, tsdb->keepCfg.precision); + tsdbFidKeyRange(fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey); + + SFileSetCommitInfo info = { + .fid = fid, + .hasDataToCommit = true, + .fset = NULL, + }; + code = tsdbCommitInfoAdd(tsdb, &info); + TSDB_CHECK_CODE(code, lino, _exit); + + from.key.ts = maxKey + 1; + } + + // scan tomb data + for (SDelData *pDelData = pTbData->pHead; pDelData; pDelData = pDelData->pNext) { + for (int32_t i = taosArrayGetSize(tsdb->commitInfo->arr) - 1; i >= 0; i--) { + int64_t minKey, maxKey; + SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i); + + tsdbFidKeyRange(info->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey); + + if (pDelData->sKey > maxKey || pDelData->eKey < minKey) { + continue; + } else if (!info->hasDataToCommit) { + info->hasDataToCommit = true; + } + } + } + } + +_exit: + if (code) { + tsdbCommitInfoDestroy(tsdb); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(tsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -458,11 +638,7 @@ static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *co int32_t code = 0; int32_t lino = 0; - memset(committer, 0, sizeof(committer[0])); - committer->tsdb = tsdb; - code = tsdbFSCreateCopySnapshot(tsdb->pFS, &committer->fsetArr); - TSDB_CHECK_CODE(code, lino, _exit); committer->minutes = tsdb->keepCfg.days; committer->precision = tsdb->keepCfg.precision; committer->minRow = info->info.config.tsdbCfg.minRows; @@ -471,21 +647,21 @@ static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *co committer->sttTrigger = info->info.config.sttTrigger; committer->szPage = info->info.config.tsdbPageSize; committer->compactVersion = INT64_MAX; - committer->ctx->cid = tsdbFSAllocEid(tsdb->pFS); - committer->ctx->now = taosGetTimestampSec(); + committer->cid = tsdbFSAllocEid(tsdb->pFS); + committer->now = taosGetTimestampSec(); - committer->ctx->nextKey = tsdb->imem->minKey; - if (tsdb->imem->nDel > 0) { - SRBTreeIter iter[1] = {tRBTreeIterCreate(tsdb->imem->tbDataTree, 1)}; + code = tsdbCommitInfoBuild(tsdb); + TSDB_CHECK_CODE(code, lino, _exit); - for (SRBTreeNode *node = tRBTreeIterNext(iter); node; node = tRBTreeIterNext(iter)) { - STbData *tbData = TCONTAINER_OF(node, STbData, rbtn); - - for (SDelData *delData = tbData->pHead; delData; delData = delData->pNext) { - committer->ctx->nextKey = TMIN(committer->ctx->nextKey, delData->sKey); - } + STFileSet *fset; + taosThreadMutexLock(&tsdb->mutex); + for (int i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) { + SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i); + if (info->hasDataToCommit && info->fset) { + tsdbBeginTaskOnFileSet(tsdb, info->fid, &fset); } } + taosThreadMutexUnlock(&tsdb->mutex); _exit: if (code) { @@ -516,14 +692,13 @@ static int32_t tsdbCloseCommitter(SCommitter2 *committer, int32_t eno) { TARRAY2_DESTROY(committer->sttReaderArray, NULL); TARRAY2_DESTROY(committer->fopArray, NULL); TARRAY2_DESTROY(committer->sttReaderArray, NULL); - tsdbFSDestroyCopySnapshot(&committer->fsetArr); _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s, eid:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, lino, - tstrerror(code), committer->ctx->cid); + tstrerror(code), committer->cid); } else { - tsdbDebug("vgId:%d %s done, eid:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->cid); + tsdbDebug("vgId:%d %s done, eid:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, committer->cid); } return code; } @@ -553,17 +728,20 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) { taosThreadMutexUnlock(&tsdb->mutex); tsdbUnrefMemTable(imem, NULL, true); } else { - SCommitter2 committer[1]; + SCommitter2 committer = {0}; - code = tsdbOpenCommitter(tsdb, info, committer); + code = tsdbOpenCommitter(tsdb, info, &committer); TSDB_CHECK_CODE(code, lino, _exit); - while (committer->ctx->nextKey != TSKEY_MAX) { - code = tsdbCommitFileSet(committer); - TSDB_CHECK_CODE(code, lino, _exit); + for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) { + committer.ctx->info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i); + if (committer.ctx->info->hasDataToCommit) { + code = tsdbCommitFileSet(&committer); + TSDB_CHECK_CODE(code, lino, _exit); + } } - code = tsdbCloseCommitter(committer, code); + code = tsdbCloseCommitter(&committer, code); TSDB_CHECK_CODE(code, lino, _exit); } @@ -580,22 +758,33 @@ int32_t tsdbCommitCommit(STsdb *tsdb) { int32_t code = 0; int32_t lino = 0; - if (tsdb->imem == NULL) goto _exit; + if (tsdb->imem) { + SMemTable *pMemTable = tsdb->imem; + + taosThreadMutexLock(&tsdb->mutex); + + if ((code = tsdbFSEditCommit(tsdb->pFS))) { + taosThreadMutexUnlock(&tsdb->mutex); + TSDB_CHECK_CODE(code, lino, _exit); + } + tsdb->imem = NULL; + + for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) { + SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i); + if (info->hasDataToCommit && info->fset) { + tsdbFinishTaskOnFileSet(tsdb, info->fid); + } + } - SMemTable *pMemTable = tsdb->imem; - taosThreadMutexLock(&tsdb->mutex); - code = tsdbFSEditCommit(tsdb->pFS); - if (code) { taosThreadMutexUnlock(&tsdb->mutex); - TSDB_CHECK_CODE(code, lino, _exit); + + tsdbCommitInfoDestroy(tsdb); + tsdbUnrefMemTable(pMemTable, NULL, true); } - tsdb->imem = NULL; - taosThreadMutexUnlock(&tsdb->mutex); - tsdbUnrefMemTable(pMemTable, NULL, true); _exit: if (code) { - TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(tsdb->pVnode), __func__, lino, tstrerror(code)); } else { tsdbInfo("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__); } @@ -611,6 +800,16 @@ int32_t tsdbCommitAbort(STsdb *pTsdb) { code = tsdbFSEditAbort(pTsdb->pFS); TSDB_CHECK_CODE(code, lino, _exit); + taosThreadMutexLock(&pTsdb->mutex); + for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) { + SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i); + if (info->hasDataToCommit && info->fset) { + tsdbFinishTaskOnFileSet(pTsdb, info->fid); + } + } + taosThreadMutexUnlock(&pTsdb->mutex); + tsdbCommitInfoDestroy(pTsdb); + _exit: if (code) { tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); @@ -618,4 +817,4 @@ _exit: tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); } return code; -} +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 3ca26621a1..3087dc44d9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -22,9 +22,6 @@ extern void remove_file(const char *fname); -#define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT -#define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1) - typedef struct STFileHashEntry { struct STFileHashEntry *next; char fname[TSDB_FILENAME_LEN]; @@ -290,10 +287,8 @@ static int32_t commit_edit(STFileSystem *fs) { current_fname(fs->tsdb, current, TSDB_FCURRENT); if (fs->etype == TSDB_FEDIT_COMMIT) { current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C); - } else if (fs->etype == TSDB_FEDIT_MERGE) { - current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M); } else { - ASSERT(0); + current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M); } int32_t code; @@ -324,11 +319,8 @@ static int32_t abort_edit(STFileSystem *fs) { if (fs->etype == TSDB_FEDIT_COMMIT) { current_fname(fs->tsdb, fname, TSDB_FCURRENT_C); - } else if (fs->etype == TSDB_FEDIT_MERGE) { - current_fname(fs->tsdb, fname, TSDB_FCURRENT_M); } else { - tsdbError("vgId:%d %s failed since invalid etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype); - ASSERT(0); + current_fname(fs->tsdb, fname, TSDB_FCURRENT_M); } int32_t code; @@ -767,9 +759,12 @@ extern int32_t tsdbStopAllCompTask(STsdb *tsdb); int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) { STFileSystem *fs = pTsdb->pFS; - TARRAY2(int64_t) channelArr = {0}; + SArray *channelArray = taosArrayInit(0, sizeof(SVAChannelID)); + if (channelArray == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } - taosThreadMutexLock(&fs->tsdb->mutex); + taosThreadMutexLock(&pTsdb->mutex); // disable pTsdb->bgTaskDisabled = true; @@ -777,20 +772,23 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) { // collect channel STFileSet *fset; TARRAY2_FOREACH(fs->fSetArr, fset) { - if (VNODE_ASYNC_VALID_CHANNEL_ID(fset->bgTaskChannel)) { - TARRAY2_APPEND(&channelArr, fset->bgTaskChannel); - fset->bgTaskChannel = 0; + if (fset->channelOpened) { + taosArrayPush(channelArray, &fset->channel); + fset->channel = (SVAChannelID){0}; + fset->mergeScheduled = false; + tsdbFSSetBlockCommit(fset, false); + fset->channelOpened = false; } - fset->mergeScheduled = false; - tsdbFSSetBlockCommit(fset, false); } - taosThreadMutexUnlock(&fs->tsdb->mutex); + taosThreadMutexUnlock(&pTsdb->mutex); // destroy all channels - int64_t channel; - TARRAY2_FOREACH(&channelArr, channel) { vnodeAChannelDestroy(vnodeAsyncHandle[1], channel, true); } - TARRAY2_DESTROY(&channelArr, NULL); + for (int32_t i = 0; i < taosArrayGetSize(channelArray); i++) { + SVAChannelID *channel = taosArrayGet(channelArray, i); + vnodeAChannelDestroy(channel, true); + } + taosArrayDestroy(channelArray); #ifdef TD_ENTERPRISE tsdbStopAllCompTask(pTsdb); @@ -832,15 +830,10 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e int32_t lino; char current_t[TSDB_FILENAME_LEN]; - switch (etype) { - case TSDB_FEDIT_COMMIT: - current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C); - break; - case TSDB_FEDIT_MERGE: - current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M); - break; - default: - ASSERT(0); + if (etype == TSDB_FEDIT_COMMIT) { + current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C); + } else { + current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M); } tsem_wait(&fs->canEdit); @@ -932,8 +925,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { arg->tsdb = fs->tsdb; arg->fid = fset->fid; - code = vnodeAsyncC(vnodeAsyncHandle[1], fset->bgTaskChannel, EVA_PRIORITY_HIGH, tsdbMerge, taosMemoryFree, arg, - NULL); + code = vnodeAsync(&fset->channel, EVA_PRIORITY_HIGH, tsdbMerge, taosMemoryFree, arg, NULL); TSDB_CHECK_CODE(code, lino, _exit); fset->mergeScheduled = true; } @@ -946,33 +938,6 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { } } - // clear empty level and fset - int32_t i = 0; - while (i < TARRAY2_SIZE(fs->fSetArr)) { - STFileSet *fset = TARRAY2_GET(fs->fSetArr, i); - - int32_t j = 0; - while (j < TARRAY2_SIZE(fset->lvlArr)) { - SSttLvl *lvl = TARRAY2_GET(fset->lvlArr, j); - - if (TARRAY2_SIZE(lvl->fobjArr) == 0) { - TARRAY2_REMOVE(fset->lvlArr, j, tsdbSttLvlClear); - } else { - j++; - } - } - - if (tsdbTFileSetIsEmpty(fset)) { - if (VNODE_ASYNC_VALID_CHANNEL_ID(fset->bgTaskChannel)) { - vnodeAChannelDestroy(vnodeAsyncHandle[1], fset->bgTaskChannel, false); - fset->bgTaskChannel = 0; - } - TARRAY2_REMOVE(fs->fSetArr, i, tsdbTFileSetClear); - } else { - i++; - } - } - _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->tsdb->pVnode), __func__, lino, tstrerror(code)); @@ -1211,3 +1176,47 @@ _out: } int32_t tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr) { return tsdbTFileSetRangeArrayDestroy(fsrArr); } + +int32_t tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset) { + int16_t sttTrigger = tsdb->pVnode->config.sttTrigger; + + tsdbFSGetFSet(tsdb->pFS, fid, fset); + if (sttTrigger == 1 && fset) { + for (;;) { + if ((*fset)->taskRunning) { + (*fset)->numWaitTask++; + + taosThreadCondWait(&(*fset)->beginTask, &tsdb->mutex); + + tsdbFSGetFSet(tsdb->pFS, fid, fset); + ASSERT(fset != NULL); + + (*fset)->numWaitTask--; + ASSERT((*fset)->numWaitTask >= 0); + } else { + (*fset)->taskRunning = true; + break; + } + } + tsdbInfo("vgId:%d begin task on file set:%d", TD_VID(tsdb->pVnode), fid); + } + + return 0; +} + +int32_t tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid) { + int16_t sttTrigger = tsdb->pVnode->config.sttTrigger; + if (sttTrigger == 1) { + STFileSet *fset = NULL; + tsdbFSGetFSet(tsdb->pFS, fid, &fset); + if (fset != NULL && fset->taskRunning) { + fset->taskRunning = false; + if (fset->numWaitTask > 0) { + taosThreadCondSignal(&fset->beginTask); + } + tsdbInfo("vgId:%d finish task on file set:%d", TD_VID(tsdb->pVnode), fid); + } + } + + return 0; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.h b/source/dnode/vnode/src/tsdb/tsdbFS2.h index 7099ce8b26..6938efde3c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.h @@ -24,7 +24,9 @@ extern "C" { typedef enum { TSDB_FEDIT_COMMIT = 1, // - TSDB_FEDIT_MERGE + TSDB_FEDIT_MERGE, + TSDB_FEDIT_COMPACT, + TSDB_FEDIT_RETENTION, } EFEditT; typedef enum { @@ -59,6 +61,8 @@ int32_t tsdbFSEditAbort(STFileSystem *fs); // other int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset); int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid); +int32_t tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset); +int32_t tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid); // utils int32_t save_fs(const TFileSetArray *arr, const char *fname); int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype); @@ -72,10 +76,6 @@ struct STFileSystem { EFEditT etype; TFileSetArray fSetArr[1]; TFileSetArray fSetArrTmp[1]; - - // background task queue - bool stop; - int64_t taskid; }; #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.c b/source/dnode/vnode/src/tsdb/tsdbFSet2.c index 0820f53117..598ca0aec3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.c @@ -456,13 +456,14 @@ int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset) { TARRAY2_INIT(fset[0]->lvlArr); // background task queue - fset[0]->bgTaskChannel = 0; - fset[0]->mergeScheduled = false; + taosThreadCondInit(&(*fset)->beginTask, NULL); + (*fset)->taskRunning = false; + (*fset)->numWaitTask = 0; // block commit variables taosThreadCondInit(&fset[0]->canCommit, NULL); - fset[0]->numWaitCommit = 0; - fset[0]->blockCommit = false; + (*fset)->numWaitCommit = 0; + (*fset)->blockCommit = false; return 0; } @@ -598,21 +599,19 @@ int32_t tsdbTFileSetRangeArrayDestroy(TFileSetRangeArray **ppArr) { return 0; } -int32_t tsdbTFileSetClear(STFileSet **fset) { - if (!fset[0]) return 0; +void tsdbTFileSetClear(STFileSet **fset) { + if (fset && *fset) { + for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) { + if ((*fset)->farr[ftype] == NULL) continue; + tsdbTFileObjUnref((*fset)->farr[ftype]); + } - for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) { - if (fset[0]->farr[ftype] == NULL) continue; - tsdbTFileObjUnref(fset[0]->farr[ftype]); + TARRAY2_DESTROY((*fset)->lvlArr, tsdbSttLvlClear); + + taosThreadCondDestroy(&(*fset)->beginTask); + taosThreadCondDestroy(&(*fset)->canCommit); + taosMemoryFreeClear(*fset); } - - TARRAY2_DESTROY(fset[0]->lvlArr, tsdbSttLvlClear); - - taosThreadCondDestroy(&fset[0]->canCommit); - taosMemoryFree(fset[0]); - fset[0] = NULL; - - return 0; } int32_t tsdbTFileSetRemove(STFileSet *fset) { @@ -665,6 +664,12 @@ bool tsdbTFileSetIsEmpty(const STFileSet *fset) { } int32_t tsdbTFileSetOpenChannel(STFileSet *fset) { - if (VNODE_ASYNC_VALID_CHANNEL_ID(fset->bgTaskChannel)) return 0; - return vnodeAChannelInit(vnodeAsyncHandle[1], &fset->bgTaskChannel); + int32_t code; + if (!fset->channelOpened) { + if ((code = vnodeAChannelInit(2, &fset->channel))) { + return code; + } + fset->channelOpened = true; + } + return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.h b/source/dnode/vnode/src/tsdb/tsdbFSet2.h index 86db3b01c7..4b99a46dc7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.h @@ -42,7 +42,7 @@ typedef enum { int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset); int32_t tsdbTFileSetInitCopy(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset); int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset); -int32_t tsdbTFileSetClear(STFileSet **fset); +void tsdbTFileSetClear(STFileSet **fset); int32_t tsdbTFileSetRemove(STFileSet *fset); int32_t tsdbTFileSetFilteredInitDup(STsdb *pTsdb, const STFileSet *fset1, int64_t ever, STFileSet **fset, @@ -90,9 +90,15 @@ struct STFileSet { STFileObj *farr[TSDB_FTYPE_MAX]; // file array TSttLvlArray lvlArr[1]; // level array - // background task channel - int64_t bgTaskChannel; - bool mergeScheduled; + // background task + bool channelOpened; + SVAChannelID channel; + bool mergeScheduled; + + // sttTrigger = 1 + TdThreadCond beginTask; + bool taskRunning; + int32_t numWaitTask; // block commit variables TdThreadCond canCommit; diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/tsdbMerge.c index 971020e7d6..022698b0eb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/tsdbMerge.c @@ -593,5 +593,6 @@ _exit: exit(EXIT_FAILURE); } tsdbTFileSetClear(&merger->fset); + taosMemoryFree(arg); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index f4344296b4..3d53d1ada3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -24,8 +24,8 @@ typedef struct { int64_t now; int64_t cid; - TFileSetArray *fsetArr; - TFileOpArray fopArr[1]; + STFileSet *fset; + TFileOpArray fopArr; } SRTNer; static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) { @@ -35,7 +35,7 @@ static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) { .of = fobj->f[0], }; - return TARRAY2_APPEND(rtner->fopArr, op); + return TARRAY2_APPEND(&rtner->fopArr, op); } static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFile *to) { @@ -71,7 +71,7 @@ static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFi _exit: if (code) { - TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); + tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino); if (fdFrom) taosCloseFile(&fdFrom); if (fdTo) taosCloseFile(&fdTo); } @@ -108,7 +108,7 @@ static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile _exit: if (code) { - TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); + tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino); taosCloseFile(&fdFrom); taosCloseFile(&fdTo); } @@ -128,7 +128,7 @@ static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const .of = fobj->f[0], }; - code = TARRAY2_APPEND(rtner->fopArr, op); + code = TARRAY2_APPEND(&rtner->fopArr, op); TSDB_CHECK_CODE(code, lino, _exit); // create new @@ -152,7 +152,7 @@ static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const }, }; - code = TARRAY2_APPEND(rtner->fopArr, op); + code = TARRAY2_APPEND(&rtner->fopArr, op); TSDB_CHECK_CODE(code, lino, _exit); // do copy the file @@ -167,7 +167,7 @@ static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const _exit: if (code) { - TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); + tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino); } return code; } @@ -176,80 +176,51 @@ typedef struct { STsdb *tsdb; int64_t now; int32_t fid; + bool s3Migrate; } SRtnArg; -static int32_t tsdbDoRetentionBegin(SRtnArg *arg, SRTNer *rtner) { - int32_t code = 0; - int32_t lino = 0; - - STsdb *tsdb = arg->tsdb; - - rtner->tsdb = tsdb; - rtner->szPage = tsdb->pVnode->config.tsdbPageSize; - rtner->now = arg->now; - rtner->cid = tsdbFSAllocEid(tsdb->pFS); - - code = tsdbFSCreateCopySnapshot(tsdb->pFS, &rtner->fsetArr); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); - } else { - tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__); - } - return code; -} - static int32_t tsdbDoRetentionEnd(SRTNer *rtner) { int32_t code = 0; int32_t lino = 0; - if (TARRAY2_SIZE(rtner->fopArr) == 0) goto _exit; - - code = tsdbFSEditBegin(rtner->tsdb->pFS, rtner->fopArr, TSDB_FEDIT_MERGE); - TSDB_CHECK_CODE(code, lino, _exit); - - taosThreadMutexLock(&rtner->tsdb->mutex); - - code = tsdbFSEditCommit(rtner->tsdb->pFS); - if (code) { - taosThreadMutexUnlock(&rtner->tsdb->mutex); + if (TARRAY2_SIZE(&rtner->fopArr) > 0) { + code = tsdbFSEditBegin(rtner->tsdb->pFS, &rtner->fopArr, TSDB_FEDIT_RETENTION); TSDB_CHECK_CODE(code, lino, _exit); + + taosThreadMutexLock(&rtner->tsdb->mutex); + + code = tsdbFSEditCommit(rtner->tsdb->pFS); + if (code) { + taosThreadMutexUnlock(&rtner->tsdb->mutex); + TSDB_CHECK_CODE(code, lino, _exit); + } + + taosThreadMutexUnlock(&rtner->tsdb->mutex); + + TARRAY2_DESTROY(&rtner->fopArr, NULL); } - taosThreadMutexUnlock(&rtner->tsdb->mutex); - - TARRAY2_DESTROY(rtner->fopArr, NULL); - _exit: if (code) { - TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); + tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino); } else { tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__); } - tsdbFSDestroyCopySnapshot(&rtner->fsetArr); return code; } -static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) { +static int32_t tsdbDoRetention(SRTNer *rtner) { int32_t code = 0; int32_t lino = 0; STFileObj *fobj = NULL; + STFileSet *fset = rtner->fset; int32_t expLevel = tsdbFidLevel(fset->fid, &rtner->tsdb->keepCfg, rtner->now); if (expLevel < 0) { // remove the fileset for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = fset->farr[ftype], 1); ++ftype) { if (fobj == NULL) continue; - /* - int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs); - if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1) { - code = tsdbRemoveFileObjectS3(rtner, fobj); - TSDB_CHECK_CODE(code, lino, _exit); - } else {*/ code = tsdbDoRemoveFileObject(rtner, fobj); TSDB_CHECK_CODE(code, lino, _exit); - //} } SSttLvl *lvl; @@ -275,10 +246,6 @@ static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) { if (fobj == NULL) continue; if (fobj->f->did.level == did.level) { - /* - code = tsdbCheckMigrateS3(rtner, fobj, ftype, &did); - TSDB_CHECK_CODE(code, lino, _exit); - */ continue; } @@ -306,91 +273,110 @@ static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) { _exit: if (code) { - TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); + tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino); } return code; } -static void tsdbFreeRtnArg(void *arg) { taosMemoryFree(arg); } +static void tsdbRetentionCancel(void *arg) { taosMemoryFree(arg); } -static int32_t tsdbDoRetentionAsync(void *arg) { +static int32_t tsdbDoS3Migrate(SRTNer *rtner); + +static int32_t tsdbRetention(void *arg) { int32_t code = 0; int32_t lino = 0; - SRTNer rtner[1] = {0}; - code = tsdbDoRetentionBegin(arg, rtner); - TSDB_CHECK_CODE(code, lino, _exit); + SRtnArg *rtnArg = (SRtnArg *)arg; + STsdb *pTsdb = rtnArg->tsdb; + SVnode *pVnode = pTsdb->pVnode; + STFileSet *fset = NULL; + SRTNer rtner = { + .tsdb = pTsdb, + .szPage = pVnode->config.tsdbPageSize, + .now = rtnArg->now, + .cid = tsdbFSAllocEid(pTsdb->pFS), + }; - STFileSet *fset; - TARRAY2_FOREACH(rtner->fsetArr, fset) { - if (fset->fid != ((SRtnArg *)arg)->fid) continue; + // begin task + taosThreadMutexLock(&pTsdb->mutex); + tsdbBeginTaskOnFileSet(pTsdb, rtnArg->fid, &fset); + if (fset && (code = tsdbTFileSetInitCopy(pTsdb, fset, &rtner.fset))) { + taosThreadMutexUnlock(&pTsdb->mutex); + TSDB_CHECK_CODE(code, lino, _exit); + } + taosThreadMutexUnlock(&pTsdb->mutex); - code = tsdbDoRetentionOnFileSet(rtner, fset); + // do retention + if (rtner.fset) { + if (rtnArg->s3Migrate) { + code = tsdbDoS3Migrate(&rtner); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbDoRetention(&rtner); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbDoRetentionEnd(&rtner); TSDB_CHECK_CODE(code, lino, _exit); } - code = tsdbDoRetentionEnd(rtner); - TSDB_CHECK_CODE(code, lino, _exit); - _exit: - if (code) { - if (TARRAY2_DATA(rtner->fopArr)) { - TARRAY2_DESTROY(rtner->fopArr, NULL); - } - TFileSetArray **fsetArr = &rtner->fsetArr; - if (fsetArr[0]) { - tsdbFSDestroyCopySnapshot(&rtner->fsetArr); - } + if (rtner.fset) { + taosThreadMutexLock(&pTsdb->mutex); + tsdbFinishTaskOnFileSet(pTsdb, rtnArg->fid); + taosThreadMutexUnlock(&pTsdb->mutex); + } - TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); + // clear resources + tsdbTFileSetClear(&rtner.fset); + TARRAY2_DESTROY(&rtner.fopArr, NULL); + taosMemoryFree(arg); + if (code) { + tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(((SRtnArg *)arg)->tsdb->pVnode), __func__, code, lino); } return code; } -int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) { +static int32_t tsdbAsyncRetentionImpl(STsdb *tsdb, int64_t now, bool s3Migrate) { int32_t code = 0; - - taosThreadMutexLock(&tsdb->mutex); - - if (tsdb->bgTaskDisabled) { - taosThreadMutexUnlock(&tsdb->mutex); - return 0; - } + int32_t lino = 0; STFileSet *fset; - TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) { - code = tsdbTFileSetOpenChannel(fset); - if (code) { - taosThreadMutexUnlock(&tsdb->mutex); - return code; - } - SRtnArg *arg = taosMemoryMalloc(sizeof(*arg)); - if (arg == NULL) { - taosThreadMutexUnlock(&tsdb->mutex); - return TSDB_CODE_OUT_OF_MEMORY; - } + if (!tsdb->bgTaskDisabled) { + TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) { + code = tsdbTFileSetOpenChannel(fset); + TSDB_CHECK_CODE(code, lino, _exit); - arg->tsdb = tsdb; - arg->now = now; - arg->fid = fset->fid; + SRtnArg *arg = taosMemoryMalloc(sizeof(*arg)); + if (arg == NULL) { + TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit); + } - if (sync) { - code = vnodeAsyncC(vnodeAsyncHandle[0], tsdb->pVnode->commitChannel, EVA_PRIORITY_LOW, tsdbDoRetentionAsync, - tsdbFreeRtnArg, arg, NULL); - } else { - code = vnodeAsyncC(vnodeAsyncHandle[1], fset->bgTaskChannel, EVA_PRIORITY_LOW, tsdbDoRetentionAsync, - tsdbFreeRtnArg, arg, NULL); - } - if (code) { - tsdbFreeRtnArg(arg); - taosThreadMutexUnlock(&tsdb->mutex); - return code; + arg->tsdb = tsdb; + arg->now = now; + arg->fid = fset->fid; + arg->s3Migrate = s3Migrate; + + if ((code = vnodeAsync(&fset->channel, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, arg, NULL))) { + taosMemoryFree(arg); + TSDB_CHECK_CODE(code, lino, _exit); + } } } - taosThreadMutexUnlock(&tsdb->mutex); +_exit: + if (code) { + tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(tsdb->pVnode), __func__, code, lino); + } + return code; +} +int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now) { + int32_t code = 0; + taosThreadMutexLock(&tsdb->mutex); + code = tsdbAsyncRetentionImpl(tsdb, now, false); + taosThreadMutexUnlock(&tsdb->mutex); return code; } @@ -462,7 +448,7 @@ static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int .of = fobj->f[0], }; - code = TARRAY2_APPEND(rtner->fopArr, op); + code = TARRAY2_APPEND(&rtner->fopArr, op); TSDB_CHECK_CODE(code, lino, _exit); // create new @@ -486,7 +472,7 @@ static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int }, }; - code = TARRAY2_APPEND(rtner->fopArr, op); + code = TARRAY2_APPEND(&rtner->fopArr, op); TSDB_CHECK_CODE(code, lino, _exit); char fname[TSDB_FILENAME_LEN]; @@ -566,7 +552,7 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64 .of = fobj->f[0], }; - code = TARRAY2_APPEND(rtner->fopArr, op); + code = TARRAY2_APPEND(&rtner->fopArr, op); TSDB_CHECK_CODE(code, lino, _exit); // create new @@ -590,7 +576,7 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64 }, }; - code = TARRAY2_APPEND(rtner->fopArr, op); + code = TARRAY2_APPEND(&rtner->fopArr, op); TSDB_CHECK_CODE(code, lino, _exit); char fname[TSDB_FILENAME_LEN]; @@ -653,10 +639,11 @@ _exit: return code; } -static int32_t tsdbDoS3MigrateOnFileSet(SRTNer *rtner, STFileSet *fset) { +static int32_t tsdbDoS3Migrate(SRTNer *rtner) { int32_t code = 0; int32_t lino = 0; + STFileSet *fset = rtner->fset; STFileObj *fobj = fset->farr[TSDB_FTYPE_DATA]; if (!fobj) return code; @@ -720,41 +707,7 @@ _exit: return code; } -static int32_t tsdbDoS3MigrateAsync(void *arg) { - int32_t code = 0; - int32_t lino = 0; - SRTNer rtner[1] = {0}; - - code = tsdbDoRetentionBegin(arg, rtner); - TSDB_CHECK_CODE(code, lino, _exit); - - STFileSet *fset; - TARRAY2_FOREACH(rtner->fsetArr, fset) { - if (fset->fid != ((SRtnArg *)arg)->fid) continue; - - code = tsdbDoS3MigrateOnFileSet(rtner, fset); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbDoRetentionEnd(rtner); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - if (TARRAY2_DATA(rtner->fopArr)) { - TARRAY2_DESTROY(rtner->fopArr, NULL); - } - TFileSetArray **fsetArr = &rtner->fsetArr; - if (fsetArr[0]) { - tsdbFSDestroyCopySnapshot(&rtner->fsetArr); - } - - TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); - } - return code; -} - -int32_t tsdbS3Migrate(STsdb *tsdb, int64_t now, int32_t sync) { +int32_t tsdbAsyncS3Migrate(STsdb *tsdb, int64_t now) { int32_t code = 0; extern int8_t tsS3EnabledCfg; @@ -772,45 +725,7 @@ int32_t tsdbS3Migrate(STsdb *tsdb, int64_t now, int32_t sync) { } taosThreadMutexLock(&tsdb->mutex); - - if (tsdb->bgTaskDisabled) { - taosThreadMutexUnlock(&tsdb->mutex); - return 0; - } - - STFileSet *fset; - TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) { - code = tsdbTFileSetOpenChannel(fset); - if (code) { - taosThreadMutexUnlock(&tsdb->mutex); - return code; - } - - SRtnArg *arg = taosMemoryMalloc(sizeof(*arg)); - if (arg == NULL) { - taosThreadMutexUnlock(&tsdb->mutex); - return TSDB_CODE_OUT_OF_MEMORY; - } - - arg->tsdb = tsdb; - arg->now = now; - arg->fid = fset->fid; - - if (sync) { - code = vnodeAsyncC(vnodeAsyncHandle[0], tsdb->pVnode->commitChannel, EVA_PRIORITY_LOW, tsdbDoS3MigrateAsync, - tsdbFreeRtnArg, arg, NULL); - } else { - code = vnodeAsyncC(vnodeAsyncHandle[1], fset->bgTaskChannel, EVA_PRIORITY_LOW, tsdbDoS3MigrateAsync, - tsdbFreeRtnArg, arg, NULL); - } - if (code) { - tsdbFreeRtnArg(arg); - taosThreadMutexUnlock(&tsdb->mutex); - return code; - } - } - + code = tsdbAsyncRetentionImpl(tsdb, now, true); taosThreadMutexUnlock(&tsdb->mutex); - return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeAsync.c b/source/dnode/vnode/src/vnd/vnodeAsync.c index 313603b19f..6d6533463b 100644 --- a/source/dnode/vnode/src/vnd/vnodeAsync.c +++ b/source/dnode/vnode/src/vnd/vnodeAsync.c @@ -16,6 +16,7 @@ #include "vnd.h" #include "vnodeHash.h" +typedef struct SVAsync SVAsync; typedef struct SVATask SVATask; typedef struct SVAChannel SVAChannel; @@ -54,7 +55,7 @@ struct SVATask { int32_t priorScore; SVAChannel *channel; int32_t (*execute)(void *); - void (*complete)(void *); + void (*cancel)(void *); void *arg; EVATaskState state; @@ -67,6 +68,11 @@ struct SVATask { struct SVATask *next; }; +typedef struct { + void (*cancel)(void *); + void *arg; +} SVATaskCancelInfo; + #define VATASK_PIORITY(task_) ((task_)->priority - ((task_)->priorScore / 4)) // async channel @@ -112,6 +118,10 @@ struct SVAsync { SVHashTable *taskTable; }; +SVAsync *vnodeAsyncs[3]; +#define MIN_ASYNC_ID 1 +#define MAX_ASYNC_ID (sizeof(vnodeAsyncs) / sizeof(vnodeAsyncs[0]) - 1) + static int32_t vnodeAsyncTaskDone(SVAsync *async, SVATask *task) { int32_t ret; @@ -160,11 +170,6 @@ static int32_t vnodeAsyncTaskDone(SVAsync *async, SVATask *task) { } async->numTasks--; - // call complete callback - if (task->complete) { - task->complete(task->arg); - } - if (task->numWait == 0) { taosThreadCondDestroy(&task->waitCond); taosMemoryFree(task); @@ -176,7 +181,7 @@ static int32_t vnodeAsyncTaskDone(SVAsync *async, SVATask *task) { return 0; } -static int32_t vnodeAsyncCancelAllTasks(SVAsync *async) { +static int32_t vnodeAsyncCancelAllTasks(SVAsync *async, SArray *cancelArray) { while (async->queue[0].next != &async->queue[0] || async->queue[1].next != &async->queue[1] || async->queue[2].next != &async->queue[2]) { for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) { @@ -184,6 +189,12 @@ static int32_t vnodeAsyncCancelAllTasks(SVAsync *async) { SVATask *task = async->queue[i].next; task->prev->next = task->next; task->next->prev = task->prev; + if (task->cancel) { + taosArrayPush(cancelArray, &(SVATaskCancelInfo){ + .cancel = task->cancel, + .arg = task->arg, + }); + } vnodeAsyncTaskDone(async, task); } } @@ -194,6 +205,7 @@ static int32_t vnodeAsyncCancelAllTasks(SVAsync *async) { static void *vnodeAsyncLoop(void *arg) { SVWorker *worker = (SVWorker *)arg; SVAsync *async = worker->async; + SArray *cancelArray = taosArrayInit(0, sizeof(SVATaskCancelInfo)); setThreadName(async->label); @@ -209,12 +221,12 @@ static void *vnodeAsyncLoop(void *arg) { for (;;) { if (async->stop || worker->workerId >= async->numWorkers) { if (async->stop) { // cancel all tasks - vnodeAsyncCancelAllTasks(async); + vnodeAsyncCancelAllTasks(async, cancelArray); } worker->state = EVA_WORKER_STATE_STOP; async->numLaunchWorkers--; taosThreadMutexUnlock(&async->mutex); - return NULL; + goto _exit; } for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) { @@ -259,6 +271,12 @@ static void *vnodeAsyncLoop(void *arg) { worker->runningTask->execute(worker->runningTask->arg); } +_exit: + for (int32_t i = 0; i < taosArrayGetSize(cancelArray); i++) { + SVATaskCancelInfo *cancel = (SVATaskCancelInfo *)taosArrayGet(cancelArray, i); + cancel->cancel(cancel->arg); + } + taosArrayDestroy(cancelArray); return NULL; } @@ -294,7 +312,7 @@ static int32_t vnodeAsyncChannelCompare(const void *obj1, const void *obj2) { return 0; } -int32_t vnodeAsyncInit(SVAsync **async, char *label) { +static int32_t vnodeAsyncInit(SVAsync **async, const char *label) { int32_t ret; if (async == NULL) { @@ -360,7 +378,7 @@ int32_t vnodeAsyncInit(SVAsync **async, char *label) { return 0; } -int32_t vnodeAsyncDestroy(SVAsync **async) { +static int32_t vnodeAsyncDestroy(SVAsync **async) { if ((*async) == NULL) { return TSDB_CODE_INVALID_PARA; } @@ -433,20 +451,39 @@ static int32_t vnodeAsyncLaunchWorker(SVAsync *async) { return 0; } -#ifdef BUILD_NO_CALL -int32_t vnodeAsync(SVAsync *async, EVAPriority priority, int32_t (*execute)(void *), void (*complete)(void *), - void *arg, int64_t *taskId) { - return vnodeAsyncC(async, 0, priority, execute, complete, arg, taskId); -} -#endif +int32_t vnodeAsyncOpen(int32_t numOfThreads) { + int32_t code = 0; + int32_t lino = 0; -int32_t vnodeAsyncC(SVAsync *async, int64_t channelId, EVAPriority priority, int32_t (*execute)(void *), - void (*complete)(void *), void *arg, int64_t *taskId) { - if (async == NULL || execute == NULL || channelId < 0) { + // vnode-commit + code = vnodeAsyncInit(&vnodeAsyncs[1], "vnode-commit"); + TSDB_CHECK_CODE(code, lino, _exit); + vnodeAsyncSetWorkers(1, numOfThreads); + + // vnode-merge + code = vnodeAsyncInit(&vnodeAsyncs[2], "vnode-merge"); + TSDB_CHECK_CODE(code, lino, _exit); + vnodeAsyncSetWorkers(2, numOfThreads); + +_exit: + return 0; +} + +int32_t vnodeAsyncClose() { + vnodeAsyncDestroy(&vnodeAsyncs[1]); + vnodeAsyncDestroy(&vnodeAsyncs[2]); + return 0; +} + +int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*execute)(void *), void (*cancel)(void *), + void *arg, SVATaskID *taskID) { + if (channelID == NULL || channelID->async < MIN_ASYNC_ID || channelID->async > MAX_ASYNC_ID || execute == NULL || + channelID->id < 0) { return TSDB_CODE_INVALID_PARA; } - int64_t id; + int64_t id; + SVAsync *async = vnodeAsyncs[channelID->async]; // create task object SVATask *task = (SVATask *)taosMemoryCalloc(1, sizeof(SVATask)); @@ -457,7 +494,7 @@ int32_t vnodeAsyncC(SVAsync *async, int64_t channelId, EVAPriority priority, int task->priority = priority; task->priorScore = 0; task->execute = execute; - task->complete = complete; + task->cancel = cancel; task->arg = arg; task->state = EVA_TASK_STATE_WAITTING; task->numWait = 0; @@ -466,10 +503,12 @@ int32_t vnodeAsyncC(SVAsync *async, int64_t channelId, EVAPriority priority, int // schedule task taosThreadMutexLock(&async->mutex); - if (channelId == 0) { + if (channelID->id == 0) { task->channel = NULL; } else { - SVAChannel channel = {.channelId = channelId}; + SVAChannel channel = { + .channelId = channelID->id, + }; vHashGet(async->channelTable, &channel, (void **)&task->channel); if (task->channel == NULL) { taosThreadMutexUnlock(&async->mutex); @@ -540,20 +579,24 @@ int32_t vnodeAsyncC(SVAsync *async, int64_t channelId, EVAPriority priority, int taosThreadMutexUnlock(&async->mutex); - if (taskId != NULL) { - *taskId = id; + if (taskID != NULL) { + taskID->async = channelID->async; + taskID->id = id; } return 0; } -int32_t vnodeAWait(SVAsync *async, int64_t taskId) { - if (async == NULL || taskId <= 0) { +int32_t vnodeAWait(SVATaskID *taskID) { + if (taskID == NULL || taskID->async < MIN_ASYNC_ID || taskID->async > MAX_ASYNC_ID || taskID->id <= 0) { return TSDB_CODE_INVALID_PARA; } + SVAsync *async = vnodeAsyncs[taskID->async]; SVATask *task = NULL; - SVATask task2 = {.taskId = taskId}; + SVATask task2 = { + .taskId = taskID->id, + }; taosThreadMutexLock(&async->mutex); @@ -574,21 +617,27 @@ int32_t vnodeAWait(SVAsync *async, int64_t taskId) { return 0; } -int32_t vnodeACancel(SVAsync *async, int64_t taskId) { - if (async == NULL) { +int32_t vnodeACancel(SVATaskID *taskID) { + if (taskID == NULL || taskID->async < MIN_ASYNC_ID || taskID->async > MAX_ASYNC_ID || taskID->id <= 0) { return TSDB_CODE_INVALID_PARA; } int32_t ret = 0; + SVAsync *async = vnodeAsyncs[taskID->async]; SVATask *task = NULL; - SVATask task2 = {.taskId = taskId}; + SVATask task2 = { + .taskId = taskID->id, + }; + void (*cancel)(void *) = NULL; + void *arg = NULL; taosThreadMutexLock(&async->mutex); vHashGet(async->taskTable, &task2, (void **)&task); if (task) { if (task->state == EVA_TASK_STATE_WAITTING) { - // remove from queue + cancel = task->cancel; + arg = task->arg; task->next->prev = task->prev; task->prev->next = task->next; vnodeAsyncTaskDone(async, task); @@ -599,14 +648,18 @@ int32_t vnodeACancel(SVAsync *async, int64_t taskId) { taosThreadMutexUnlock(&async->mutex); + if (cancel) { + cancel(arg); + } + return ret; } -int32_t vnodeAsyncSetWorkers(SVAsync *async, int32_t numWorkers) { - if (async == NULL || numWorkers <= 0 || numWorkers > VNODE_ASYNC_MAX_WORKERS) { +int32_t vnodeAsyncSetWorkers(int64_t asyncID, int32_t numWorkers) { + if (asyncID < MIN_ASYNC_ID || asyncID > MAX_ASYNC_ID || numWorkers <= 0 || numWorkers > VNODE_ASYNC_MAX_WORKERS) { return TSDB_CODE_INVALID_PARA; } - + SVAsync *async = vnodeAsyncs[asyncID]; taosThreadMutexLock(&async->mutex); async->numWorkers = numWorkers; if (async->numIdleWorkers > 0) { @@ -617,11 +670,13 @@ int32_t vnodeAsyncSetWorkers(SVAsync *async, int32_t numWorkers) { return 0; } -int32_t vnodeAChannelInit(SVAsync *async, int64_t *channelId) { - if (async == NULL || channelId == NULL) { +int32_t vnodeAChannelInit(int64_t asyncID, SVAChannelID *channelID) { + if (channelID == NULL || asyncID < MIN_ASYNC_ID || asyncID > MAX_ASYNC_ID) { return TSDB_CODE_INVALID_PARA; } + SVAsync *async = vnodeAsyncs[asyncID]; + // create channel object SVAChannel *channel = (SVAChannel *)taosMemoryMalloc(sizeof(SVAChannel)); if (channel == NULL) { @@ -637,7 +692,7 @@ int32_t vnodeAChannelInit(SVAsync *async, int64_t *channelId) { // register channel taosThreadMutexLock(&async->mutex); - channel->channelId = *channelId = ++async->nextChannelId; + channel->channelId = channelID->id = ++async->nextChannelId; // add to hash table int32_t ret = vHashPut(async->channelTable, channel); @@ -657,16 +712,24 @@ int32_t vnodeAChannelInit(SVAsync *async, int64_t *channelId) { taosThreadMutexUnlock(&async->mutex); + channelID->async = asyncID; return 0; } -int32_t vnodeAChannelDestroy(SVAsync *async, int64_t channelId, bool waitRunning) { - if (async == NULL || channelId <= 0) { +int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) { + if (channelID == NULL || channelID->async < MIN_ASYNC_ID || channelID->async > MAX_ASYNC_ID || channelID->id <= 0) { return TSDB_CODE_INVALID_PARA; } + SVAsync *async = vnodeAsyncs[channelID->async]; SVAChannel *channel = NULL; - SVAChannel channel2 = {.channelId = channelId}; + SVAChannel channel2 = { + .channelId = channelID->id, + }; + SArray *cancelArray = taosArrayInit(0, sizeof(SVATaskCancelInfo)); + if (cancelArray == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } taosThreadMutexLock(&async->mutex); @@ -684,6 +747,12 @@ int32_t vnodeAChannelDestroy(SVAsync *async, int64_t channelId, bool waitRunning SVATask *task = channel->queue[i].next; task->prev->next = task->next; task->next->prev = task->prev; + if (task->cancel) { + taosArrayPush(cancelArray, &(SVATaskCancelInfo){ + .cancel = task->cancel, + .arg = task->arg, + }); + } vnodeAsyncTaskDone(async, task); } } @@ -693,6 +762,12 @@ int32_t vnodeAChannelDestroy(SVAsync *async, int64_t channelId, bool waitRunning if (channel->scheduled) { channel->scheduled->prev->next = channel->scheduled->next; channel->scheduled->next->prev = channel->scheduled->prev; + if (channel->scheduled->cancel) { + taosArrayPush(cancelArray, &(SVATaskCancelInfo){ + .cancel = channel->scheduled->cancel, + .arg = channel->scheduled->arg, + }); + } vnodeAsyncTaskDone(async, channel->scheduled); } taosMemoryFree(channel); @@ -713,12 +788,16 @@ int32_t vnodeAChannelDestroy(SVAsync *async, int64_t channelId, bool waitRunning channel->state = EVA_CHANNEL_STATE_CLOSE; } } - } else { - taosThreadMutexUnlock(&async->mutex); - return TSDB_CODE_INVALID_PARA; } taosThreadMutexUnlock(&async->mutex); + for (int32_t i = 0; i < taosArrayGetSize(cancelArray); i++) { + SVATaskCancelInfo *cancel = (SVATaskCancelInfo *)taosArrayGet(cancelArray, i); + cancel->cancel(cancel->arg); + } + taosArrayDestroy(cancelArray); + channelID->async = 0; + channelID->id = 0; return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 645f2620dc..f071775990 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -289,7 +289,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { int64_t lastCommitted = pInfo->info.state.committed; // wait last commit task - vnodeAWait(vnodeAsyncHandle[0], pVnode->commitTask); + vnodeAWait(&pVnode->commitTask); if (syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg) != 0) goto _exit; @@ -361,15 +361,14 @@ static void vnodeReturnBufPool(SVnode *pVnode) { taosThreadMutexUnlock(&pVnode->mutex); } -static int32_t vnodeCommitTask(void *arg) { +static int32_t vnodeCommit(void *arg) { int32_t code = 0; SCommitInfo *pInfo = (SCommitInfo *)arg; SVnode *pVnode = pInfo->pVnode; // commit - code = vnodeCommitImpl(pInfo); - if (code) { + if ((code = vnodeCommitImpl(pInfo))) { vFatal("vgId:%d, failed to commit vnode since %s", TD_VID(pVnode), terrstr()); taosMsleep(100); exit(EXIT_FAILURE); @@ -379,37 +378,34 @@ static int32_t vnodeCommitTask(void *arg) { vnodeReturnBufPool(pVnode); _exit: + taosMemoryFree(arg); return code; } -static void vnodeCompleteCommit(void *arg) { taosMemoryFree(arg); } +static void vnodeCommitCancel(void *arg) { taosMemoryFree(arg); } int vnodeAsyncCommit(SVnode *pVnode) { int32_t code = 0; + int32_t lino = 0; SCommitInfo *pInfo = (SCommitInfo *)taosMemoryCalloc(1, sizeof(*pInfo)); if (NULL == pInfo) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit); } // prepare to commit code = vnodePrepareCommit(pVnode, pInfo); - if (TSDB_CODE_SUCCESS != code) { - goto _exit; - } + TSDB_CHECK_CODE(code, lino, _exit); // schedule the task - code = vnodeAsyncC(vnodeAsyncHandle[0], pVnode->commitChannel, EVA_PRIORITY_HIGH, vnodeCommitTask, - vnodeCompleteCommit, pInfo, &pVnode->commitTask); + code = + vnodeAsync(&pVnode->commitChannel, EVA_PRIORITY_HIGH, vnodeCommit, vnodeCommitCancel, pInfo, &pVnode->commitTask); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - if (NULL != pInfo) { - taosMemoryFree(pInfo); - } - vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), - pVnode->state.commitID); + taosMemoryFree(pInfo); + vError("vgId:%d %s failed at line %d since %s" PRId64, TD_VID(pVnode), __func__, lino, tstrerror(code)); } else { vInfo("vgId:%d, vnode async commit done, commitId:%" PRId64 " term:%" PRId64 " applied:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, pVnode->state.applyTerm, pVnode->state.applied); @@ -419,7 +415,7 @@ _exit: int vnodeSyncCommit(SVnode *pVnode) { vnodeAsyncCommit(pVnode); - vnodeAWait(vnodeAsyncHandle[0], pVnode->commitTask); + vnodeAWait(&pVnode->commitTask); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeHash.h b/source/dnode/vnode/src/vnd/vnodeHash.h index 0181ca748d..00b3488930 100644 --- a/source/dnode/vnode/src/vnd/vnodeHash.h +++ b/source/dnode/vnode/src/vnd/vnodeHash.h @@ -22,23 +22,6 @@ extern "C" { #endif -typedef struct SVHashTable SVHashTable; - -struct SVHashTable { - uint32_t (*hash)(const void*); - int32_t (*compare)(const void*, const void*); - int32_t numEntries; - uint32_t numBuckets; - struct SVHashEntry** buckets; -}; - -#define vHashNumEntries(ht) ((ht)->numEntries) -int32_t vHashInit(SVHashTable** ht, uint32_t (*hash)(const void*), int32_t (*compare)(const void*, const void*)); -int32_t vHashDestroy(SVHashTable** ht); -int32_t vHashPut(SVHashTable* ht, void* obj); -int32_t vHashGet(SVHashTable* ht, const void* obj, void** retObj); -int32_t vHashDrop(SVHashTable* ht, const void* obj); - #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c index 44fcbefba7..3a454c53ef 100644 --- a/source/dnode/vnode/src/vnd/vnodeModule.c +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -18,8 +18,6 @@ static volatile int32_t VINIT = 0; -SVAsync* vnodeAsyncHandle[2]; - int vnodeInit(int nthreads) { int32_t init; @@ -28,13 +26,9 @@ int vnodeInit(int nthreads) { return 0; } - // vnode-commit - vnodeAsyncInit(&vnodeAsyncHandle[0], "vnode-commit"); - vnodeAsyncSetWorkers(vnodeAsyncHandle[0], nthreads); - - // vnode-merge - vnodeAsyncInit(&vnodeAsyncHandle[1], "vnode-merge"); - vnodeAsyncSetWorkers(vnodeAsyncHandle[1], nthreads); + if (vnodeAsyncOpen(nthreads) != 0) { + return -1; + } if (walInit() < 0) { return -1; @@ -48,8 +42,7 @@ void vnodeCleanup() { if (init == 0) return; // set stop - vnodeAsyncDestroy(&vnodeAsyncHandle[0]); - vnodeAsyncDestroy(&vnodeAsyncHandle[1]); + vnodeAsyncClose(); walCleanUp(); smaCleanUp(); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 425cff7ce9..da8c3a6cad 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -399,7 +399,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC taosThreadMutexInit(&pVnode->mutex, NULL); taosThreadCondInit(&pVnode->poolNotEmpty, NULL); - if (vnodeAChannelInit(vnodeAsyncHandle[0], &pVnode->commitChannel) != 0) { + if (vnodeAChannelInit(1, &pVnode->commitChannel) != 0) { vError("vgId:%d, failed to init commit channel", TD_VID(pVnode)); goto _err; } @@ -527,8 +527,8 @@ void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); } void vnodeClose(SVnode *pVnode) { if (pVnode) { - vnodeAWait(vnodeAsyncHandle[0], pVnode->commitTask); - vnodeAChannelDestroy(vnodeAsyncHandle[0], pVnode->commitChannel, true); + vnodeAWait(&pVnode->commitTask); + vnodeAChannelDestroy(&pVnode->commitChannel, true); vnodeSyncClose(pVnode); vnodeQueryClose(pVnode); tqClose(pVnode->pTq); diff --git a/source/dnode/vnode/src/vnd/vnodeRetention.c b/source/dnode/vnode/src/vnd/vnodeRetention.c index 5db20b8fc7..6dca7a9a60 100644 --- a/source/dnode/vnode/src/vnd/vnodeRetention.c +++ b/source/dnode/vnode/src/vnd/vnodeRetention.c @@ -15,20 +15,15 @@ #include "vnd.h" -int32_t vnodeDoRetention(SVnode *pVnode, int64_t now) { - int32_t code = TSDB_CODE_SUCCESS; +extern int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now); +extern int32_t tsdbAsyncS3Migrate(STsdb *tsdb, int64_t now); - code = tsdbRetention(pVnode->pTsdb, now, pVnode->config.sttTrigger == 1); - - if (TSDB_CODE_SUCCESS == code) code = smaRetention(pVnode->pSma, now); - - return code; +int32_t vnodeAsyncRetention(SVnode *pVnode, int64_t now) { + // async retention + return tsdbAsyncRetention(pVnode->pTsdb, now); } -int32_t vnodeDoS3Migrate(SVnode *pVnode, int64_t now) { - int32_t code = TSDB_CODE_SUCCESS; - - code = tsdbS3Migrate(pVnode->pTsdb, now, pVnode->config.sttTrigger == 1); - - return code; +int32_t vnodeAsyncS3Migrate(SVnode *pVnode, int64_t now) { + // async migration + return tsdbAsyncS3Migrate(pVnode->pTsdb, now); } diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index fd1bb391b2..611a603c63 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -622,13 +622,13 @@ extern int32_t tsdbEnableBgTask(STsdb *pTsdb); static int32_t vnodeCancelAndDisableAllBgTask(SVnode *pVnode) { tsdbDisableAndCancelAllBgTask(pVnode->pTsdb); vnodeSyncCommit(pVnode); - vnodeAChannelDestroy(vnodeAsyncHandle[0], pVnode->commitChannel, true); + vnodeAChannelDestroy(&pVnode->commitChannel, true); return 0; } static int32_t vnodeEnableBgTask(SVnode *pVnode) { tsdbEnableBgTask(pVnode->pTsdb); - vnodeAChannelInit(vnodeAsyncHandle[0], &pVnode->commitChannel); + vnodeAChannelInit(1, &pVnode->commitChannel); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 02343206ad..490c3f08ce 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -50,7 +50,7 @@ static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t l static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token); static int32_t vnodeCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token); -static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void* pReq, int32_t len, SRpcMsg* pRsp); +static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); extern int32_t vnodeProcessKillCompactReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); extern int32_t vnodeQueryCompactProgress(SVnode *pVnode, SRpcMsg *pMsg); @@ -875,7 +875,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) { pMetaRsp->precision = pVnode->config.tsdbCfg.precision; } -extern int32_t vnodeDoRetention(SVnode *pVnode, int64_t now); +extern int32_t vnodeAsyncRetention(SVnode *pVnode, int64_t now); static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { int32_t code = 0; @@ -889,13 +889,13 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int3 vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp); - code = vnodeDoRetention(pVnode, trimReq.timestamp); + code = vnodeAsyncRetention(pVnode, trimReq.timestamp); _exit: return code; } -extern int32_t vnodeDoS3Migrate(SVnode *pVnode, int64_t now); +extern int32_t vnodeAsyncS3Migrate(SVnode *pVnode, int64_t now); static int32_t vnodeProcessS3MigrateReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { int32_t code = 0; @@ -909,7 +909,7 @@ static int32_t vnodeProcessS3MigrateReq(SVnode *pVnode, int64_t ver, void *pReq, vInfo("vgId:%d, s3migrate vnode request will be processed, time:%d", pVnode->config.vgId, s3migrateReq.timestamp); - code = vnodeDoS3Migrate(pVnode, s3migrateReq.timestamp); + code = vnodeAsyncS3Migrate(pVnode, s3migrateReq.timestamp); _exit: return code; @@ -940,13 +940,13 @@ end: return ret; } -static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void* pReq, int32_t len, SRpcMsg* pRsp) { +static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { int32_t code = -1; SMetaReader mr = {0}; SVDropTtlTableReq ttlReq = {0}; SVFetchTtlExpiredTbsRsp rsp = {0}; SEncoder encoder = {0}; - SArray* pNames = NULL; + SArray *pNames = NULL; pRsp->msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP; pRsp->code = TSDB_CODE_SUCCESS; pRsp->pCont = NULL; @@ -959,8 +959,8 @@ static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void* ASSERT(ttlReq.nUids == taosArrayGetSize(ttlReq.pTbUids)); - tb_uid_t suid; - char ctbName[TSDB_TABLE_NAME_LEN]; + tb_uid_t suid; + char ctbName[TSDB_TABLE_NAME_LEN]; SVDropTbReq expiredTb = {.igNotExists = true}; metaReaderDoInit(&mr, pVnode->pMeta, 0); rsp.vgId = TD_VID(pVnode); @@ -974,12 +974,12 @@ static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void* } char buf[TSDB_TABLE_NAME_LEN]; for (int32_t i = 0; i < ttlReq.nUids; ++i) { - tb_uid_t* uid = taosArrayGet(ttlReq.pTbUids, i); + tb_uid_t *uid = taosArrayGet(ttlReq.pTbUids, i); expiredTb.suid = *uid; terrno = metaReaderGetTableEntryByUid(&mr, *uid); if (terrno < 0) goto _end; strncpy(buf, mr.me.name, TSDB_TABLE_NAME_LEN); - void* p = taosArrayPush(pNames, buf); + void *p = taosArrayPush(pNames, buf); expiredTb.name = p; if (mr.me.type == TSDB_CHILD_TABLE) { expiredTb.suid = mr.me.ctbEntry.suid; @@ -1153,7 +1153,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, if (i < tbNames->size - 1) { taosStringBuilderAppendChar(&sb, ','); } - //taosMemoryFreeClear(*key); + // taosMemoryFreeClear(*key); } size_t len = 0; @@ -2250,14 +2250,14 @@ static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, return TSDB_CODE_SUCCESS; } -extern int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +extern int32_t vnodeAsyncCompact(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { if (!pVnode->restored) { vInfo("vgId:%d, ignore compact req during restoring. ver:%" PRId64, TD_VID(pVnode), ver); return 0; } - return vnodeProcessCompactVnodeReqImpl(pVnode, ver, pReq, len, pRsp); + return vnodeAsyncCompact(pVnode, ver, pReq, len, pRsp); } static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { @@ -2364,8 +2364,6 @@ _OVER: } #ifndef TD_ENTERPRISE -int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { - return 0; -} +int32_t vnodeAsyncCompact(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { return 0; } int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync) { return 0; } #endif diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index f2b45c5b84..e7c4573c14 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -177,11 +177,10 @@ int32_t tfsAllocDisk(STfs *pTfs, int32_t expLevel, SDiskID *pDiskId) { continue; } - return 0; + return (terrno = 0); } - terrno = TSDB_CODE_FS_NO_VALID_DISK; - return -1; + return (terrno = TSDB_CODE_FS_NO_VALID_DISK); } const char *tfsGetPrimaryPath(STfs *pTfs) { return TFS_PRIMARY_DISK(pTfs)->path; }