Merge pull request #25913 from taosdata/enh/TS-4723-3.0

enh: reduce compact block write time
This commit is contained in:
Hongze Cheng 2024-05-28 13:18:57 +08:00 committed by GitHub
commit 913612e4a6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 728 additions and 528 deletions

View File

@ -355,6 +355,8 @@ typedef struct {
int flush_count;
} SCacheFlushState;
typedef struct SCompMonitor SCompMonitor;
struct STsdb {
char * path;
SVnode * pVnode;
@ -375,8 +377,11 @@ struct STsdb {
TdThreadMutex pgMutex;
struct STFileSystem *pFS; // new
SRocksCache rCache;
// compact monitor
struct SCompMonitor *pCompMonitor;
SCompMonitor *pCompMonitor;
struct {
SVHashTable *ht;
SArray *arr;
} *commitInfo;
};
struct TSDBKEY {

View File

@ -49,31 +49,21 @@ int32_t vnodeEncodeConfig(const void* pObj, SJson* pJson);
int32_t vnodeDecodeConfig(const SJson* pJson, void* pObj);
// vnodeAsync.c
typedef struct SVAsync SVAsync;
typedef enum {
EVA_PRIORITY_HIGH = 0,
EVA_PRIORITY_NORMAL,
EVA_PRIORITY_LOW,
} EVAPriority;
#define VNODE_ASYNC_VALID_CHANNEL_ID(channelId) ((channelId) > 0)
#define VNODE_ASYNC_VALID_TASK_ID(taskId) ((taskId) > 0)
int32_t vnodeAsyncInit(SVAsync** async, char* label);
int32_t vnodeAsyncDestroy(SVAsync** async);
int32_t vnodeAChannelInit(SVAsync* async, int64_t* channelId);
int32_t vnodeAChannelDestroy(SVAsync* async, int64_t channelId, bool waitRunning);
int32_t vnodeAsync(SVAsync* async, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*), void* arg,
int64_t* taskId);
int32_t vnodeAsyncC(SVAsync* async, int64_t channelId, EVAPriority priority, int32_t (*execute)(void*),
void (*complete)(void*), void* arg, int64_t* taskId);
int32_t vnodeAWait(SVAsync* async, int64_t taskId);
int32_t vnodeACancel(SVAsync* async, int64_t taskId);
int32_t vnodeAsyncSetWorkers(SVAsync* async, int32_t numWorkers);
// vnodeModule.c
extern SVAsync* vnodeAsyncHandle[2];
int32_t vnodeAsyncOpen(int32_t numOfThreads);
int32_t vnodeAsyncClose();
int32_t vnodeAChannelInit(int64_t async, SVAChannelID* channelID);
int32_t vnodeAChannelDestroy(SVAChannelID* channelID, bool waitRunning);
int32_t vnodeAsync(SVAChannelID* channelID, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*),
void* arg, SVATaskID* taskID);
int32_t vnodeAWait(SVATaskID* taskID);
int32_t vnodeACancel(SVATaskID* taskID);
int32_t vnodeAsyncSetWorkers(int64_t async, int32_t numWorkers);
// vnodeBufPool.c
typedef struct SVBufPoolNode SVBufPoolNode;

View File

@ -237,9 +237,6 @@ int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t
int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, bool hasPrimayKey);
int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type);
int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey);
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync);
int32_t tsdbS3Migrate(STsdb* tsdb, int64_t now, int32_t sync);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp);
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows);
@ -470,6 +467,16 @@ typedef struct SVMonitorObj {
taos_counter_t* insertCounter;
} SVMonitorObj;
typedef struct {
int64_t async;
int64_t id;
} SVAChannelID;
typedef struct {
int64_t async;
int64_t id;
} SVATaskID;
struct SVnode {
char* path;
SVnodeCfg config;
@ -491,8 +498,8 @@ struct SVnode {
SVBufPool* onRecycle;
// commit variables
int64_t commitChannel;
int64_t commitTask;
SVAChannelID commitChannel;
SVATaskID commitTask;
SMeta* pMeta;
SSma* pSma;
@ -598,6 +605,24 @@ struct SCompactInfo {
void initStorageAPI(SStorageAPI* pAPI);
// a simple hash table impl
typedef struct SVHashTable SVHashTable;
struct SVHashTable {
uint32_t (*hash)(const void*);
int32_t (*compare)(const void*, const void*);
int32_t numEntries;
uint32_t numBuckets;
struct SVHashEntry** buckets;
};
#define vHashNumEntries(ht) ((ht)->numEntries)
int32_t vHashInit(SVHashTable** ht, uint32_t (*hash)(const void*), int32_t (*compare)(const void*, const void*));
int32_t vHashDestroy(SVHashTable** ht);
int32_t vHashPut(SVHashTable* ht, void* obj);
int32_t vHashGet(SVHashTable* ht, const void* obj, void** retObj);
int32_t vHashDrop(SVHashTable* ht, const void* obj);
#ifdef __cplusplus
}
#endif

View File

@ -622,8 +622,8 @@ int32_t smaRetention(SSma *pSma, int64_t now) {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pSma->pRSmaTsdb[i]) {
code = tsdbRetention(pSma->pRSmaTsdb[i], now, pSma->pVnode->config.sttTrigger == 1);
if (code) goto _end;
// code = tsdbRetention(pSma->pRSmaTsdb[i], now, pSma->pVnode->config.sttTrigger == 1);
// if (code) goto _end;
}
}

View File

@ -17,13 +17,13 @@
// extern dependencies
typedef struct {
STsdb *tsdb;
TFileSetArray *fsetArr;
TFileOpArray fopArray[1];
// SSkmInfo skmTb[1];
// SSkmInfo skmRow[1];
int32_t fid;
bool hasDataToCommit;
STFileSet *fset;
} SFileSetCommitInfo;
typedef struct {
STsdb *tsdb;
int32_t minutes;
int8_t precision;
int32_t minRow;
@ -32,34 +32,34 @@ typedef struct {
int32_t sttTrigger;
int32_t szPage;
int64_t compactVersion;
int64_t cid;
int64_t now;
struct {
int64_t cid;
int64_t now;
TSKEY nextKey;
int32_t fid;
int32_t expLevel;
SDiskID did;
TSKEY minKey;
TSKEY maxKey;
STFileSet *fset;
TABLEID tbid[1];
bool hasTSData;
bool skipTsRow;
SHashObj *pColCmprObj;
SFileSetCommitInfo *info;
int32_t expLevel;
SDiskID did;
TSKEY minKey;
TSKEY maxKey;
TABLEID tbid[1];
bool hasTSData;
bool skipTsRow;
SHashObj *pColCmprObj;
} ctx[1];
// reader
TSttFileReaderArray sttReaderArray[1];
// iter
TTsdbIterArray dataIterArray[1];
SIterMerger *dataIterMerger;
TTsdbIterArray tombIterArray[1];
SIterMerger *tombIterMerger;
// writer
SFSetWriter *writer;
TFileOpArray fopArray[1];
} SCommitter2;
static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
@ -74,8 +74,8 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
.maxRow = committer->maxRow,
.szPage = committer->szPage,
.cmprAlg = committer->cmprAlg,
.fid = committer->ctx->fid,
.cid = committer->ctx->cid,
.fid = committer->ctx->info->fid,
.cid = committer->cid,
.did = committer->ctx->did,
.level = 0,
};
@ -83,11 +83,11 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
if (committer->sttTrigger == 1) {
config.toSttOnly = false;
if (committer->ctx->fset) {
if (committer->ctx->info->fset) {
for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ftype++) {
if (committer->ctx->fset->farr[ftype] != NULL) {
if (committer->ctx->info->fset->farr[ftype] != NULL) {
config.files[ftype].exist = true;
config.files[ftype].file = committer->ctx->fset->farr[ftype]->f[0];
config.files[ftype].file = committer->ctx->info->fset->farr[ftype]->f[0];
}
}
}
@ -117,7 +117,6 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) {
committer->ctx->tbid->suid = 0;
committer->ctx->tbid->uid = 0;
for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->dataIterMerger)) != NULL;) {
if (row->uid != committer->ctx->tbid->uid) {
committer->ctx->tbid->suid = row->suid;
@ -132,7 +131,6 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) {
int64_t ts = TSDBROW_TS(&row->row);
if (ts > committer->ctx->maxKey) {
committer->ctx->nextKey = TMIN(committer->ctx->nextKey, ts);
code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid);
TSDB_CHECK_CODE(code, lino, _exit);
continue;
@ -152,7 +150,8 @@ _exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
} else {
tsdbDebug("vgId:%d fid:%d commit %" PRId64 " rows", TD_VID(committer->tsdb->pVnode), committer->ctx->fid, numOfRow);
tsdbDebug("vgId:%d fid:%d commit %" PRId64 " rows", TD_VID(committer->tsdb->pVnode), committer->ctx->info->fid,
numOfRow);
}
return code;
}
@ -168,7 +167,7 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) {
}
// do not need to write tomb data if there is no ts data
bool skip = (committer->ctx->fset == NULL && !committer->ctx->hasTSData);
bool skip = (committer->ctx->info->fset == NULL && !committer->ctx->hasTSData);
committer->ctx->tbid->suid = 0;
committer->ctx->tbid->uid = 0;
@ -187,12 +186,8 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) {
if (record->ekey < committer->ctx->minKey) {
// do nothing
} else if (record->skey > committer->ctx->maxKey) {
committer->ctx->nextKey = TMIN(record->skey, committer->ctx->nextKey);
// committer->ctx->nextKey = TMIN(record->skey, committer->ctx->nextKey);
} else {
if (record->ekey > committer->ctx->maxKey) {
committer->ctx->nextKey = TMIN(committer->ctx->nextKey, committer->ctx->maxKey + 1);
}
record->skey = TMAX(record->skey, committer->ctx->minKey);
record->ekey = TMIN(record->ekey, committer->ctx->maxKey);
@ -211,8 +206,8 @@ _exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
} else {
tsdbDebug("vgId:%d fid:%d commit %" PRId64 " tomb records", TD_VID(committer->tsdb->pVnode), committer->ctx->fid,
numRecord);
tsdbDebug("vgId:%d fid:%d commit %" PRId64 " tomb records", TD_VID(committer->tsdb->pVnode),
committer->ctx->info->fid, numRecord);
}
return code;
}
@ -223,15 +218,15 @@ static int32_t tsdbCommitOpenReader(SCommitter2 *committer) {
ASSERT(TARRAY2_SIZE(committer->sttReaderArray) == 0);
if (committer->ctx->fset == NULL //
|| committer->sttTrigger > 1 //
|| TARRAY2_SIZE(committer->ctx->fset->lvlArr) == 0 //
if (committer->ctx->info->fset == NULL //
|| committer->sttTrigger > 1 //
|| TARRAY2_SIZE(committer->ctx->info->fset->lvlArr) == 0 //
) {
return 0;
}
SSttLvl *lvl;
TARRAY2_FOREACH(committer->ctx->fset->lvlArr, lvl) {
TARRAY2_FOREACH(committer->ctx->info->fset->lvlArr, lvl) {
STFileObj *fobj = NULL;
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
SSttFileReader *sttReader;
@ -289,7 +284,7 @@ static int32_t tsdbCommitOpenIter(SCommitter2 *committer) {
config.from->version = VERSION_MIN;
config.from->key = (SRowKey){
.ts = committer->ctx->minKey,
.numOfPKs = 0, // TODO: support multiple primary keys
.numOfPKs = 0,
};
code = tsdbIterOpen(&config, &iter);
@ -359,22 +354,15 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
int32_t lino = 0;
STsdb *tsdb = committer->tsdb;
int32_t fid = tsdbKeyFid(committer->ctx->nextKey, committer->minutes, committer->precision);
// check if can commit
tsdbFSCheckCommit(tsdb, fid);
tsdbFSCheckCommit(tsdb, committer->ctx->info->fid);
committer->ctx->fid = fid;
committer->ctx->expLevel = tsdbFidLevel(committer->ctx->fid, &tsdb->keepCfg, committer->ctx->now);
tsdbFidKeyRange(committer->ctx->fid, committer->minutes, committer->precision, &committer->ctx->minKey,
committer->ctx->expLevel = tsdbFidLevel(committer->ctx->info->fid, &tsdb->keepCfg, committer->now);
tsdbFidKeyRange(committer->ctx->info->fid, committer->minutes, committer->precision, &committer->ctx->minKey,
&committer->ctx->maxKey);
code = tfsAllocDisk(committer->tsdb->pVnode->pTfs, committer->ctx->expLevel, &committer->ctx->did);
TSDB_CHECK_CODE(code, lino, _exit);
tfsMkdirRecurAt(committer->tsdb->pVnode->pTfs, committer->tsdb->path, committer->ctx->did);
STFileSet fset = {.fid = committer->ctx->fid};
committer->ctx->fset = &fset;
STFileSet **fsetPtr = TARRAY2_SEARCH(committer->fsetArr, &committer->ctx->fset, tsdbTFileSetCmprFn, TD_EQ);
committer->ctx->fset = (fsetPtr == NULL) ? NULL : *fsetPtr;
committer->ctx->tbid->suid = 0;
committer->ctx->tbid->uid = 0;
@ -391,15 +379,13 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
code = tsdbCommitOpenWriter(committer);
TSDB_CHECK_CODE(code, lino, _exit);
// reset nextKey
committer->ctx->nextKey = TSKEY_MAX;
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
} else {
tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", TD_VID(tsdb->pVnode),
__func__, committer->ctx->fid, committer->ctx->minKey, committer->ctx->maxKey, committer->ctx->expLevel);
__func__, committer->ctx->info->fid, committer->ctx->minKey, committer->ctx->maxKey,
committer->ctx->expLevel);
}
return code;
}
@ -421,7 +407,7 @@ _exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
} else {
tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->fid);
tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->info->fid);
}
return code;
}
@ -449,7 +435,201 @@ _exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
} else {
tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->fid);
tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->info->fid);
}
return code;
}
static int32_t tFileSetCommitInfoCompare(const void *arg1, const void *arg2) {
SFileSetCommitInfo *info1 = (SFileSetCommitInfo *)arg1;
SFileSetCommitInfo *info2 = (SFileSetCommitInfo *)arg2;
if (info1->fid < info2->fid) {
return -1;
} else if (info1->fid > info2->fid) {
return 1;
} else {
return 0;
}
}
static int32_t tFileSetCommitInfoPCompare(const void *arg1, const void *arg2) {
return tFileSetCommitInfoCompare(*(SFileSetCommitInfo **)arg1, *(SFileSetCommitInfo **)arg2);
}
static uint32_t tFileSetCommitInfoHash(const void *arg) {
SFileSetCommitInfo *info = (SFileSetCommitInfo *)arg;
return MurmurHash3_32((const char *)&info->fid, sizeof(info->fid));
}
static int32_t tsdbCommitInfoDestroy(STsdb *pTsdb) {
int32_t code = 0;
int32_t lino = 0;
if (pTsdb->commitInfo) {
for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i);
vHashDrop(pTsdb->commitInfo->ht, info);
tsdbTFileSetClear(&info->fset);
taosMemoryFree(info);
}
vHashDestroy(&pTsdb->commitInfo->ht);
taosArrayDestroy(pTsdb->commitInfo->arr);
pTsdb->commitInfo->arr = NULL;
taosMemoryFreeClear(pTsdb->commitInfo);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbCommitInfoInit(STsdb *pTsdb) {
int32_t code = 0;
int32_t lino = 0;
pTsdb->commitInfo = taosMemoryCalloc(1, sizeof(*pTsdb->commitInfo));
if (pTsdb->commitInfo == NULL) {
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
}
code = vHashInit(&pTsdb->commitInfo->ht, tFileSetCommitInfoHash, tFileSetCommitInfoCompare);
TSDB_CHECK_CODE(code, lino, _exit);
pTsdb->commitInfo->arr = taosArrayInit(0, sizeof(SFileSetCommitInfo *));
if (pTsdb->commitInfo->arr == NULL) {
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
}
_exit:
if (code) {
tsdbCommitInfoDestroy(pTsdb);
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbCommitInfoAdd(STsdb *tsdb, const SFileSetCommitInfo *info) {
int32_t code = 0;
int32_t lino = 0;
SFileSetCommitInfo *tinfo;
vHashGet(tsdb->commitInfo->ht, info, (void **)&tinfo);
if (tinfo) {
if (info->hasDataToCommit && !tinfo->hasDataToCommit) {
tinfo->hasDataToCommit = true;
}
} else {
if ((tinfo = taosMemoryCalloc(1, sizeof(*tinfo))) == NULL) {
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
}
tinfo->fid = info->fid;
tinfo->hasDataToCommit = info->hasDataToCommit;
if (info->fset) {
code = tsdbTFileSetInitCopy(tsdb, info->fset, &tinfo->fset);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = vHashPut(tsdb->commitInfo->ht, tinfo);
TSDB_CHECK_CODE(code, lino, _exit);
if ((taosArrayPush(tsdb->commitInfo->arr, &tinfo)) == NULL) {
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
}
taosArraySort(tsdb->commitInfo->arr, tFileSetCommitInfoPCompare);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(tsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
int32_t code = 0;
int32_t lino = 0;
STFileSet *fset = NULL;
SRBTreeIter iter;
code = tsdbCommitInfoInit(tsdb);
TSDB_CHECK_CODE(code, lino, _exit);
taosThreadMutexLock(&tsdb->mutex);
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
SFileSetCommitInfo info = {
.fid = fset->fid,
.hasDataToCommit = false,
.fset = fset,
};
if ((code = tsdbCommitInfoAdd(tsdb, &info))) {
taosThreadMutexUnlock(&tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
taosThreadMutexUnlock(&tsdb->mutex);
iter = tRBTreeIterCreate(tsdb->imem->tbDataTree, 1);
for (SRBTreeNode *node = tRBTreeIterNext(&iter); node; node = tRBTreeIterNext(&iter)) {
STbData *pTbData = TCONTAINER_OF(node, STbData, rbtn);
// scan time-series data
STsdbRowKey from = {
.key.ts = INT64_MIN,
.key.numOfPKs = 0,
.version = INT64_MIN,
};
for (;;) {
int64_t minKey, maxKey;
STbDataIter tbDataIter = {0};
TSDBROW *row;
int32_t fid;
tsdbTbDataIterOpen(pTbData, &from, 0, &tbDataIter);
if ((row = tsdbTbDataIterGet(&tbDataIter)) == NULL) {
break;
}
fid = tsdbKeyFid(TSDBROW_TS(row), tsdb->keepCfg.days, tsdb->keepCfg.precision);
tsdbFidKeyRange(fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey);
SFileSetCommitInfo info = {
.fid = fid,
.hasDataToCommit = true,
.fset = NULL,
};
code = tsdbCommitInfoAdd(tsdb, &info);
TSDB_CHECK_CODE(code, lino, _exit);
from.key.ts = maxKey + 1;
}
// scan tomb data
for (SDelData *pDelData = pTbData->pHead; pDelData; pDelData = pDelData->pNext) {
for (int32_t i = taosArrayGetSize(tsdb->commitInfo->arr) - 1; i >= 0; i--) {
int64_t minKey, maxKey;
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
tsdbFidKeyRange(info->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey);
if (pDelData->sKey > maxKey || pDelData->eKey < minKey) {
continue;
} else if (!info->hasDataToCommit) {
info->hasDataToCommit = true;
}
}
}
}
_exit:
if (code) {
tsdbCommitInfoDestroy(tsdb);
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(tsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
@ -458,11 +638,7 @@ static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *co
int32_t code = 0;
int32_t lino = 0;
memset(committer, 0, sizeof(committer[0]));
committer->tsdb = tsdb;
code = tsdbFSCreateCopySnapshot(tsdb->pFS, &committer->fsetArr);
TSDB_CHECK_CODE(code, lino, _exit);
committer->minutes = tsdb->keepCfg.days;
committer->precision = tsdb->keepCfg.precision;
committer->minRow = info->info.config.tsdbCfg.minRows;
@ -471,21 +647,21 @@ static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *co
committer->sttTrigger = info->info.config.sttTrigger;
committer->szPage = info->info.config.tsdbPageSize;
committer->compactVersion = INT64_MAX;
committer->ctx->cid = tsdbFSAllocEid(tsdb->pFS);
committer->ctx->now = taosGetTimestampSec();
committer->cid = tsdbFSAllocEid(tsdb->pFS);
committer->now = taosGetTimestampSec();
committer->ctx->nextKey = tsdb->imem->minKey;
if (tsdb->imem->nDel > 0) {
SRBTreeIter iter[1] = {tRBTreeIterCreate(tsdb->imem->tbDataTree, 1)};
code = tsdbCommitInfoBuild(tsdb);
TSDB_CHECK_CODE(code, lino, _exit);
for (SRBTreeNode *node = tRBTreeIterNext(iter); node; node = tRBTreeIterNext(iter)) {
STbData *tbData = TCONTAINER_OF(node, STbData, rbtn);
for (SDelData *delData = tbData->pHead; delData; delData = delData->pNext) {
committer->ctx->nextKey = TMIN(committer->ctx->nextKey, delData->sKey);
}
STFileSet *fset;
taosThreadMutexLock(&tsdb->mutex);
for (int i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
if (info->hasDataToCommit && info->fset) {
tsdbBeginTaskOnFileSet(tsdb, info->fid, &fset);
}
}
taosThreadMutexUnlock(&tsdb->mutex);
_exit:
if (code) {
@ -516,14 +692,13 @@ static int32_t tsdbCloseCommitter(SCommitter2 *committer, int32_t eno) {
TARRAY2_DESTROY(committer->sttReaderArray, NULL);
TARRAY2_DESTROY(committer->fopArray, NULL);
TARRAY2_DESTROY(committer->sttReaderArray, NULL);
tsdbFSDestroyCopySnapshot(&committer->fsetArr);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s, eid:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, lino,
tstrerror(code), committer->ctx->cid);
tstrerror(code), committer->cid);
} else {
tsdbDebug("vgId:%d %s done, eid:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->cid);
tsdbDebug("vgId:%d %s done, eid:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, committer->cid);
}
return code;
}
@ -553,17 +728,20 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) {
taosThreadMutexUnlock(&tsdb->mutex);
tsdbUnrefMemTable(imem, NULL, true);
} else {
SCommitter2 committer[1];
SCommitter2 committer = {0};
code = tsdbOpenCommitter(tsdb, info, committer);
code = tsdbOpenCommitter(tsdb, info, &committer);
TSDB_CHECK_CODE(code, lino, _exit);
while (committer->ctx->nextKey != TSKEY_MAX) {
code = tsdbCommitFileSet(committer);
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
committer.ctx->info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
if (committer.ctx->info->hasDataToCommit) {
code = tsdbCommitFileSet(&committer);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
code = tsdbCloseCommitter(committer, code);
code = tsdbCloseCommitter(&committer, code);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -580,22 +758,33 @@ int32_t tsdbCommitCommit(STsdb *tsdb) {
int32_t code = 0;
int32_t lino = 0;
if (tsdb->imem == NULL) goto _exit;
if (tsdb->imem) {
SMemTable *pMemTable = tsdb->imem;
taosThreadMutexLock(&tsdb->mutex);
if ((code = tsdbFSEditCommit(tsdb->pFS))) {
taosThreadMutexUnlock(&tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit);
}
tsdb->imem = NULL;
for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
if (info->hasDataToCommit && info->fset) {
tsdbFinishTaskOnFileSet(tsdb, info->fid);
}
}
SMemTable *pMemTable = tsdb->imem;
taosThreadMutexLock(&tsdb->mutex);
code = tsdbFSEditCommit(tsdb->pFS);
if (code) {
taosThreadMutexUnlock(&tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit);
tsdbCommitInfoDestroy(tsdb);
tsdbUnrefMemTable(pMemTable, NULL, true);
}
tsdb->imem = NULL;
taosThreadMutexUnlock(&tsdb->mutex);
tsdbUnrefMemTable(pMemTable, NULL, true);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(tsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
}
@ -611,6 +800,16 @@ int32_t tsdbCommitAbort(STsdb *pTsdb) {
code = tsdbFSEditAbort(pTsdb->pFS);
TSDB_CHECK_CODE(code, lino, _exit);
taosThreadMutexLock(&pTsdb->mutex);
for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i);
if (info->hasDataToCommit && info->fset) {
tsdbFinishTaskOnFileSet(pTsdb, info->fid);
}
}
taosThreadMutexUnlock(&pTsdb->mutex);
tsdbCommitInfoDestroy(pTsdb);
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
@ -618,4 +817,4 @@ _exit:
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}
return code;
}
}

View File

@ -22,9 +22,6 @@
extern void remove_file(const char *fname);
#define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT
#define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1)
typedef struct STFileHashEntry {
struct STFileHashEntry *next;
char fname[TSDB_FILENAME_LEN];
@ -290,10 +287,8 @@ static int32_t commit_edit(STFileSystem *fs) {
current_fname(fs->tsdb, current, TSDB_FCURRENT);
if (fs->etype == TSDB_FEDIT_COMMIT) {
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
} else if (fs->etype == TSDB_FEDIT_MERGE) {
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
} else {
ASSERT(0);
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
}
int32_t code;
@ -324,11 +319,8 @@ static int32_t abort_edit(STFileSystem *fs) {
if (fs->etype == TSDB_FEDIT_COMMIT) {
current_fname(fs->tsdb, fname, TSDB_FCURRENT_C);
} else if (fs->etype == TSDB_FEDIT_MERGE) {
current_fname(fs->tsdb, fname, TSDB_FCURRENT_M);
} else {
tsdbError("vgId:%d %s failed since invalid etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
ASSERT(0);
current_fname(fs->tsdb, fname, TSDB_FCURRENT_M);
}
int32_t code;
@ -767,9 +759,12 @@ extern int32_t tsdbStopAllCompTask(STsdb *tsdb);
int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
STFileSystem *fs = pTsdb->pFS;
TARRAY2(int64_t) channelArr = {0};
SArray *channelArray = taosArrayInit(0, sizeof(SVAChannelID));
if (channelArray == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosThreadMutexLock(&fs->tsdb->mutex);
taosThreadMutexLock(&pTsdb->mutex);
// disable
pTsdb->bgTaskDisabled = true;
@ -777,20 +772,23 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
// collect channel
STFileSet *fset;
TARRAY2_FOREACH(fs->fSetArr, fset) {
if (VNODE_ASYNC_VALID_CHANNEL_ID(fset->bgTaskChannel)) {
TARRAY2_APPEND(&channelArr, fset->bgTaskChannel);
fset->bgTaskChannel = 0;
if (fset->channelOpened) {
taosArrayPush(channelArray, &fset->channel);
fset->channel = (SVAChannelID){0};
fset->mergeScheduled = false;
tsdbFSSetBlockCommit(fset, false);
fset->channelOpened = false;
}
fset->mergeScheduled = false;
tsdbFSSetBlockCommit(fset, false);
}
taosThreadMutexUnlock(&fs->tsdb->mutex);
taosThreadMutexUnlock(&pTsdb->mutex);
// destroy all channels
int64_t channel;
TARRAY2_FOREACH(&channelArr, channel) { vnodeAChannelDestroy(vnodeAsyncHandle[1], channel, true); }
TARRAY2_DESTROY(&channelArr, NULL);
for (int32_t i = 0; i < taosArrayGetSize(channelArray); i++) {
SVAChannelID *channel = taosArrayGet(channelArray, i);
vnodeAChannelDestroy(channel, true);
}
taosArrayDestroy(channelArray);
#ifdef TD_ENTERPRISE
tsdbStopAllCompTask(pTsdb);
@ -832,15 +830,10 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e
int32_t lino;
char current_t[TSDB_FILENAME_LEN];
switch (etype) {
case TSDB_FEDIT_COMMIT:
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
break;
case TSDB_FEDIT_MERGE:
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
break;
default:
ASSERT(0);
if (etype == TSDB_FEDIT_COMMIT) {
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
} else {
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
}
tsem_wait(&fs->canEdit);
@ -932,8 +925,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
arg->tsdb = fs->tsdb;
arg->fid = fset->fid;
code = vnodeAsyncC(vnodeAsyncHandle[1], fset->bgTaskChannel, EVA_PRIORITY_HIGH, tsdbMerge, taosMemoryFree, arg,
NULL);
code = vnodeAsync(&fset->channel, EVA_PRIORITY_HIGH, tsdbMerge, taosMemoryFree, arg, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
fset->mergeScheduled = true;
}
@ -946,33 +938,6 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
}
}
// clear empty level and fset
int32_t i = 0;
while (i < TARRAY2_SIZE(fs->fSetArr)) {
STFileSet *fset = TARRAY2_GET(fs->fSetArr, i);
int32_t j = 0;
while (j < TARRAY2_SIZE(fset->lvlArr)) {
SSttLvl *lvl = TARRAY2_GET(fset->lvlArr, j);
if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
TARRAY2_REMOVE(fset->lvlArr, j, tsdbSttLvlClear);
} else {
j++;
}
}
if (tsdbTFileSetIsEmpty(fset)) {
if (VNODE_ASYNC_VALID_CHANNEL_ID(fset->bgTaskChannel)) {
vnodeAChannelDestroy(vnodeAsyncHandle[1], fset->bgTaskChannel, false);
fset->bgTaskChannel = 0;
}
TARRAY2_REMOVE(fs->fSetArr, i, tsdbTFileSetClear);
} else {
i++;
}
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->tsdb->pVnode), __func__, lino, tstrerror(code));
@ -1211,3 +1176,47 @@ _out:
}
int32_t tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr) { return tsdbTFileSetRangeArrayDestroy(fsrArr); }
int32_t tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset) {
int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;
tsdbFSGetFSet(tsdb->pFS, fid, fset);
if (sttTrigger == 1 && fset) {
for (;;) {
if ((*fset)->taskRunning) {
(*fset)->numWaitTask++;
taosThreadCondWait(&(*fset)->beginTask, &tsdb->mutex);
tsdbFSGetFSet(tsdb->pFS, fid, fset);
ASSERT(fset != NULL);
(*fset)->numWaitTask--;
ASSERT((*fset)->numWaitTask >= 0);
} else {
(*fset)->taskRunning = true;
break;
}
}
tsdbInfo("vgId:%d begin task on file set:%d", TD_VID(tsdb->pVnode), fid);
}
return 0;
}
int32_t tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid) {
int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;
if (sttTrigger == 1) {
STFileSet *fset = NULL;
tsdbFSGetFSet(tsdb->pFS, fid, &fset);
if (fset != NULL && fset->taskRunning) {
fset->taskRunning = false;
if (fset->numWaitTask > 0) {
taosThreadCondSignal(&fset->beginTask);
}
tsdbInfo("vgId:%d finish task on file set:%d", TD_VID(tsdb->pVnode), fid);
}
}
return 0;
}

View File

@ -24,7 +24,9 @@ extern "C" {
typedef enum {
TSDB_FEDIT_COMMIT = 1, //
TSDB_FEDIT_MERGE
TSDB_FEDIT_MERGE,
TSDB_FEDIT_COMPACT,
TSDB_FEDIT_RETENTION,
} EFEditT;
typedef enum {
@ -59,6 +61,8 @@ int32_t tsdbFSEditAbort(STFileSystem *fs);
// other
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset);
int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid);
int32_t tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset);
int32_t tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid);
// utils
int32_t save_fs(const TFileSetArray *arr, const char *fname);
int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype);
@ -72,10 +76,6 @@ struct STFileSystem {
EFEditT etype;
TFileSetArray fSetArr[1];
TFileSetArray fSetArrTmp[1];
// background task queue
bool stop;
int64_t taskid;
};
#ifdef __cplusplus

View File

@ -456,13 +456,14 @@ int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset) {
TARRAY2_INIT(fset[0]->lvlArr);
// background task queue
fset[0]->bgTaskChannel = 0;
fset[0]->mergeScheduled = false;
taosThreadCondInit(&(*fset)->beginTask, NULL);
(*fset)->taskRunning = false;
(*fset)->numWaitTask = 0;
// block commit variables
taosThreadCondInit(&fset[0]->canCommit, NULL);
fset[0]->numWaitCommit = 0;
fset[0]->blockCommit = false;
(*fset)->numWaitCommit = 0;
(*fset)->blockCommit = false;
return 0;
}
@ -598,21 +599,19 @@ int32_t tsdbTFileSetRangeArrayDestroy(TFileSetRangeArray **ppArr) {
return 0;
}
int32_t tsdbTFileSetClear(STFileSet **fset) {
if (!fset[0]) return 0;
void tsdbTFileSetClear(STFileSet **fset) {
if (fset && *fset) {
for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
if ((*fset)->farr[ftype] == NULL) continue;
tsdbTFileObjUnref((*fset)->farr[ftype]);
}
for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
if (fset[0]->farr[ftype] == NULL) continue;
tsdbTFileObjUnref(fset[0]->farr[ftype]);
TARRAY2_DESTROY((*fset)->lvlArr, tsdbSttLvlClear);
taosThreadCondDestroy(&(*fset)->beginTask);
taosThreadCondDestroy(&(*fset)->canCommit);
taosMemoryFreeClear(*fset);
}
TARRAY2_DESTROY(fset[0]->lvlArr, tsdbSttLvlClear);
taosThreadCondDestroy(&fset[0]->canCommit);
taosMemoryFree(fset[0]);
fset[0] = NULL;
return 0;
}
int32_t tsdbTFileSetRemove(STFileSet *fset) {
@ -665,6 +664,12 @@ bool tsdbTFileSetIsEmpty(const STFileSet *fset) {
}
int32_t tsdbTFileSetOpenChannel(STFileSet *fset) {
if (VNODE_ASYNC_VALID_CHANNEL_ID(fset->bgTaskChannel)) return 0;
return vnodeAChannelInit(vnodeAsyncHandle[1], &fset->bgTaskChannel);
int32_t code;
if (!fset->channelOpened) {
if ((code = vnodeAChannelInit(2, &fset->channel))) {
return code;
}
fset->channelOpened = true;
}
return 0;
}

View File

@ -42,7 +42,7 @@ typedef enum {
int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset);
int32_t tsdbTFileSetInitCopy(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset);
int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset);
int32_t tsdbTFileSetClear(STFileSet **fset);
void tsdbTFileSetClear(STFileSet **fset);
int32_t tsdbTFileSetRemove(STFileSet *fset);
int32_t tsdbTFileSetFilteredInitDup(STsdb *pTsdb, const STFileSet *fset1, int64_t ever, STFileSet **fset,
@ -90,9 +90,15 @@ struct STFileSet {
STFileObj *farr[TSDB_FTYPE_MAX]; // file array
TSttLvlArray lvlArr[1]; // level array
// background task channel
int64_t bgTaskChannel;
bool mergeScheduled;
// background task
bool channelOpened;
SVAChannelID channel;
bool mergeScheduled;
// sttTrigger = 1
TdThreadCond beginTask;
bool taskRunning;
int32_t numWaitTask;
// block commit variables
TdThreadCond canCommit;

View File

@ -593,5 +593,6 @@ _exit:
exit(EXIT_FAILURE);
}
tsdbTFileSetClear(&merger->fset);
taosMemoryFree(arg);
return code;
}

View File

@ -24,8 +24,8 @@ typedef struct {
int64_t now;
int64_t cid;
TFileSetArray *fsetArr;
TFileOpArray fopArr[1];
STFileSet *fset;
TFileOpArray fopArr;
} SRTNer;
static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
@ -35,7 +35,7 @@ static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
.of = fobj->f[0],
};
return TARRAY2_APPEND(rtner->fopArr, op);
return TARRAY2_APPEND(&rtner->fopArr, op);
}
static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFile *to) {
@ -71,7 +71,7 @@ static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFi
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
if (fdFrom) taosCloseFile(&fdFrom);
if (fdTo) taosCloseFile(&fdTo);
}
@ -108,7 +108,7 @@ static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
taosCloseFile(&fdFrom);
taosCloseFile(&fdTo);
}
@ -128,7 +128,7 @@ static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const
.of = fobj->f[0],
};
code = TARRAY2_APPEND(rtner->fopArr, op);
code = TARRAY2_APPEND(&rtner->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
// create new
@ -152,7 +152,7 @@ static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const
},
};
code = TARRAY2_APPEND(rtner->fopArr, op);
code = TARRAY2_APPEND(&rtner->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
// do copy the file
@ -167,7 +167,7 @@ static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
}
return code;
}
@ -176,80 +176,51 @@ typedef struct {
STsdb *tsdb;
int64_t now;
int32_t fid;
bool s3Migrate;
} SRtnArg;
static int32_t tsdbDoRetentionBegin(SRtnArg *arg, SRTNer *rtner) {
int32_t code = 0;
int32_t lino = 0;
STsdb *tsdb = arg->tsdb;
rtner->tsdb = tsdb;
rtner->szPage = tsdb->pVnode->config.tsdbPageSize;
rtner->now = arg->now;
rtner->cid = tsdbFSAllocEid(tsdb->pFS);
code = tsdbFSCreateCopySnapshot(tsdb->pFS, &rtner->fsetArr);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
} else {
tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
}
return code;
}
static int32_t tsdbDoRetentionEnd(SRTNer *rtner) {
int32_t code = 0;
int32_t lino = 0;
if (TARRAY2_SIZE(rtner->fopArr) == 0) goto _exit;
code = tsdbFSEditBegin(rtner->tsdb->pFS, rtner->fopArr, TSDB_FEDIT_MERGE);
TSDB_CHECK_CODE(code, lino, _exit);
taosThreadMutexLock(&rtner->tsdb->mutex);
code = tsdbFSEditCommit(rtner->tsdb->pFS);
if (code) {
taosThreadMutexUnlock(&rtner->tsdb->mutex);
if (TARRAY2_SIZE(&rtner->fopArr) > 0) {
code = tsdbFSEditBegin(rtner->tsdb->pFS, &rtner->fopArr, TSDB_FEDIT_RETENTION);
TSDB_CHECK_CODE(code, lino, _exit);
taosThreadMutexLock(&rtner->tsdb->mutex);
code = tsdbFSEditCommit(rtner->tsdb->pFS);
if (code) {
taosThreadMutexUnlock(&rtner->tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosThreadMutexUnlock(&rtner->tsdb->mutex);
TARRAY2_DESTROY(&rtner->fopArr, NULL);
}
taosThreadMutexUnlock(&rtner->tsdb->mutex);
TARRAY2_DESTROY(rtner->fopArr, NULL);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
} else {
tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
}
tsdbFSDestroyCopySnapshot(&rtner->fsetArr);
return code;
}
static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) {
static int32_t tsdbDoRetention(SRTNer *rtner) {
int32_t code = 0;
int32_t lino = 0;
STFileObj *fobj = NULL;
STFileSet *fset = rtner->fset;
int32_t expLevel = tsdbFidLevel(fset->fid, &rtner->tsdb->keepCfg, rtner->now);
if (expLevel < 0) { // remove the fileset
for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = fset->farr[ftype], 1); ++ftype) {
if (fobj == NULL) continue;
/*
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1) {
code = tsdbRemoveFileObjectS3(rtner, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
} else {*/
code = tsdbDoRemoveFileObject(rtner, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
//}
}
SSttLvl *lvl;
@ -275,10 +246,6 @@ static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) {
if (fobj == NULL) continue;
if (fobj->f->did.level == did.level) {
/*
code = tsdbCheckMigrateS3(rtner, fobj, ftype, &did);
TSDB_CHECK_CODE(code, lino, _exit);
*/
continue;
}
@ -306,91 +273,110 @@ static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) {
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
}
return code;
}
static void tsdbFreeRtnArg(void *arg) { taosMemoryFree(arg); }
static void tsdbRetentionCancel(void *arg) { taosMemoryFree(arg); }
static int32_t tsdbDoRetentionAsync(void *arg) {
static int32_t tsdbDoS3Migrate(SRTNer *rtner);
static int32_t tsdbRetention(void *arg) {
int32_t code = 0;
int32_t lino = 0;
SRTNer rtner[1] = {0};
code = tsdbDoRetentionBegin(arg, rtner);
TSDB_CHECK_CODE(code, lino, _exit);
SRtnArg *rtnArg = (SRtnArg *)arg;
STsdb *pTsdb = rtnArg->tsdb;
SVnode *pVnode = pTsdb->pVnode;
STFileSet *fset = NULL;
SRTNer rtner = {
.tsdb = pTsdb,
.szPage = pVnode->config.tsdbPageSize,
.now = rtnArg->now,
.cid = tsdbFSAllocEid(pTsdb->pFS),
};
STFileSet *fset;
TARRAY2_FOREACH(rtner->fsetArr, fset) {
if (fset->fid != ((SRtnArg *)arg)->fid) continue;
// begin task
taosThreadMutexLock(&pTsdb->mutex);
tsdbBeginTaskOnFileSet(pTsdb, rtnArg->fid, &fset);
if (fset && (code = tsdbTFileSetInitCopy(pTsdb, fset, &rtner.fset))) {
taosThreadMutexUnlock(&pTsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosThreadMutexUnlock(&pTsdb->mutex);
code = tsdbDoRetentionOnFileSet(rtner, fset);
// do retention
if (rtner.fset) {
if (rtnArg->s3Migrate) {
code = tsdbDoS3Migrate(&rtner);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbDoRetention(&rtner);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbDoRetentionEnd(&rtner);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbDoRetentionEnd(rtner);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
if (TARRAY2_DATA(rtner->fopArr)) {
TARRAY2_DESTROY(rtner->fopArr, NULL);
}
TFileSetArray **fsetArr = &rtner->fsetArr;
if (fsetArr[0]) {
tsdbFSDestroyCopySnapshot(&rtner->fsetArr);
}
if (rtner.fset) {
taosThreadMutexLock(&pTsdb->mutex);
tsdbFinishTaskOnFileSet(pTsdb, rtnArg->fid);
taosThreadMutexUnlock(&pTsdb->mutex);
}
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
// clear resources
tsdbTFileSetClear(&rtner.fset);
TARRAY2_DESTROY(&rtner.fopArr, NULL);
taosMemoryFree(arg);
if (code) {
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(((SRtnArg *)arg)->tsdb->pVnode), __func__, code, lino);
}
return code;
}
int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) {
static int32_t tsdbAsyncRetentionImpl(STsdb *tsdb, int64_t now, bool s3Migrate) {
int32_t code = 0;
taosThreadMutexLock(&tsdb->mutex);
if (tsdb->bgTaskDisabled) {
taosThreadMutexUnlock(&tsdb->mutex);
return 0;
}
int32_t lino = 0;
STFileSet *fset;
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
code = tsdbTFileSetOpenChannel(fset);
if (code) {
taosThreadMutexUnlock(&tsdb->mutex);
return code;
}
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) {
taosThreadMutexUnlock(&tsdb->mutex);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (!tsdb->bgTaskDisabled) {
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
code = tsdbTFileSetOpenChannel(fset);
TSDB_CHECK_CODE(code, lino, _exit);
arg->tsdb = tsdb;
arg->now = now;
arg->fid = fset->fid;
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) {
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
}
if (sync) {
code = vnodeAsyncC(vnodeAsyncHandle[0], tsdb->pVnode->commitChannel, EVA_PRIORITY_LOW, tsdbDoRetentionAsync,
tsdbFreeRtnArg, arg, NULL);
} else {
code = vnodeAsyncC(vnodeAsyncHandle[1], fset->bgTaskChannel, EVA_PRIORITY_LOW, tsdbDoRetentionAsync,
tsdbFreeRtnArg, arg, NULL);
}
if (code) {
tsdbFreeRtnArg(arg);
taosThreadMutexUnlock(&tsdb->mutex);
return code;
arg->tsdb = tsdb;
arg->now = now;
arg->fid = fset->fid;
arg->s3Migrate = s3Migrate;
if ((code = vnodeAsync(&fset->channel, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, arg, NULL))) {
taosMemoryFree(arg);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
}
taosThreadMutexUnlock(&tsdb->mutex);
_exit:
if (code) {
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(tsdb->pVnode), __func__, code, lino);
}
return code;
}
int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now) {
int32_t code = 0;
taosThreadMutexLock(&tsdb->mutex);
code = tsdbAsyncRetentionImpl(tsdb, now, false);
taosThreadMutexUnlock(&tsdb->mutex);
return code;
}
@ -462,7 +448,7 @@ static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int
.of = fobj->f[0],
};
code = TARRAY2_APPEND(rtner->fopArr, op);
code = TARRAY2_APPEND(&rtner->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
// create new
@ -486,7 +472,7 @@ static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int
},
};
code = TARRAY2_APPEND(rtner->fopArr, op);
code = TARRAY2_APPEND(&rtner->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
char fname[TSDB_FILENAME_LEN];
@ -566,7 +552,7 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64
.of = fobj->f[0],
};
code = TARRAY2_APPEND(rtner->fopArr, op);
code = TARRAY2_APPEND(&rtner->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
// create new
@ -590,7 +576,7 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64
},
};
code = TARRAY2_APPEND(rtner->fopArr, op);
code = TARRAY2_APPEND(&rtner->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
char fname[TSDB_FILENAME_LEN];
@ -653,10 +639,11 @@ _exit:
return code;
}
static int32_t tsdbDoS3MigrateOnFileSet(SRTNer *rtner, STFileSet *fset) {
static int32_t tsdbDoS3Migrate(SRTNer *rtner) {
int32_t code = 0;
int32_t lino = 0;
STFileSet *fset = rtner->fset;
STFileObj *fobj = fset->farr[TSDB_FTYPE_DATA];
if (!fobj) return code;
@ -720,41 +707,7 @@ _exit:
return code;
}
static int32_t tsdbDoS3MigrateAsync(void *arg) {
int32_t code = 0;
int32_t lino = 0;
SRTNer rtner[1] = {0};
code = tsdbDoRetentionBegin(arg, rtner);
TSDB_CHECK_CODE(code, lino, _exit);
STFileSet *fset;
TARRAY2_FOREACH(rtner->fsetArr, fset) {
if (fset->fid != ((SRtnArg *)arg)->fid) continue;
code = tsdbDoS3MigrateOnFileSet(rtner, fset);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbDoRetentionEnd(rtner);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
if (TARRAY2_DATA(rtner->fopArr)) {
TARRAY2_DESTROY(rtner->fopArr, NULL);
}
TFileSetArray **fsetArr = &rtner->fsetArr;
if (fsetArr[0]) {
tsdbFSDestroyCopySnapshot(&rtner->fsetArr);
}
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
}
return code;
}
int32_t tsdbS3Migrate(STsdb *tsdb, int64_t now, int32_t sync) {
int32_t tsdbAsyncS3Migrate(STsdb *tsdb, int64_t now) {
int32_t code = 0;
extern int8_t tsS3EnabledCfg;
@ -772,45 +725,7 @@ int32_t tsdbS3Migrate(STsdb *tsdb, int64_t now, int32_t sync) {
}
taosThreadMutexLock(&tsdb->mutex);
if (tsdb->bgTaskDisabled) {
taosThreadMutexUnlock(&tsdb->mutex);
return 0;
}
STFileSet *fset;
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
code = tsdbTFileSetOpenChannel(fset);
if (code) {
taosThreadMutexUnlock(&tsdb->mutex);
return code;
}
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) {
taosThreadMutexUnlock(&tsdb->mutex);
return TSDB_CODE_OUT_OF_MEMORY;
}
arg->tsdb = tsdb;
arg->now = now;
arg->fid = fset->fid;
if (sync) {
code = vnodeAsyncC(vnodeAsyncHandle[0], tsdb->pVnode->commitChannel, EVA_PRIORITY_LOW, tsdbDoS3MigrateAsync,
tsdbFreeRtnArg, arg, NULL);
} else {
code = vnodeAsyncC(vnodeAsyncHandle[1], fset->bgTaskChannel, EVA_PRIORITY_LOW, tsdbDoS3MigrateAsync,
tsdbFreeRtnArg, arg, NULL);
}
if (code) {
tsdbFreeRtnArg(arg);
taosThreadMutexUnlock(&tsdb->mutex);
return code;
}
}
code = tsdbAsyncRetentionImpl(tsdb, now, true);
taosThreadMutexUnlock(&tsdb->mutex);
return code;
}

View File

@ -16,6 +16,7 @@
#include "vnd.h"
#include "vnodeHash.h"
typedef struct SVAsync SVAsync;
typedef struct SVATask SVATask;
typedef struct SVAChannel SVAChannel;
@ -54,7 +55,7 @@ struct SVATask {
int32_t priorScore;
SVAChannel *channel;
int32_t (*execute)(void *);
void (*complete)(void *);
void (*cancel)(void *);
void *arg;
EVATaskState state;
@ -67,6 +68,11 @@ struct SVATask {
struct SVATask *next;
};
typedef struct {
void (*cancel)(void *);
void *arg;
} SVATaskCancelInfo;
#define VATASK_PIORITY(task_) ((task_)->priority - ((task_)->priorScore / 4))
// async channel
@ -112,6 +118,10 @@ struct SVAsync {
SVHashTable *taskTable;
};
SVAsync *vnodeAsyncs[3];
#define MIN_ASYNC_ID 1
#define MAX_ASYNC_ID (sizeof(vnodeAsyncs) / sizeof(vnodeAsyncs[0]) - 1)
static int32_t vnodeAsyncTaskDone(SVAsync *async, SVATask *task) {
int32_t ret;
@ -160,11 +170,6 @@ static int32_t vnodeAsyncTaskDone(SVAsync *async, SVATask *task) {
}
async->numTasks--;
// call complete callback
if (task->complete) {
task->complete(task->arg);
}
if (task->numWait == 0) {
taosThreadCondDestroy(&task->waitCond);
taosMemoryFree(task);
@ -176,7 +181,7 @@ static int32_t vnodeAsyncTaskDone(SVAsync *async, SVATask *task) {
return 0;
}
static int32_t vnodeAsyncCancelAllTasks(SVAsync *async) {
static int32_t vnodeAsyncCancelAllTasks(SVAsync *async, SArray *cancelArray) {
while (async->queue[0].next != &async->queue[0] || async->queue[1].next != &async->queue[1] ||
async->queue[2].next != &async->queue[2]) {
for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
@ -184,6 +189,12 @@ static int32_t vnodeAsyncCancelAllTasks(SVAsync *async) {
SVATask *task = async->queue[i].next;
task->prev->next = task->next;
task->next->prev = task->prev;
if (task->cancel) {
taosArrayPush(cancelArray, &(SVATaskCancelInfo){
.cancel = task->cancel,
.arg = task->arg,
});
}
vnodeAsyncTaskDone(async, task);
}
}
@ -194,6 +205,7 @@ static int32_t vnodeAsyncCancelAllTasks(SVAsync *async) {
static void *vnodeAsyncLoop(void *arg) {
SVWorker *worker = (SVWorker *)arg;
SVAsync *async = worker->async;
SArray *cancelArray = taosArrayInit(0, sizeof(SVATaskCancelInfo));
setThreadName(async->label);
@ -209,12 +221,12 @@ static void *vnodeAsyncLoop(void *arg) {
for (;;) {
if (async->stop || worker->workerId >= async->numWorkers) {
if (async->stop) { // cancel all tasks
vnodeAsyncCancelAllTasks(async);
vnodeAsyncCancelAllTasks(async, cancelArray);
}
worker->state = EVA_WORKER_STATE_STOP;
async->numLaunchWorkers--;
taosThreadMutexUnlock(&async->mutex);
return NULL;
goto _exit;
}
for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
@ -259,6 +271,12 @@ static void *vnodeAsyncLoop(void *arg) {
worker->runningTask->execute(worker->runningTask->arg);
}
_exit:
for (int32_t i = 0; i < taosArrayGetSize(cancelArray); i++) {
SVATaskCancelInfo *cancel = (SVATaskCancelInfo *)taosArrayGet(cancelArray, i);
cancel->cancel(cancel->arg);
}
taosArrayDestroy(cancelArray);
return NULL;
}
@ -294,7 +312,7 @@ static int32_t vnodeAsyncChannelCompare(const void *obj1, const void *obj2) {
return 0;
}
int32_t vnodeAsyncInit(SVAsync **async, char *label) {
static int32_t vnodeAsyncInit(SVAsync **async, const char *label) {
int32_t ret;
if (async == NULL) {
@ -360,7 +378,7 @@ int32_t vnodeAsyncInit(SVAsync **async, char *label) {
return 0;
}
int32_t vnodeAsyncDestroy(SVAsync **async) {
static int32_t vnodeAsyncDestroy(SVAsync **async) {
if ((*async) == NULL) {
return TSDB_CODE_INVALID_PARA;
}
@ -433,20 +451,39 @@ static int32_t vnodeAsyncLaunchWorker(SVAsync *async) {
return 0;
}
#ifdef BUILD_NO_CALL
int32_t vnodeAsync(SVAsync *async, EVAPriority priority, int32_t (*execute)(void *), void (*complete)(void *),
void *arg, int64_t *taskId) {
return vnodeAsyncC(async, 0, priority, execute, complete, arg, taskId);
}
#endif
int32_t vnodeAsyncOpen(int32_t numOfThreads) {
int32_t code = 0;
int32_t lino = 0;
int32_t vnodeAsyncC(SVAsync *async, int64_t channelId, EVAPriority priority, int32_t (*execute)(void *),
void (*complete)(void *), void *arg, int64_t *taskId) {
if (async == NULL || execute == NULL || channelId < 0) {
// vnode-commit
code = vnodeAsyncInit(&vnodeAsyncs[1], "vnode-commit");
TSDB_CHECK_CODE(code, lino, _exit);
vnodeAsyncSetWorkers(1, numOfThreads);
// vnode-merge
code = vnodeAsyncInit(&vnodeAsyncs[2], "vnode-merge");
TSDB_CHECK_CODE(code, lino, _exit);
vnodeAsyncSetWorkers(2, numOfThreads);
_exit:
return 0;
}
int32_t vnodeAsyncClose() {
vnodeAsyncDestroy(&vnodeAsyncs[1]);
vnodeAsyncDestroy(&vnodeAsyncs[2]);
return 0;
}
int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*execute)(void *), void (*cancel)(void *),
void *arg, SVATaskID *taskID) {
if (channelID == NULL || channelID->async < MIN_ASYNC_ID || channelID->async > MAX_ASYNC_ID || execute == NULL ||
channelID->id < 0) {
return TSDB_CODE_INVALID_PARA;
}
int64_t id;
int64_t id;
SVAsync *async = vnodeAsyncs[channelID->async];
// create task object
SVATask *task = (SVATask *)taosMemoryCalloc(1, sizeof(SVATask));
@ -457,7 +494,7 @@ int32_t vnodeAsyncC(SVAsync *async, int64_t channelId, EVAPriority priority, int
task->priority = priority;
task->priorScore = 0;
task->execute = execute;
task->complete = complete;
task->cancel = cancel;
task->arg = arg;
task->state = EVA_TASK_STATE_WAITTING;
task->numWait = 0;
@ -466,10 +503,12 @@ int32_t vnodeAsyncC(SVAsync *async, int64_t channelId, EVAPriority priority, int
// schedule task
taosThreadMutexLock(&async->mutex);
if (channelId == 0) {
if (channelID->id == 0) {
task->channel = NULL;
} else {
SVAChannel channel = {.channelId = channelId};
SVAChannel channel = {
.channelId = channelID->id,
};
vHashGet(async->channelTable, &channel, (void **)&task->channel);
if (task->channel == NULL) {
taosThreadMutexUnlock(&async->mutex);
@ -540,20 +579,24 @@ int32_t vnodeAsyncC(SVAsync *async, int64_t channelId, EVAPriority priority, int
taosThreadMutexUnlock(&async->mutex);
if (taskId != NULL) {
*taskId = id;
if (taskID != NULL) {
taskID->async = channelID->async;
taskID->id = id;
}
return 0;
}
int32_t vnodeAWait(SVAsync *async, int64_t taskId) {
if (async == NULL || taskId <= 0) {
int32_t vnodeAWait(SVATaskID *taskID) {
if (taskID == NULL || taskID->async < MIN_ASYNC_ID || taskID->async > MAX_ASYNC_ID || taskID->id <= 0) {
return TSDB_CODE_INVALID_PARA;
}
SVAsync *async = vnodeAsyncs[taskID->async];
SVATask *task = NULL;
SVATask task2 = {.taskId = taskId};
SVATask task2 = {
.taskId = taskID->id,
};
taosThreadMutexLock(&async->mutex);
@ -574,21 +617,27 @@ int32_t vnodeAWait(SVAsync *async, int64_t taskId) {
return 0;
}
int32_t vnodeACancel(SVAsync *async, int64_t taskId) {
if (async == NULL) {
int32_t vnodeACancel(SVATaskID *taskID) {
if (taskID == NULL || taskID->async < MIN_ASYNC_ID || taskID->async > MAX_ASYNC_ID || taskID->id <= 0) {
return TSDB_CODE_INVALID_PARA;
}
int32_t ret = 0;
SVAsync *async = vnodeAsyncs[taskID->async];
SVATask *task = NULL;
SVATask task2 = {.taskId = taskId};
SVATask task2 = {
.taskId = taskID->id,
};
void (*cancel)(void *) = NULL;
void *arg = NULL;
taosThreadMutexLock(&async->mutex);
vHashGet(async->taskTable, &task2, (void **)&task);
if (task) {
if (task->state == EVA_TASK_STATE_WAITTING) {
// remove from queue
cancel = task->cancel;
arg = task->arg;
task->next->prev = task->prev;
task->prev->next = task->next;
vnodeAsyncTaskDone(async, task);
@ -599,14 +648,18 @@ int32_t vnodeACancel(SVAsync *async, int64_t taskId) {
taosThreadMutexUnlock(&async->mutex);
if (cancel) {
cancel(arg);
}
return ret;
}
int32_t vnodeAsyncSetWorkers(SVAsync *async, int32_t numWorkers) {
if (async == NULL || numWorkers <= 0 || numWorkers > VNODE_ASYNC_MAX_WORKERS) {
int32_t vnodeAsyncSetWorkers(int64_t asyncID, int32_t numWorkers) {
if (asyncID < MIN_ASYNC_ID || asyncID > MAX_ASYNC_ID || numWorkers <= 0 || numWorkers > VNODE_ASYNC_MAX_WORKERS) {
return TSDB_CODE_INVALID_PARA;
}
SVAsync *async = vnodeAsyncs[asyncID];
taosThreadMutexLock(&async->mutex);
async->numWorkers = numWorkers;
if (async->numIdleWorkers > 0) {
@ -617,11 +670,13 @@ int32_t vnodeAsyncSetWorkers(SVAsync *async, int32_t numWorkers) {
return 0;
}
int32_t vnodeAChannelInit(SVAsync *async, int64_t *channelId) {
if (async == NULL || channelId == NULL) {
int32_t vnodeAChannelInit(int64_t asyncID, SVAChannelID *channelID) {
if (channelID == NULL || asyncID < MIN_ASYNC_ID || asyncID > MAX_ASYNC_ID) {
return TSDB_CODE_INVALID_PARA;
}
SVAsync *async = vnodeAsyncs[asyncID];
// create channel object
SVAChannel *channel = (SVAChannel *)taosMemoryMalloc(sizeof(SVAChannel));
if (channel == NULL) {
@ -637,7 +692,7 @@ int32_t vnodeAChannelInit(SVAsync *async, int64_t *channelId) {
// register channel
taosThreadMutexLock(&async->mutex);
channel->channelId = *channelId = ++async->nextChannelId;
channel->channelId = channelID->id = ++async->nextChannelId;
// add to hash table
int32_t ret = vHashPut(async->channelTable, channel);
@ -657,16 +712,24 @@ int32_t vnodeAChannelInit(SVAsync *async, int64_t *channelId) {
taosThreadMutexUnlock(&async->mutex);
channelID->async = asyncID;
return 0;
}
int32_t vnodeAChannelDestroy(SVAsync *async, int64_t channelId, bool waitRunning) {
if (async == NULL || channelId <= 0) {
int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
if (channelID == NULL || channelID->async < MIN_ASYNC_ID || channelID->async > MAX_ASYNC_ID || channelID->id <= 0) {
return TSDB_CODE_INVALID_PARA;
}
SVAsync *async = vnodeAsyncs[channelID->async];
SVAChannel *channel = NULL;
SVAChannel channel2 = {.channelId = channelId};
SVAChannel channel2 = {
.channelId = channelID->id,
};
SArray *cancelArray = taosArrayInit(0, sizeof(SVATaskCancelInfo));
if (cancelArray == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosThreadMutexLock(&async->mutex);
@ -684,6 +747,12 @@ int32_t vnodeAChannelDestroy(SVAsync *async, int64_t channelId, bool waitRunning
SVATask *task = channel->queue[i].next;
task->prev->next = task->next;
task->next->prev = task->prev;
if (task->cancel) {
taosArrayPush(cancelArray, &(SVATaskCancelInfo){
.cancel = task->cancel,
.arg = task->arg,
});
}
vnodeAsyncTaskDone(async, task);
}
}
@ -693,6 +762,12 @@ int32_t vnodeAChannelDestroy(SVAsync *async, int64_t channelId, bool waitRunning
if (channel->scheduled) {
channel->scheduled->prev->next = channel->scheduled->next;
channel->scheduled->next->prev = channel->scheduled->prev;
if (channel->scheduled->cancel) {
taosArrayPush(cancelArray, &(SVATaskCancelInfo){
.cancel = channel->scheduled->cancel,
.arg = channel->scheduled->arg,
});
}
vnodeAsyncTaskDone(async, channel->scheduled);
}
taosMemoryFree(channel);
@ -713,12 +788,16 @@ int32_t vnodeAChannelDestroy(SVAsync *async, int64_t channelId, bool waitRunning
channel->state = EVA_CHANNEL_STATE_CLOSE;
}
}
} else {
taosThreadMutexUnlock(&async->mutex);
return TSDB_CODE_INVALID_PARA;
}
taosThreadMutexUnlock(&async->mutex);
for (int32_t i = 0; i < taosArrayGetSize(cancelArray); i++) {
SVATaskCancelInfo *cancel = (SVATaskCancelInfo *)taosArrayGet(cancelArray, i);
cancel->cancel(cancel->arg);
}
taosArrayDestroy(cancelArray);
channelID->async = 0;
channelID->id = 0;
return 0;
}

View File

@ -289,7 +289,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
int64_t lastCommitted = pInfo->info.state.committed;
// wait last commit task
vnodeAWait(vnodeAsyncHandle[0], pVnode->commitTask);
vnodeAWait(&pVnode->commitTask);
if (syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg) != 0) goto _exit;
@ -361,15 +361,14 @@ static void vnodeReturnBufPool(SVnode *pVnode) {
taosThreadMutexUnlock(&pVnode->mutex);
}
static int32_t vnodeCommitTask(void *arg) {
static int32_t vnodeCommit(void *arg) {
int32_t code = 0;
SCommitInfo *pInfo = (SCommitInfo *)arg;
SVnode *pVnode = pInfo->pVnode;
// commit
code = vnodeCommitImpl(pInfo);
if (code) {
if ((code = vnodeCommitImpl(pInfo))) {
vFatal("vgId:%d, failed to commit vnode since %s", TD_VID(pVnode), terrstr());
taosMsleep(100);
exit(EXIT_FAILURE);
@ -379,37 +378,34 @@ static int32_t vnodeCommitTask(void *arg) {
vnodeReturnBufPool(pVnode);
_exit:
taosMemoryFree(arg);
return code;
}
static void vnodeCompleteCommit(void *arg) { taosMemoryFree(arg); }
static void vnodeCommitCancel(void *arg) { taosMemoryFree(arg); }
int vnodeAsyncCommit(SVnode *pVnode) {
int32_t code = 0;
int32_t lino = 0;
SCommitInfo *pInfo = (SCommitInfo *)taosMemoryCalloc(1, sizeof(*pInfo));
if (NULL == pInfo) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
}
// prepare to commit
code = vnodePrepareCommit(pVnode, pInfo);
if (TSDB_CODE_SUCCESS != code) {
goto _exit;
}
TSDB_CHECK_CODE(code, lino, _exit);
// schedule the task
code = vnodeAsyncC(vnodeAsyncHandle[0], pVnode->commitChannel, EVA_PRIORITY_HIGH, vnodeCommitTask,
vnodeCompleteCommit, pInfo, &pVnode->commitTask);
code =
vnodeAsync(&pVnode->commitChannel, EVA_PRIORITY_HIGH, vnodeCommit, vnodeCommitCancel, pInfo, &pVnode->commitTask);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
if (NULL != pInfo) {
taosMemoryFree(pInfo);
}
vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code),
pVnode->state.commitID);
taosMemoryFree(pInfo);
vError("vgId:%d %s failed at line %d since %s" PRId64, TD_VID(pVnode), __func__, lino, tstrerror(code));
} else {
vInfo("vgId:%d, vnode async commit done, commitId:%" PRId64 " term:%" PRId64 " applied:%" PRId64, TD_VID(pVnode),
pVnode->state.commitID, pVnode->state.applyTerm, pVnode->state.applied);
@ -419,7 +415,7 @@ _exit:
int vnodeSyncCommit(SVnode *pVnode) {
vnodeAsyncCommit(pVnode);
vnodeAWait(vnodeAsyncHandle[0], pVnode->commitTask);
vnodeAWait(&pVnode->commitTask);
return 0;
}

View File

@ -22,23 +22,6 @@
extern "C" {
#endif
typedef struct SVHashTable SVHashTable;
struct SVHashTable {
uint32_t (*hash)(const void*);
int32_t (*compare)(const void*, const void*);
int32_t numEntries;
uint32_t numBuckets;
struct SVHashEntry** buckets;
};
#define vHashNumEntries(ht) ((ht)->numEntries)
int32_t vHashInit(SVHashTable** ht, uint32_t (*hash)(const void*), int32_t (*compare)(const void*, const void*));
int32_t vHashDestroy(SVHashTable** ht);
int32_t vHashPut(SVHashTable* ht, void* obj);
int32_t vHashGet(SVHashTable* ht, const void* obj, void** retObj);
int32_t vHashDrop(SVHashTable* ht, const void* obj);
#ifdef __cplusplus
}
#endif

View File

@ -18,8 +18,6 @@
static volatile int32_t VINIT = 0;
SVAsync* vnodeAsyncHandle[2];
int vnodeInit(int nthreads) {
int32_t init;
@ -28,13 +26,9 @@ int vnodeInit(int nthreads) {
return 0;
}
// vnode-commit
vnodeAsyncInit(&vnodeAsyncHandle[0], "vnode-commit");
vnodeAsyncSetWorkers(vnodeAsyncHandle[0], nthreads);
// vnode-merge
vnodeAsyncInit(&vnodeAsyncHandle[1], "vnode-merge");
vnodeAsyncSetWorkers(vnodeAsyncHandle[1], nthreads);
if (vnodeAsyncOpen(nthreads) != 0) {
return -1;
}
if (walInit() < 0) {
return -1;
@ -48,8 +42,7 @@ void vnodeCleanup() {
if (init == 0) return;
// set stop
vnodeAsyncDestroy(&vnodeAsyncHandle[0]);
vnodeAsyncDestroy(&vnodeAsyncHandle[1]);
vnodeAsyncClose();
walCleanUp();
smaCleanUp();

View File

@ -399,7 +399,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
taosThreadMutexInit(&pVnode->mutex, NULL);
taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
if (vnodeAChannelInit(vnodeAsyncHandle[0], &pVnode->commitChannel) != 0) {
if (vnodeAChannelInit(1, &pVnode->commitChannel) != 0) {
vError("vgId:%d, failed to init commit channel", TD_VID(pVnode));
goto _err;
}
@ -527,8 +527,8 @@ void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); }
void vnodeClose(SVnode *pVnode) {
if (pVnode) {
vnodeAWait(vnodeAsyncHandle[0], pVnode->commitTask);
vnodeAChannelDestroy(vnodeAsyncHandle[0], pVnode->commitChannel, true);
vnodeAWait(&pVnode->commitTask);
vnodeAChannelDestroy(&pVnode->commitChannel, true);
vnodeSyncClose(pVnode);
vnodeQueryClose(pVnode);
tqClose(pVnode->pTq);

View File

@ -15,20 +15,15 @@
#include "vnd.h"
int32_t vnodeDoRetention(SVnode *pVnode, int64_t now) {
int32_t code = TSDB_CODE_SUCCESS;
extern int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now);
extern int32_t tsdbAsyncS3Migrate(STsdb *tsdb, int64_t now);
code = tsdbRetention(pVnode->pTsdb, now, pVnode->config.sttTrigger == 1);
if (TSDB_CODE_SUCCESS == code) code = smaRetention(pVnode->pSma, now);
return code;
int32_t vnodeAsyncRetention(SVnode *pVnode, int64_t now) {
// async retention
return tsdbAsyncRetention(pVnode->pTsdb, now);
}
int32_t vnodeDoS3Migrate(SVnode *pVnode, int64_t now) {
int32_t code = TSDB_CODE_SUCCESS;
code = tsdbS3Migrate(pVnode->pTsdb, now, pVnode->config.sttTrigger == 1);
return code;
int32_t vnodeAsyncS3Migrate(SVnode *pVnode, int64_t now) {
// async migration
return tsdbAsyncS3Migrate(pVnode->pTsdb, now);
}

View File

@ -622,13 +622,13 @@ extern int32_t tsdbEnableBgTask(STsdb *pTsdb);
static int32_t vnodeCancelAndDisableAllBgTask(SVnode *pVnode) {
tsdbDisableAndCancelAllBgTask(pVnode->pTsdb);
vnodeSyncCommit(pVnode);
vnodeAChannelDestroy(vnodeAsyncHandle[0], pVnode->commitChannel, true);
vnodeAChannelDestroy(&pVnode->commitChannel, true);
return 0;
}
static int32_t vnodeEnableBgTask(SVnode *pVnode) {
tsdbEnableBgTask(pVnode->pTsdb);
vnodeAChannelInit(vnodeAsyncHandle[0], &pVnode->commitChannel);
vnodeAChannelInit(1, &pVnode->commitChannel);
return 0;
}

View File

@ -50,7 +50,7 @@ static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t l
static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token);
static int32_t vnodeCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token);
static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void* pReq, int32_t len, SRpcMsg* pRsp);
static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
extern int32_t vnodeProcessKillCompactReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
extern int32_t vnodeQueryCompactProgress(SVnode *pVnode, SRpcMsg *pMsg);
@ -866,7 +866,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
}
extern int32_t vnodeDoRetention(SVnode *pVnode, int64_t now);
extern int32_t vnodeAsyncRetention(SVnode *pVnode, int64_t now);
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
int32_t code = 0;
@ -880,13 +880,13 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int3
vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);
code = vnodeDoRetention(pVnode, trimReq.timestamp);
code = vnodeAsyncRetention(pVnode, trimReq.timestamp);
_exit:
return code;
}
extern int32_t vnodeDoS3Migrate(SVnode *pVnode, int64_t now);
extern int32_t vnodeAsyncS3Migrate(SVnode *pVnode, int64_t now);
static int32_t vnodeProcessS3MigrateReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
int32_t code = 0;
@ -900,7 +900,7 @@ static int32_t vnodeProcessS3MigrateReq(SVnode *pVnode, int64_t ver, void *pReq,
vInfo("vgId:%d, s3migrate vnode request will be processed, time:%d", pVnode->config.vgId, s3migrateReq.timestamp);
code = vnodeDoS3Migrate(pVnode, s3migrateReq.timestamp);
code = vnodeAsyncS3Migrate(pVnode, s3migrateReq.timestamp);
_exit:
return code;
@ -931,13 +931,13 @@ end:
return ret;
}
static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void* pReq, int32_t len, SRpcMsg* pRsp) {
static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
int32_t code = -1;
SMetaReader mr = {0};
SVDropTtlTableReq ttlReq = {0};
SVFetchTtlExpiredTbsRsp rsp = {0};
SEncoder encoder = {0};
SArray* pNames = NULL;
SArray *pNames = NULL;
pRsp->msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
pRsp->pCont = NULL;
@ -950,8 +950,8 @@ static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void*
ASSERT(ttlReq.nUids == taosArrayGetSize(ttlReq.pTbUids));
tb_uid_t suid;
char ctbName[TSDB_TABLE_NAME_LEN];
tb_uid_t suid;
char ctbName[TSDB_TABLE_NAME_LEN];
SVDropTbReq expiredTb = {.igNotExists = true};
metaReaderDoInit(&mr, pVnode->pMeta, 0);
rsp.vgId = TD_VID(pVnode);
@ -965,12 +965,12 @@ static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void*
}
char buf[TSDB_TABLE_NAME_LEN];
for (int32_t i = 0; i < ttlReq.nUids; ++i) {
tb_uid_t* uid = taosArrayGet(ttlReq.pTbUids, i);
tb_uid_t *uid = taosArrayGet(ttlReq.pTbUids, i);
expiredTb.suid = *uid;
terrno = metaReaderGetTableEntryByUid(&mr, *uid);
if (terrno < 0) goto _end;
strncpy(buf, mr.me.name, TSDB_TABLE_NAME_LEN);
void* p = taosArrayPush(pNames, buf);
void *p = taosArrayPush(pNames, buf);
expiredTb.name = p;
if (mr.me.type == TSDB_CHILD_TABLE) {
expiredTb.suid = mr.me.ctbEntry.suid;
@ -1144,7 +1144,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
if (i < tbNames->size - 1) {
taosStringBuilderAppendChar(&sb, ',');
}
//taosMemoryFreeClear(*key);
// taosMemoryFreeClear(*key);
}
size_t len = 0;
@ -2241,14 +2241,14 @@ static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq,
return TSDB_CODE_SUCCESS;
}
extern int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
extern int32_t vnodeAsyncCompact(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
if (!pVnode->restored) {
vInfo("vgId:%d, ignore compact req during restoring. ver:%" PRId64, TD_VID(pVnode), ver);
return 0;
}
return vnodeProcessCompactVnodeReqImpl(pVnode, ver, pReq, len, pRsp);
return vnodeAsyncCompact(pVnode, ver, pReq, len, pRsp);
}
static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {

View File

@ -177,11 +177,10 @@ int32_t tfsAllocDisk(STfs *pTfs, int32_t expLevel, SDiskID *pDiskId) {
continue;
}
return 0;
return (terrno = 0);
}
terrno = TSDB_CODE_FS_NO_VALID_DISK;
return -1;
return (terrno = TSDB_CODE_FS_NO_VALID_DISK);
}
const char *tfsGetPrimaryPath(STfs *pTfs) { return TFS_PRIMARY_DISK(pTfs)->path; }