Merge branch '3.0' into fix/3_liaohj
This commit is contained in:
commit
c3c6b680fb
|
@ -355,6 +355,8 @@ typedef struct {
|
|||
int flush_count;
|
||||
} SCacheFlushState;
|
||||
|
||||
typedef struct SCompMonitor SCompMonitor;
|
||||
|
||||
struct STsdb {
|
||||
char * path;
|
||||
SVnode * pVnode;
|
||||
|
@ -375,8 +377,11 @@ struct STsdb {
|
|||
TdThreadMutex pgMutex;
|
||||
struct STFileSystem *pFS; // new
|
||||
SRocksCache rCache;
|
||||
// compact monitor
|
||||
struct SCompMonitor *pCompMonitor;
|
||||
SCompMonitor *pCompMonitor;
|
||||
struct {
|
||||
SVHashTable *ht;
|
||||
SArray *arr;
|
||||
} *commitInfo;
|
||||
};
|
||||
|
||||
struct TSDBKEY {
|
||||
|
|
|
@ -49,31 +49,21 @@ int32_t vnodeEncodeConfig(const void* pObj, SJson* pJson);
|
|||
int32_t vnodeDecodeConfig(const SJson* pJson, void* pObj);
|
||||
|
||||
// vnodeAsync.c
|
||||
typedef struct SVAsync SVAsync;
|
||||
|
||||
typedef enum {
|
||||
EVA_PRIORITY_HIGH = 0,
|
||||
EVA_PRIORITY_NORMAL,
|
||||
EVA_PRIORITY_LOW,
|
||||
} EVAPriority;
|
||||
|
||||
#define VNODE_ASYNC_VALID_CHANNEL_ID(channelId) ((channelId) > 0)
|
||||
#define VNODE_ASYNC_VALID_TASK_ID(taskId) ((taskId) > 0)
|
||||
|
||||
int32_t vnodeAsyncInit(SVAsync** async, char* label);
|
||||
int32_t vnodeAsyncDestroy(SVAsync** async);
|
||||
int32_t vnodeAChannelInit(SVAsync* async, int64_t* channelId);
|
||||
int32_t vnodeAChannelDestroy(SVAsync* async, int64_t channelId, bool waitRunning);
|
||||
int32_t vnodeAsync(SVAsync* async, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*), void* arg,
|
||||
int64_t* taskId);
|
||||
int32_t vnodeAsyncC(SVAsync* async, int64_t channelId, EVAPriority priority, int32_t (*execute)(void*),
|
||||
void (*complete)(void*), void* arg, int64_t* taskId);
|
||||
int32_t vnodeAWait(SVAsync* async, int64_t taskId);
|
||||
int32_t vnodeACancel(SVAsync* async, int64_t taskId);
|
||||
int32_t vnodeAsyncSetWorkers(SVAsync* async, int32_t numWorkers);
|
||||
|
||||
// vnodeModule.c
|
||||
extern SVAsync* vnodeAsyncHandle[2];
|
||||
int32_t vnodeAsyncOpen(int32_t numOfThreads);
|
||||
int32_t vnodeAsyncClose();
|
||||
int32_t vnodeAChannelInit(int64_t async, SVAChannelID* channelID);
|
||||
int32_t vnodeAChannelDestroy(SVAChannelID* channelID, bool waitRunning);
|
||||
int32_t vnodeAsync(SVAChannelID* channelID, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*),
|
||||
void* arg, SVATaskID* taskID);
|
||||
int32_t vnodeAWait(SVATaskID* taskID);
|
||||
int32_t vnodeACancel(SVATaskID* taskID);
|
||||
int32_t vnodeAsyncSetWorkers(int64_t async, int32_t numWorkers);
|
||||
|
||||
// vnodeBufPool.c
|
||||
typedef struct SVBufPoolNode SVBufPoolNode;
|
||||
|
|
|
@ -237,9 +237,6 @@ int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t
|
|||
int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, bool hasPrimayKey);
|
||||
int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type);
|
||||
int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey);
|
||||
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
|
||||
int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync);
|
||||
int32_t tsdbS3Migrate(STsdb* tsdb, int64_t now, int32_t sync);
|
||||
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
|
||||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp);
|
||||
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows);
|
||||
|
@ -472,6 +469,16 @@ typedef struct SVMonitorObj {
|
|||
taos_counter_t* insertCounter;
|
||||
} SVMonitorObj;
|
||||
|
||||
typedef struct {
|
||||
int64_t async;
|
||||
int64_t id;
|
||||
} SVAChannelID;
|
||||
|
||||
typedef struct {
|
||||
int64_t async;
|
||||
int64_t id;
|
||||
} SVATaskID;
|
||||
|
||||
struct SVnode {
|
||||
char* path;
|
||||
SVnodeCfg config;
|
||||
|
@ -493,8 +500,8 @@ struct SVnode {
|
|||
SVBufPool* onRecycle;
|
||||
|
||||
// commit variables
|
||||
int64_t commitChannel;
|
||||
int64_t commitTask;
|
||||
SVAChannelID commitChannel;
|
||||
SVATaskID commitTask;
|
||||
|
||||
SMeta* pMeta;
|
||||
SSma* pSma;
|
||||
|
@ -600,6 +607,24 @@ struct SCompactInfo {
|
|||
|
||||
void initStorageAPI(SStorageAPI* pAPI);
|
||||
|
||||
// a simple hash table impl
|
||||
typedef struct SVHashTable SVHashTable;
|
||||
|
||||
struct SVHashTable {
|
||||
uint32_t (*hash)(const void*);
|
||||
int32_t (*compare)(const void*, const void*);
|
||||
int32_t numEntries;
|
||||
uint32_t numBuckets;
|
||||
struct SVHashEntry** buckets;
|
||||
};
|
||||
|
||||
#define vHashNumEntries(ht) ((ht)->numEntries)
|
||||
int32_t vHashInit(SVHashTable** ht, uint32_t (*hash)(const void*), int32_t (*compare)(const void*, const void*));
|
||||
int32_t vHashDestroy(SVHashTable** ht);
|
||||
int32_t vHashPut(SVHashTable* ht, void* obj);
|
||||
int32_t vHashGet(SVHashTable* ht, const void* obj, void** retObj);
|
||||
int32_t vHashDrop(SVHashTable* ht, const void* obj);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -622,8 +622,8 @@ int32_t smaRetention(SSma *pSma, int64_t now) {
|
|||
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
if (pSma->pRSmaTsdb[i]) {
|
||||
code = tsdbRetention(pSma->pRSmaTsdb[i], now, pSma->pVnode->config.sttTrigger == 1);
|
||||
if (code) goto _end;
|
||||
// code = tsdbRetention(pSma->pRSmaTsdb[i], now, pSma->pVnode->config.sttTrigger == 1);
|
||||
// if (code) goto _end;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,13 +17,13 @@
|
|||
|
||||
// extern dependencies
|
||||
typedef struct {
|
||||
STsdb *tsdb;
|
||||
TFileSetArray *fsetArr;
|
||||
TFileOpArray fopArray[1];
|
||||
|
||||
// SSkmInfo skmTb[1];
|
||||
// SSkmInfo skmRow[1];
|
||||
int32_t fid;
|
||||
bool hasDataToCommit;
|
||||
STFileSet *fset;
|
||||
} SFileSetCommitInfo;
|
||||
|
||||
typedef struct {
|
||||
STsdb *tsdb;
|
||||
int32_t minutes;
|
||||
int8_t precision;
|
||||
int32_t minRow;
|
||||
|
@ -32,34 +32,34 @@ typedef struct {
|
|||
int32_t sttTrigger;
|
||||
int32_t szPage;
|
||||
int64_t compactVersion;
|
||||
int64_t cid;
|
||||
int64_t now;
|
||||
|
||||
struct {
|
||||
int64_t cid;
|
||||
int64_t now;
|
||||
TSKEY nextKey;
|
||||
int32_t fid;
|
||||
int32_t expLevel;
|
||||
SDiskID did;
|
||||
TSKEY minKey;
|
||||
TSKEY maxKey;
|
||||
STFileSet *fset;
|
||||
TABLEID tbid[1];
|
||||
bool hasTSData;
|
||||
bool skipTsRow;
|
||||
SHashObj *pColCmprObj;
|
||||
SFileSetCommitInfo *info;
|
||||
|
||||
int32_t expLevel;
|
||||
SDiskID did;
|
||||
TSKEY minKey;
|
||||
TSKEY maxKey;
|
||||
TABLEID tbid[1];
|
||||
bool hasTSData;
|
||||
|
||||
bool skipTsRow;
|
||||
SHashObj *pColCmprObj;
|
||||
} ctx[1];
|
||||
|
||||
// reader
|
||||
TSttFileReaderArray sttReaderArray[1];
|
||||
|
||||
// iter
|
||||
TTsdbIterArray dataIterArray[1];
|
||||
SIterMerger *dataIterMerger;
|
||||
TTsdbIterArray tombIterArray[1];
|
||||
SIterMerger *tombIterMerger;
|
||||
|
||||
// writer
|
||||
SFSetWriter *writer;
|
||||
|
||||
TFileOpArray fopArray[1];
|
||||
} SCommitter2;
|
||||
|
||||
static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
|
||||
|
@ -74,8 +74,8 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
|
|||
.maxRow = committer->maxRow,
|
||||
.szPage = committer->szPage,
|
||||
.cmprAlg = committer->cmprAlg,
|
||||
.fid = committer->ctx->fid,
|
||||
.cid = committer->ctx->cid,
|
||||
.fid = committer->ctx->info->fid,
|
||||
.cid = committer->cid,
|
||||
.did = committer->ctx->did,
|
||||
.level = 0,
|
||||
};
|
||||
|
@ -83,11 +83,11 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
|
|||
if (committer->sttTrigger == 1) {
|
||||
config.toSttOnly = false;
|
||||
|
||||
if (committer->ctx->fset) {
|
||||
if (committer->ctx->info->fset) {
|
||||
for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ftype++) {
|
||||
if (committer->ctx->fset->farr[ftype] != NULL) {
|
||||
if (committer->ctx->info->fset->farr[ftype] != NULL) {
|
||||
config.files[ftype].exist = true;
|
||||
config.files[ftype].file = committer->ctx->fset->farr[ftype]->f[0];
|
||||
config.files[ftype].file = committer->ctx->info->fset->farr[ftype]->f[0];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -117,7 +117,6 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) {
|
|||
|
||||
committer->ctx->tbid->suid = 0;
|
||||
committer->ctx->tbid->uid = 0;
|
||||
|
||||
for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->dataIterMerger)) != NULL;) {
|
||||
if (row->uid != committer->ctx->tbid->uid) {
|
||||
committer->ctx->tbid->suid = row->suid;
|
||||
|
@ -132,7 +131,6 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) {
|
|||
|
||||
int64_t ts = TSDBROW_TS(&row->row);
|
||||
if (ts > committer->ctx->maxKey) {
|
||||
committer->ctx->nextKey = TMIN(committer->ctx->nextKey, ts);
|
||||
code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
continue;
|
||||
|
@ -152,7 +150,8 @@ _exit:
|
|||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
|
||||
} else {
|
||||
tsdbDebug("vgId:%d fid:%d commit %" PRId64 " rows", TD_VID(committer->tsdb->pVnode), committer->ctx->fid, numOfRow);
|
||||
tsdbDebug("vgId:%d fid:%d commit %" PRId64 " rows", TD_VID(committer->tsdb->pVnode), committer->ctx->info->fid,
|
||||
numOfRow);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -168,7 +167,7 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) {
|
|||
}
|
||||
|
||||
// do not need to write tomb data if there is no ts data
|
||||
bool skip = (committer->ctx->fset == NULL && !committer->ctx->hasTSData);
|
||||
bool skip = (committer->ctx->info->fset == NULL && !committer->ctx->hasTSData);
|
||||
|
||||
committer->ctx->tbid->suid = 0;
|
||||
committer->ctx->tbid->uid = 0;
|
||||
|
@ -187,12 +186,8 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) {
|
|||
if (record->ekey < committer->ctx->minKey) {
|
||||
// do nothing
|
||||
} else if (record->skey > committer->ctx->maxKey) {
|
||||
committer->ctx->nextKey = TMIN(record->skey, committer->ctx->nextKey);
|
||||
// committer->ctx->nextKey = TMIN(record->skey, committer->ctx->nextKey);
|
||||
} else {
|
||||
if (record->ekey > committer->ctx->maxKey) {
|
||||
committer->ctx->nextKey = TMIN(committer->ctx->nextKey, committer->ctx->maxKey + 1);
|
||||
}
|
||||
|
||||
record->skey = TMAX(record->skey, committer->ctx->minKey);
|
||||
record->ekey = TMIN(record->ekey, committer->ctx->maxKey);
|
||||
|
||||
|
@ -211,8 +206,8 @@ _exit:
|
|||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
|
||||
} else {
|
||||
tsdbDebug("vgId:%d fid:%d commit %" PRId64 " tomb records", TD_VID(committer->tsdb->pVnode), committer->ctx->fid,
|
||||
numRecord);
|
||||
tsdbDebug("vgId:%d fid:%d commit %" PRId64 " tomb records", TD_VID(committer->tsdb->pVnode),
|
||||
committer->ctx->info->fid, numRecord);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -223,15 +218,15 @@ static int32_t tsdbCommitOpenReader(SCommitter2 *committer) {
|
|||
|
||||
ASSERT(TARRAY2_SIZE(committer->sttReaderArray) == 0);
|
||||
|
||||
if (committer->ctx->fset == NULL //
|
||||
|| committer->sttTrigger > 1 //
|
||||
|| TARRAY2_SIZE(committer->ctx->fset->lvlArr) == 0 //
|
||||
if (committer->ctx->info->fset == NULL //
|
||||
|| committer->sttTrigger > 1 //
|
||||
|| TARRAY2_SIZE(committer->ctx->info->fset->lvlArr) == 0 //
|
||||
) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
SSttLvl *lvl;
|
||||
TARRAY2_FOREACH(committer->ctx->fset->lvlArr, lvl) {
|
||||
TARRAY2_FOREACH(committer->ctx->info->fset->lvlArr, lvl) {
|
||||
STFileObj *fobj = NULL;
|
||||
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
|
||||
SSttFileReader *sttReader;
|
||||
|
@ -289,7 +284,7 @@ static int32_t tsdbCommitOpenIter(SCommitter2 *committer) {
|
|||
config.from->version = VERSION_MIN;
|
||||
config.from->key = (SRowKey){
|
||||
.ts = committer->ctx->minKey,
|
||||
.numOfPKs = 0, // TODO: support multiple primary keys
|
||||
.numOfPKs = 0,
|
||||
};
|
||||
|
||||
code = tsdbIterOpen(&config, &iter);
|
||||
|
@ -359,22 +354,15 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
|
|||
int32_t lino = 0;
|
||||
STsdb *tsdb = committer->tsdb;
|
||||
|
||||
int32_t fid = tsdbKeyFid(committer->ctx->nextKey, committer->minutes, committer->precision);
|
||||
|
||||
// check if can commit
|
||||
tsdbFSCheckCommit(tsdb, fid);
|
||||
tsdbFSCheckCommit(tsdb, committer->ctx->info->fid);
|
||||
|
||||
committer->ctx->fid = fid;
|
||||
committer->ctx->expLevel = tsdbFidLevel(committer->ctx->fid, &tsdb->keepCfg, committer->ctx->now);
|
||||
tsdbFidKeyRange(committer->ctx->fid, committer->minutes, committer->precision, &committer->ctx->minKey,
|
||||
committer->ctx->expLevel = tsdbFidLevel(committer->ctx->info->fid, &tsdb->keepCfg, committer->now);
|
||||
tsdbFidKeyRange(committer->ctx->info->fid, committer->minutes, committer->precision, &committer->ctx->minKey,
|
||||
&committer->ctx->maxKey);
|
||||
code = tfsAllocDisk(committer->tsdb->pVnode->pTfs, committer->ctx->expLevel, &committer->ctx->did);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
tfsMkdirRecurAt(committer->tsdb->pVnode->pTfs, committer->tsdb->path, committer->ctx->did);
|
||||
STFileSet fset = {.fid = committer->ctx->fid};
|
||||
committer->ctx->fset = &fset;
|
||||
STFileSet **fsetPtr = TARRAY2_SEARCH(committer->fsetArr, &committer->ctx->fset, tsdbTFileSetCmprFn, TD_EQ);
|
||||
committer->ctx->fset = (fsetPtr == NULL) ? NULL : *fsetPtr;
|
||||
committer->ctx->tbid->suid = 0;
|
||||
committer->ctx->tbid->uid = 0;
|
||||
|
||||
|
@ -391,15 +379,13 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
|
|||
code = tsdbCommitOpenWriter(committer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// reset nextKey
|
||||
committer->ctx->nextKey = TSKEY_MAX;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||
} else {
|
||||
tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", TD_VID(tsdb->pVnode),
|
||||
__func__, committer->ctx->fid, committer->ctx->minKey, committer->ctx->maxKey, committer->ctx->expLevel);
|
||||
__func__, committer->ctx->info->fid, committer->ctx->minKey, committer->ctx->maxKey,
|
||||
committer->ctx->expLevel);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -421,7 +407,7 @@ _exit:
|
|||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
|
||||
} else {
|
||||
tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->fid);
|
||||
tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->info->fid);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -449,7 +435,201 @@ _exit:
|
|||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
|
||||
} else {
|
||||
tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->fid);
|
||||
tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->info->fid);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tFileSetCommitInfoCompare(const void *arg1, const void *arg2) {
|
||||
SFileSetCommitInfo *info1 = (SFileSetCommitInfo *)arg1;
|
||||
SFileSetCommitInfo *info2 = (SFileSetCommitInfo *)arg2;
|
||||
|
||||
if (info1->fid < info2->fid) {
|
||||
return -1;
|
||||
} else if (info1->fid > info2->fid) {
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t tFileSetCommitInfoPCompare(const void *arg1, const void *arg2) {
|
||||
return tFileSetCommitInfoCompare(*(SFileSetCommitInfo **)arg1, *(SFileSetCommitInfo **)arg2);
|
||||
}
|
||||
|
||||
static uint32_t tFileSetCommitInfoHash(const void *arg) {
|
||||
SFileSetCommitInfo *info = (SFileSetCommitInfo *)arg;
|
||||
return MurmurHash3_32((const char *)&info->fid, sizeof(info->fid));
|
||||
}
|
||||
|
||||
static int32_t tsdbCommitInfoDestroy(STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (pTsdb->commitInfo) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) {
|
||||
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i);
|
||||
vHashDrop(pTsdb->commitInfo->ht, info);
|
||||
tsdbTFileSetClear(&info->fset);
|
||||
taosMemoryFree(info);
|
||||
}
|
||||
|
||||
vHashDestroy(&pTsdb->commitInfo->ht);
|
||||
taosArrayDestroy(pTsdb->commitInfo->arr);
|
||||
pTsdb->commitInfo->arr = NULL;
|
||||
taosMemoryFreeClear(pTsdb->commitInfo);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbCommitInfoInit(STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
pTsdb->commitInfo = taosMemoryCalloc(1, sizeof(*pTsdb->commitInfo));
|
||||
if (pTsdb->commitInfo == NULL) {
|
||||
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||
}
|
||||
|
||||
code = vHashInit(&pTsdb->commitInfo->ht, tFileSetCommitInfoHash, tFileSetCommitInfoCompare);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
pTsdb->commitInfo->arr = taosArrayInit(0, sizeof(SFileSetCommitInfo *));
|
||||
if (pTsdb->commitInfo->arr == NULL) {
|
||||
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbCommitInfoDestroy(pTsdb);
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbCommitInfoAdd(STsdb *tsdb, const SFileSetCommitInfo *info) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
SFileSetCommitInfo *tinfo;
|
||||
|
||||
vHashGet(tsdb->commitInfo->ht, info, (void **)&tinfo);
|
||||
if (tinfo) {
|
||||
if (info->hasDataToCommit && !tinfo->hasDataToCommit) {
|
||||
tinfo->hasDataToCommit = true;
|
||||
}
|
||||
} else {
|
||||
if ((tinfo = taosMemoryCalloc(1, sizeof(*tinfo))) == NULL) {
|
||||
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||
}
|
||||
tinfo->fid = info->fid;
|
||||
tinfo->hasDataToCommit = info->hasDataToCommit;
|
||||
if (info->fset) {
|
||||
code = tsdbTFileSetInitCopy(tsdb, info->fset, &tinfo->fset);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = vHashPut(tsdb->commitInfo->ht, tinfo);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if ((taosArrayPush(tsdb->commitInfo->arr, &tinfo)) == NULL) {
|
||||
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||
}
|
||||
taosArraySort(tsdb->commitInfo->arr, tFileSetCommitInfoPCompare);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(tsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
STFileSet *fset = NULL;
|
||||
SRBTreeIter iter;
|
||||
|
||||
code = tsdbCommitInfoInit(tsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
taosThreadMutexLock(&tsdb->mutex);
|
||||
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
|
||||
SFileSetCommitInfo info = {
|
||||
.fid = fset->fid,
|
||||
.hasDataToCommit = false,
|
||||
.fset = fset,
|
||||
};
|
||||
if ((code = tsdbCommitInfoAdd(tsdb, &info))) {
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
|
||||
iter = tRBTreeIterCreate(tsdb->imem->tbDataTree, 1);
|
||||
for (SRBTreeNode *node = tRBTreeIterNext(&iter); node; node = tRBTreeIterNext(&iter)) {
|
||||
STbData *pTbData = TCONTAINER_OF(node, STbData, rbtn);
|
||||
|
||||
// scan time-series data
|
||||
STsdbRowKey from = {
|
||||
.key.ts = INT64_MIN,
|
||||
.key.numOfPKs = 0,
|
||||
.version = INT64_MIN,
|
||||
};
|
||||
for (;;) {
|
||||
int64_t minKey, maxKey;
|
||||
STbDataIter tbDataIter = {0};
|
||||
TSDBROW *row;
|
||||
int32_t fid;
|
||||
|
||||
tsdbTbDataIterOpen(pTbData, &from, 0, &tbDataIter);
|
||||
if ((row = tsdbTbDataIterGet(&tbDataIter)) == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
fid = tsdbKeyFid(TSDBROW_TS(row), tsdb->keepCfg.days, tsdb->keepCfg.precision);
|
||||
tsdbFidKeyRange(fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey);
|
||||
|
||||
SFileSetCommitInfo info = {
|
||||
.fid = fid,
|
||||
.hasDataToCommit = true,
|
||||
.fset = NULL,
|
||||
};
|
||||
code = tsdbCommitInfoAdd(tsdb, &info);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
from.key.ts = maxKey + 1;
|
||||
}
|
||||
|
||||
// scan tomb data
|
||||
for (SDelData *pDelData = pTbData->pHead; pDelData; pDelData = pDelData->pNext) {
|
||||
for (int32_t i = taosArrayGetSize(tsdb->commitInfo->arr) - 1; i >= 0; i--) {
|
||||
int64_t minKey, maxKey;
|
||||
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
|
||||
|
||||
tsdbFidKeyRange(info->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey);
|
||||
|
||||
if (pDelData->sKey > maxKey || pDelData->eKey < minKey) {
|
||||
continue;
|
||||
} else if (!info->hasDataToCommit) {
|
||||
info->hasDataToCommit = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbCommitInfoDestroy(tsdb);
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(tsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -458,11 +638,7 @@ static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *co
|
|||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
memset(committer, 0, sizeof(committer[0]));
|
||||
|
||||
committer->tsdb = tsdb;
|
||||
code = tsdbFSCreateCopySnapshot(tsdb->pFS, &committer->fsetArr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
committer->minutes = tsdb->keepCfg.days;
|
||||
committer->precision = tsdb->keepCfg.precision;
|
||||
committer->minRow = info->info.config.tsdbCfg.minRows;
|
||||
|
@ -471,21 +647,21 @@ static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *co
|
|||
committer->sttTrigger = info->info.config.sttTrigger;
|
||||
committer->szPage = info->info.config.tsdbPageSize;
|
||||
committer->compactVersion = INT64_MAX;
|
||||
committer->ctx->cid = tsdbFSAllocEid(tsdb->pFS);
|
||||
committer->ctx->now = taosGetTimestampSec();
|
||||
committer->cid = tsdbFSAllocEid(tsdb->pFS);
|
||||
committer->now = taosGetTimestampSec();
|
||||
|
||||
committer->ctx->nextKey = tsdb->imem->minKey;
|
||||
if (tsdb->imem->nDel > 0) {
|
||||
SRBTreeIter iter[1] = {tRBTreeIterCreate(tsdb->imem->tbDataTree, 1)};
|
||||
code = tsdbCommitInfoBuild(tsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
for (SRBTreeNode *node = tRBTreeIterNext(iter); node; node = tRBTreeIterNext(iter)) {
|
||||
STbData *tbData = TCONTAINER_OF(node, STbData, rbtn);
|
||||
|
||||
for (SDelData *delData = tbData->pHead; delData; delData = delData->pNext) {
|
||||
committer->ctx->nextKey = TMIN(committer->ctx->nextKey, delData->sKey);
|
||||
}
|
||||
STFileSet *fset;
|
||||
taosThreadMutexLock(&tsdb->mutex);
|
||||
for (int i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
|
||||
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
|
||||
if (info->hasDataToCommit && info->fset) {
|
||||
tsdbBeginTaskOnFileSet(tsdb, info->fid, &fset);
|
||||
}
|
||||
}
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
@ -516,14 +692,13 @@ static int32_t tsdbCloseCommitter(SCommitter2 *committer, int32_t eno) {
|
|||
TARRAY2_DESTROY(committer->sttReaderArray, NULL);
|
||||
TARRAY2_DESTROY(committer->fopArray, NULL);
|
||||
TARRAY2_DESTROY(committer->sttReaderArray, NULL);
|
||||
tsdbFSDestroyCopySnapshot(&committer->fsetArr);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s, eid:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, lino,
|
||||
tstrerror(code), committer->ctx->cid);
|
||||
tstrerror(code), committer->cid);
|
||||
} else {
|
||||
tsdbDebug("vgId:%d %s done, eid:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->cid);
|
||||
tsdbDebug("vgId:%d %s done, eid:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, committer->cid);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -553,17 +728,20 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) {
|
|||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
tsdbUnrefMemTable(imem, NULL, true);
|
||||
} else {
|
||||
SCommitter2 committer[1];
|
||||
SCommitter2 committer = {0};
|
||||
|
||||
code = tsdbOpenCommitter(tsdb, info, committer);
|
||||
code = tsdbOpenCommitter(tsdb, info, &committer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
while (committer->ctx->nextKey != TSKEY_MAX) {
|
||||
code = tsdbCommitFileSet(committer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
|
||||
committer.ctx->info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
|
||||
if (committer.ctx->info->hasDataToCommit) {
|
||||
code = tsdbCommitFileSet(&committer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
code = tsdbCloseCommitter(committer, code);
|
||||
code = tsdbCloseCommitter(&committer, code);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
|
@ -580,22 +758,33 @@ int32_t tsdbCommitCommit(STsdb *tsdb) {
|
|||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (tsdb->imem == NULL) goto _exit;
|
||||
if (tsdb->imem) {
|
||||
SMemTable *pMemTable = tsdb->imem;
|
||||
|
||||
taosThreadMutexLock(&tsdb->mutex);
|
||||
|
||||
if ((code = tsdbFSEditCommit(tsdb->pFS))) {
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
tsdb->imem = NULL;
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
|
||||
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
|
||||
if (info->hasDataToCommit && info->fset) {
|
||||
tsdbFinishTaskOnFileSet(tsdb, info->fid);
|
||||
}
|
||||
}
|
||||
|
||||
SMemTable *pMemTable = tsdb->imem;
|
||||
taosThreadMutexLock(&tsdb->mutex);
|
||||
code = tsdbFSEditCommit(tsdb->pFS);
|
||||
if (code) {
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
tsdbCommitInfoDestroy(tsdb);
|
||||
tsdbUnrefMemTable(pMemTable, NULL, true);
|
||||
}
|
||||
tsdb->imem = NULL;
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
tsdbUnrefMemTable(pMemTable, NULL, true);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(tsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbInfo("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
|
||||
}
|
||||
|
@ -611,6 +800,16 @@ int32_t tsdbCommitAbort(STsdb *pTsdb) {
|
|||
code = tsdbFSEditAbort(pTsdb->pFS);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
taosThreadMutexLock(&pTsdb->mutex);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) {
|
||||
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i);
|
||||
if (info->hasDataToCommit && info->fset) {
|
||||
tsdbFinishTaskOnFileSet(pTsdb, info->fid);
|
||||
}
|
||||
}
|
||||
taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
tsdbCommitInfoDestroy(pTsdb);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
|
|
|
@ -22,9 +22,6 @@
|
|||
|
||||
extern void remove_file(const char *fname);
|
||||
|
||||
#define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT
|
||||
#define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1)
|
||||
|
||||
typedef struct STFileHashEntry {
|
||||
struct STFileHashEntry *next;
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
|
@ -290,10 +287,8 @@ static int32_t commit_edit(STFileSystem *fs) {
|
|||
current_fname(fs->tsdb, current, TSDB_FCURRENT);
|
||||
if (fs->etype == TSDB_FEDIT_COMMIT) {
|
||||
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
|
||||
} else if (fs->etype == TSDB_FEDIT_MERGE) {
|
||||
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
|
||||
}
|
||||
|
||||
int32_t code;
|
||||
|
@ -324,11 +319,8 @@ static int32_t abort_edit(STFileSystem *fs) {
|
|||
|
||||
if (fs->etype == TSDB_FEDIT_COMMIT) {
|
||||
current_fname(fs->tsdb, fname, TSDB_FCURRENT_C);
|
||||
} else if (fs->etype == TSDB_FEDIT_MERGE) {
|
||||
current_fname(fs->tsdb, fname, TSDB_FCURRENT_M);
|
||||
} else {
|
||||
tsdbError("vgId:%d %s failed since invalid etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
|
||||
ASSERT(0);
|
||||
current_fname(fs->tsdb, fname, TSDB_FCURRENT_M);
|
||||
}
|
||||
|
||||
int32_t code;
|
||||
|
@ -767,9 +759,12 @@ extern int32_t tsdbStopAllCompTask(STsdb *tsdb);
|
|||
|
||||
int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
|
||||
STFileSystem *fs = pTsdb->pFS;
|
||||
TARRAY2(int64_t) channelArr = {0};
|
||||
SArray *channelArray = taosArrayInit(0, sizeof(SVAChannelID));
|
||||
if (channelArray == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&fs->tsdb->mutex);
|
||||
taosThreadMutexLock(&pTsdb->mutex);
|
||||
|
||||
// disable
|
||||
pTsdb->bgTaskDisabled = true;
|
||||
|
@ -777,20 +772,23 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
|
|||
// collect channel
|
||||
STFileSet *fset;
|
||||
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
||||
if (VNODE_ASYNC_VALID_CHANNEL_ID(fset->bgTaskChannel)) {
|
||||
TARRAY2_APPEND(&channelArr, fset->bgTaskChannel);
|
||||
fset->bgTaskChannel = 0;
|
||||
if (fset->channelOpened) {
|
||||
taosArrayPush(channelArray, &fset->channel);
|
||||
fset->channel = (SVAChannelID){0};
|
||||
fset->mergeScheduled = false;
|
||||
tsdbFSSetBlockCommit(fset, false);
|
||||
fset->channelOpened = false;
|
||||
}
|
||||
fset->mergeScheduled = false;
|
||||
tsdbFSSetBlockCommit(fset, false);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&fs->tsdb->mutex);
|
||||
taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
|
||||
// destroy all channels
|
||||
int64_t channel;
|
||||
TARRAY2_FOREACH(&channelArr, channel) { vnodeAChannelDestroy(vnodeAsyncHandle[1], channel, true); }
|
||||
TARRAY2_DESTROY(&channelArr, NULL);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(channelArray); i++) {
|
||||
SVAChannelID *channel = taosArrayGet(channelArray, i);
|
||||
vnodeAChannelDestroy(channel, true);
|
||||
}
|
||||
taosArrayDestroy(channelArray);
|
||||
|
||||
#ifdef TD_ENTERPRISE
|
||||
tsdbStopAllCompTask(pTsdb);
|
||||
|
@ -832,15 +830,10 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e
|
|||
int32_t lino;
|
||||
char current_t[TSDB_FILENAME_LEN];
|
||||
|
||||
switch (etype) {
|
||||
case TSDB_FEDIT_COMMIT:
|
||||
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
|
||||
break;
|
||||
case TSDB_FEDIT_MERGE:
|
||||
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
if (etype == TSDB_FEDIT_COMMIT) {
|
||||
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
|
||||
} else {
|
||||
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
|
||||
}
|
||||
|
||||
tsem_wait(&fs->canEdit);
|
||||
|
@ -932,8 +925,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
|
|||
arg->tsdb = fs->tsdb;
|
||||
arg->fid = fset->fid;
|
||||
|
||||
code = vnodeAsyncC(vnodeAsyncHandle[1], fset->bgTaskChannel, EVA_PRIORITY_HIGH, tsdbMerge, taosMemoryFree, arg,
|
||||
NULL);
|
||||
code = vnodeAsync(&fset->channel, EVA_PRIORITY_HIGH, tsdbMerge, taosMemoryFree, arg, NULL);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
fset->mergeScheduled = true;
|
||||
}
|
||||
|
@ -946,33 +938,6 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
|
|||
}
|
||||
}
|
||||
|
||||
// clear empty level and fset
|
||||
int32_t i = 0;
|
||||
while (i < TARRAY2_SIZE(fs->fSetArr)) {
|
||||
STFileSet *fset = TARRAY2_GET(fs->fSetArr, i);
|
||||
|
||||
int32_t j = 0;
|
||||
while (j < TARRAY2_SIZE(fset->lvlArr)) {
|
||||
SSttLvl *lvl = TARRAY2_GET(fset->lvlArr, j);
|
||||
|
||||
if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
|
||||
TARRAY2_REMOVE(fset->lvlArr, j, tsdbSttLvlClear);
|
||||
} else {
|
||||
j++;
|
||||
}
|
||||
}
|
||||
|
||||
if (tsdbTFileSetIsEmpty(fset)) {
|
||||
if (VNODE_ASYNC_VALID_CHANNEL_ID(fset->bgTaskChannel)) {
|
||||
vnodeAChannelDestroy(vnodeAsyncHandle[1], fset->bgTaskChannel, false);
|
||||
fset->bgTaskChannel = 0;
|
||||
}
|
||||
TARRAY2_REMOVE(fs->fSetArr, i, tsdbTFileSetClear);
|
||||
} else {
|
||||
i++;
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->tsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
|
@ -1211,3 +1176,47 @@ _out:
|
|||
}
|
||||
|
||||
int32_t tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr) { return tsdbTFileSetRangeArrayDestroy(fsrArr); }
|
||||
|
||||
int32_t tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset) {
|
||||
int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;
|
||||
|
||||
tsdbFSGetFSet(tsdb->pFS, fid, fset);
|
||||
if (sttTrigger == 1 && fset) {
|
||||
for (;;) {
|
||||
if ((*fset)->taskRunning) {
|
||||
(*fset)->numWaitTask++;
|
||||
|
||||
taosThreadCondWait(&(*fset)->beginTask, &tsdb->mutex);
|
||||
|
||||
tsdbFSGetFSet(tsdb->pFS, fid, fset);
|
||||
ASSERT(fset != NULL);
|
||||
|
||||
(*fset)->numWaitTask--;
|
||||
ASSERT((*fset)->numWaitTask >= 0);
|
||||
} else {
|
||||
(*fset)->taskRunning = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
tsdbInfo("vgId:%d begin task on file set:%d", TD_VID(tsdb->pVnode), fid);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid) {
|
||||
int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;
|
||||
if (sttTrigger == 1) {
|
||||
STFileSet *fset = NULL;
|
||||
tsdbFSGetFSet(tsdb->pFS, fid, &fset);
|
||||
if (fset != NULL && fset->taskRunning) {
|
||||
fset->taskRunning = false;
|
||||
if (fset->numWaitTask > 0) {
|
||||
taosThreadCondSignal(&fset->beginTask);
|
||||
}
|
||||
tsdbInfo("vgId:%d finish task on file set:%d", TD_VID(tsdb->pVnode), fid);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -24,7 +24,9 @@ extern "C" {
|
|||
|
||||
typedef enum {
|
||||
TSDB_FEDIT_COMMIT = 1, //
|
||||
TSDB_FEDIT_MERGE
|
||||
TSDB_FEDIT_MERGE,
|
||||
TSDB_FEDIT_COMPACT,
|
||||
TSDB_FEDIT_RETENTION,
|
||||
} EFEditT;
|
||||
|
||||
typedef enum {
|
||||
|
@ -59,6 +61,8 @@ int32_t tsdbFSEditAbort(STFileSystem *fs);
|
|||
// other
|
||||
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset);
|
||||
int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid);
|
||||
int32_t tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset);
|
||||
int32_t tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid);
|
||||
// utils
|
||||
int32_t save_fs(const TFileSetArray *arr, const char *fname);
|
||||
int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype);
|
||||
|
@ -72,10 +76,6 @@ struct STFileSystem {
|
|||
EFEditT etype;
|
||||
TFileSetArray fSetArr[1];
|
||||
TFileSetArray fSetArrTmp[1];
|
||||
|
||||
// background task queue
|
||||
bool stop;
|
||||
int64_t taskid;
|
||||
};
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -456,13 +456,14 @@ int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset) {
|
|||
TARRAY2_INIT(fset[0]->lvlArr);
|
||||
|
||||
// background task queue
|
||||
fset[0]->bgTaskChannel = 0;
|
||||
fset[0]->mergeScheduled = false;
|
||||
taosThreadCondInit(&(*fset)->beginTask, NULL);
|
||||
(*fset)->taskRunning = false;
|
||||
(*fset)->numWaitTask = 0;
|
||||
|
||||
// block commit variables
|
||||
taosThreadCondInit(&fset[0]->canCommit, NULL);
|
||||
fset[0]->numWaitCommit = 0;
|
||||
fset[0]->blockCommit = false;
|
||||
(*fset)->numWaitCommit = 0;
|
||||
(*fset)->blockCommit = false;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -598,21 +599,19 @@ int32_t tsdbTFileSetRangeArrayDestroy(TFileSetRangeArray **ppArr) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbTFileSetClear(STFileSet **fset) {
|
||||
if (!fset[0]) return 0;
|
||||
void tsdbTFileSetClear(STFileSet **fset) {
|
||||
if (fset && *fset) {
|
||||
for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
|
||||
if ((*fset)->farr[ftype] == NULL) continue;
|
||||
tsdbTFileObjUnref((*fset)->farr[ftype]);
|
||||
}
|
||||
|
||||
for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
|
||||
if (fset[0]->farr[ftype] == NULL) continue;
|
||||
tsdbTFileObjUnref(fset[0]->farr[ftype]);
|
||||
TARRAY2_DESTROY((*fset)->lvlArr, tsdbSttLvlClear);
|
||||
|
||||
taosThreadCondDestroy(&(*fset)->beginTask);
|
||||
taosThreadCondDestroy(&(*fset)->canCommit);
|
||||
taosMemoryFreeClear(*fset);
|
||||
}
|
||||
|
||||
TARRAY2_DESTROY(fset[0]->lvlArr, tsdbSttLvlClear);
|
||||
|
||||
taosThreadCondDestroy(&fset[0]->canCommit);
|
||||
taosMemoryFree(fset[0]);
|
||||
fset[0] = NULL;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbTFileSetRemove(STFileSet *fset) {
|
||||
|
@ -665,6 +664,12 @@ bool tsdbTFileSetIsEmpty(const STFileSet *fset) {
|
|||
}
|
||||
|
||||
int32_t tsdbTFileSetOpenChannel(STFileSet *fset) {
|
||||
if (VNODE_ASYNC_VALID_CHANNEL_ID(fset->bgTaskChannel)) return 0;
|
||||
return vnodeAChannelInit(vnodeAsyncHandle[1], &fset->bgTaskChannel);
|
||||
int32_t code;
|
||||
if (!fset->channelOpened) {
|
||||
if ((code = vnodeAChannelInit(2, &fset->channel))) {
|
||||
return code;
|
||||
}
|
||||
fset->channelOpened = true;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ typedef enum {
|
|||
int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset);
|
||||
int32_t tsdbTFileSetInitCopy(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset);
|
||||
int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset);
|
||||
int32_t tsdbTFileSetClear(STFileSet **fset);
|
||||
void tsdbTFileSetClear(STFileSet **fset);
|
||||
int32_t tsdbTFileSetRemove(STFileSet *fset);
|
||||
|
||||
int32_t tsdbTFileSetFilteredInitDup(STsdb *pTsdb, const STFileSet *fset1, int64_t ever, STFileSet **fset,
|
||||
|
@ -90,9 +90,15 @@ struct STFileSet {
|
|||
STFileObj *farr[TSDB_FTYPE_MAX]; // file array
|
||||
TSttLvlArray lvlArr[1]; // level array
|
||||
|
||||
// background task channel
|
||||
int64_t bgTaskChannel;
|
||||
bool mergeScheduled;
|
||||
// background task
|
||||
bool channelOpened;
|
||||
SVAChannelID channel;
|
||||
bool mergeScheduled;
|
||||
|
||||
// sttTrigger = 1
|
||||
TdThreadCond beginTask;
|
||||
bool taskRunning;
|
||||
int32_t numWaitTask;
|
||||
|
||||
// block commit variables
|
||||
TdThreadCond canCommit;
|
||||
|
|
|
@ -593,5 +593,6 @@ _exit:
|
|||
exit(EXIT_FAILURE);
|
||||
}
|
||||
tsdbTFileSetClear(&merger->fset);
|
||||
taosMemoryFree(arg);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -24,8 +24,8 @@ typedef struct {
|
|||
int64_t now;
|
||||
int64_t cid;
|
||||
|
||||
TFileSetArray *fsetArr;
|
||||
TFileOpArray fopArr[1];
|
||||
STFileSet *fset;
|
||||
TFileOpArray fopArr;
|
||||
} SRTNer;
|
||||
|
||||
static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
|
||||
|
@ -35,7 +35,7 @@ static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
|
|||
.of = fobj->f[0],
|
||||
};
|
||||
|
||||
return TARRAY2_APPEND(rtner->fopArr, op);
|
||||
return TARRAY2_APPEND(&rtner->fopArr, op);
|
||||
}
|
||||
|
||||
static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFile *to) {
|
||||
|
@ -71,7 +71,7 @@ static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFi
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
|
||||
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
|
||||
if (fdFrom) taosCloseFile(&fdFrom);
|
||||
if (fdTo) taosCloseFile(&fdTo);
|
||||
}
|
||||
|
@ -108,7 +108,7 @@ static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
|
||||
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
|
||||
taosCloseFile(&fdFrom);
|
||||
taosCloseFile(&fdTo);
|
||||
}
|
||||
|
@ -128,7 +128,7 @@ static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const
|
|||
.of = fobj->f[0],
|
||||
};
|
||||
|
||||
code = TARRAY2_APPEND(rtner->fopArr, op);
|
||||
code = TARRAY2_APPEND(&rtner->fopArr, op);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// create new
|
||||
|
@ -152,7 +152,7 @@ static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const
|
|||
},
|
||||
};
|
||||
|
||||
code = TARRAY2_APPEND(rtner->fopArr, op);
|
||||
code = TARRAY2_APPEND(&rtner->fopArr, op);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// do copy the file
|
||||
|
@ -167,7 +167,7 @@ static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
|
||||
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -176,80 +176,51 @@ typedef struct {
|
|||
STsdb *tsdb;
|
||||
int64_t now;
|
||||
int32_t fid;
|
||||
bool s3Migrate;
|
||||
} SRtnArg;
|
||||
|
||||
static int32_t tsdbDoRetentionBegin(SRtnArg *arg, SRTNer *rtner) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
STsdb *tsdb = arg->tsdb;
|
||||
|
||||
rtner->tsdb = tsdb;
|
||||
rtner->szPage = tsdb->pVnode->config.tsdbPageSize;
|
||||
rtner->now = arg->now;
|
||||
rtner->cid = tsdbFSAllocEid(tsdb->pFS);
|
||||
|
||||
code = tsdbFSCreateCopySnapshot(tsdb->pFS, &rtner->fsetArr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
|
||||
} else {
|
||||
tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbDoRetentionEnd(SRTNer *rtner) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (TARRAY2_SIZE(rtner->fopArr) == 0) goto _exit;
|
||||
|
||||
code = tsdbFSEditBegin(rtner->tsdb->pFS, rtner->fopArr, TSDB_FEDIT_MERGE);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
taosThreadMutexLock(&rtner->tsdb->mutex);
|
||||
|
||||
code = tsdbFSEditCommit(rtner->tsdb->pFS);
|
||||
if (code) {
|
||||
taosThreadMutexUnlock(&rtner->tsdb->mutex);
|
||||
if (TARRAY2_SIZE(&rtner->fopArr) > 0) {
|
||||
code = tsdbFSEditBegin(rtner->tsdb->pFS, &rtner->fopArr, TSDB_FEDIT_RETENTION);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
taosThreadMutexLock(&rtner->tsdb->mutex);
|
||||
|
||||
code = tsdbFSEditCommit(rtner->tsdb->pFS);
|
||||
if (code) {
|
||||
taosThreadMutexUnlock(&rtner->tsdb->mutex);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&rtner->tsdb->mutex);
|
||||
|
||||
TARRAY2_DESTROY(&rtner->fopArr, NULL);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&rtner->tsdb->mutex);
|
||||
|
||||
TARRAY2_DESTROY(rtner->fopArr, NULL);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
|
||||
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
|
||||
} else {
|
||||
tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
|
||||
}
|
||||
tsdbFSDestroyCopySnapshot(&rtner->fsetArr);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) {
|
||||
static int32_t tsdbDoRetention(SRTNer *rtner) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
STFileObj *fobj = NULL;
|
||||
STFileSet *fset = rtner->fset;
|
||||
int32_t expLevel = tsdbFidLevel(fset->fid, &rtner->tsdb->keepCfg, rtner->now);
|
||||
|
||||
if (expLevel < 0) { // remove the fileset
|
||||
for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = fset->farr[ftype], 1); ++ftype) {
|
||||
if (fobj == NULL) continue;
|
||||
/*
|
||||
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
|
||||
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1) {
|
||||
code = tsdbRemoveFileObjectS3(rtner, fobj);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else {*/
|
||||
code = tsdbDoRemoveFileObject(rtner, fobj);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
//}
|
||||
}
|
||||
|
||||
SSttLvl *lvl;
|
||||
|
@ -275,10 +246,6 @@ static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) {
|
|||
if (fobj == NULL) continue;
|
||||
|
||||
if (fobj->f->did.level == did.level) {
|
||||
/*
|
||||
code = tsdbCheckMigrateS3(rtner, fobj, ftype, &did);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
*/
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -306,91 +273,110 @@ static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
|
||||
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(rtner->tsdb->pVnode), __func__, code, lino);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static void tsdbFreeRtnArg(void *arg) { taosMemoryFree(arg); }
|
||||
static void tsdbRetentionCancel(void *arg) { taosMemoryFree(arg); }
|
||||
|
||||
static int32_t tsdbDoRetentionAsync(void *arg) {
|
||||
static int32_t tsdbDoS3Migrate(SRTNer *rtner);
|
||||
|
||||
static int32_t tsdbRetention(void *arg) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SRTNer rtner[1] = {0};
|
||||
|
||||
code = tsdbDoRetentionBegin(arg, rtner);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
SRtnArg *rtnArg = (SRtnArg *)arg;
|
||||
STsdb *pTsdb = rtnArg->tsdb;
|
||||
SVnode *pVnode = pTsdb->pVnode;
|
||||
STFileSet *fset = NULL;
|
||||
SRTNer rtner = {
|
||||
.tsdb = pTsdb,
|
||||
.szPage = pVnode->config.tsdbPageSize,
|
||||
.now = rtnArg->now,
|
||||
.cid = tsdbFSAllocEid(pTsdb->pFS),
|
||||
};
|
||||
|
||||
STFileSet *fset;
|
||||
TARRAY2_FOREACH(rtner->fsetArr, fset) {
|
||||
if (fset->fid != ((SRtnArg *)arg)->fid) continue;
|
||||
// begin task
|
||||
taosThreadMutexLock(&pTsdb->mutex);
|
||||
tsdbBeginTaskOnFileSet(pTsdb, rtnArg->fid, &fset);
|
||||
if (fset && (code = tsdbTFileSetInitCopy(pTsdb, fset, &rtner.fset))) {
|
||||
taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
|
||||
code = tsdbDoRetentionOnFileSet(rtner, fset);
|
||||
// do retention
|
||||
if (rtner.fset) {
|
||||
if (rtnArg->s3Migrate) {
|
||||
code = tsdbDoS3Migrate(&rtner);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else {
|
||||
code = tsdbDoRetention(&rtner);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tsdbDoRetentionEnd(&rtner);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tsdbDoRetentionEnd(rtner);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
if (TARRAY2_DATA(rtner->fopArr)) {
|
||||
TARRAY2_DESTROY(rtner->fopArr, NULL);
|
||||
}
|
||||
TFileSetArray **fsetArr = &rtner->fsetArr;
|
||||
if (fsetArr[0]) {
|
||||
tsdbFSDestroyCopySnapshot(&rtner->fsetArr);
|
||||
}
|
||||
if (rtner.fset) {
|
||||
taosThreadMutexLock(&pTsdb->mutex);
|
||||
tsdbFinishTaskOnFileSet(pTsdb, rtnArg->fid);
|
||||
taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
}
|
||||
|
||||
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
|
||||
// clear resources
|
||||
tsdbTFileSetClear(&rtner.fset);
|
||||
TARRAY2_DESTROY(&rtner.fopArr, NULL);
|
||||
taosMemoryFree(arg);
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(((SRtnArg *)arg)->tsdb->pVnode), __func__, code, lino);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) {
|
||||
static int32_t tsdbAsyncRetentionImpl(STsdb *tsdb, int64_t now, bool s3Migrate) {
|
||||
int32_t code = 0;
|
||||
|
||||
taosThreadMutexLock(&tsdb->mutex);
|
||||
|
||||
if (tsdb->bgTaskDisabled) {
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
return 0;
|
||||
}
|
||||
int32_t lino = 0;
|
||||
|
||||
STFileSet *fset;
|
||||
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
|
||||
code = tsdbTFileSetOpenChannel(fset);
|
||||
if (code) {
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
|
||||
if (arg == NULL) {
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if (!tsdb->bgTaskDisabled) {
|
||||
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
|
||||
code = tsdbTFileSetOpenChannel(fset);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
arg->tsdb = tsdb;
|
||||
arg->now = now;
|
||||
arg->fid = fset->fid;
|
||||
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
|
||||
if (arg == NULL) {
|
||||
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||
}
|
||||
|
||||
if (sync) {
|
||||
code = vnodeAsyncC(vnodeAsyncHandle[0], tsdb->pVnode->commitChannel, EVA_PRIORITY_LOW, tsdbDoRetentionAsync,
|
||||
tsdbFreeRtnArg, arg, NULL);
|
||||
} else {
|
||||
code = vnodeAsyncC(vnodeAsyncHandle[1], fset->bgTaskChannel, EVA_PRIORITY_LOW, tsdbDoRetentionAsync,
|
||||
tsdbFreeRtnArg, arg, NULL);
|
||||
}
|
||||
if (code) {
|
||||
tsdbFreeRtnArg(arg);
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
return code;
|
||||
arg->tsdb = tsdb;
|
||||
arg->now = now;
|
||||
arg->fid = fset->fid;
|
||||
arg->s3Migrate = s3Migrate;
|
||||
|
||||
if ((code = vnodeAsync(&fset->channel, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, arg, NULL))) {
|
||||
taosMemoryFree(arg);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, %s failed, code:%d, line:%d", TD_VID(tsdb->pVnode), __func__, code, lino);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now) {
|
||||
int32_t code = 0;
|
||||
taosThreadMutexLock(&tsdb->mutex);
|
||||
code = tsdbAsyncRetentionImpl(tsdb, now, false);
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -462,7 +448,7 @@ static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int
|
|||
.of = fobj->f[0],
|
||||
};
|
||||
|
||||
code = TARRAY2_APPEND(rtner->fopArr, op);
|
||||
code = TARRAY2_APPEND(&rtner->fopArr, op);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// create new
|
||||
|
@ -486,7 +472,7 @@ static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int
|
|||
},
|
||||
};
|
||||
|
||||
code = TARRAY2_APPEND(rtner->fopArr, op);
|
||||
code = TARRAY2_APPEND(&rtner->fopArr, op);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
|
@ -566,7 +552,7 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64
|
|||
.of = fobj->f[0],
|
||||
};
|
||||
|
||||
code = TARRAY2_APPEND(rtner->fopArr, op);
|
||||
code = TARRAY2_APPEND(&rtner->fopArr, op);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// create new
|
||||
|
@ -590,7 +576,7 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64
|
|||
},
|
||||
};
|
||||
|
||||
code = TARRAY2_APPEND(rtner->fopArr, op);
|
||||
code = TARRAY2_APPEND(&rtner->fopArr, op);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
|
@ -653,10 +639,11 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbDoS3MigrateOnFileSet(SRTNer *rtner, STFileSet *fset) {
|
||||
static int32_t tsdbDoS3Migrate(SRTNer *rtner) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
STFileSet *fset = rtner->fset;
|
||||
STFileObj *fobj = fset->farr[TSDB_FTYPE_DATA];
|
||||
if (!fobj) return code;
|
||||
|
||||
|
@ -720,41 +707,7 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbDoS3MigrateAsync(void *arg) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SRTNer rtner[1] = {0};
|
||||
|
||||
code = tsdbDoRetentionBegin(arg, rtner);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
STFileSet *fset;
|
||||
TARRAY2_FOREACH(rtner->fsetArr, fset) {
|
||||
if (fset->fid != ((SRtnArg *)arg)->fid) continue;
|
||||
|
||||
code = tsdbDoS3MigrateOnFileSet(rtner, fset);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tsdbDoRetentionEnd(rtner);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
if (TARRAY2_DATA(rtner->fopArr)) {
|
||||
TARRAY2_DESTROY(rtner->fopArr, NULL);
|
||||
}
|
||||
TFileSetArray **fsetArr = &rtner->fsetArr;
|
||||
if (fsetArr[0]) {
|
||||
tsdbFSDestroyCopySnapshot(&rtner->fsetArr);
|
||||
}
|
||||
|
||||
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbS3Migrate(STsdb *tsdb, int64_t now, int32_t sync) {
|
||||
int32_t tsdbAsyncS3Migrate(STsdb *tsdb, int64_t now) {
|
||||
int32_t code = 0;
|
||||
|
||||
extern int8_t tsS3EnabledCfg;
|
||||
|
@ -772,45 +725,7 @@ int32_t tsdbS3Migrate(STsdb *tsdb, int64_t now, int32_t sync) {
|
|||
}
|
||||
|
||||
taosThreadMutexLock(&tsdb->mutex);
|
||||
|
||||
if (tsdb->bgTaskDisabled) {
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
STFileSet *fset;
|
||||
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
|
||||
code = tsdbTFileSetOpenChannel(fset);
|
||||
if (code) {
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
|
||||
if (arg == NULL) {
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
arg->tsdb = tsdb;
|
||||
arg->now = now;
|
||||
arg->fid = fset->fid;
|
||||
|
||||
if (sync) {
|
||||
code = vnodeAsyncC(vnodeAsyncHandle[0], tsdb->pVnode->commitChannel, EVA_PRIORITY_LOW, tsdbDoS3MigrateAsync,
|
||||
tsdbFreeRtnArg, arg, NULL);
|
||||
} else {
|
||||
code = vnodeAsyncC(vnodeAsyncHandle[1], fset->bgTaskChannel, EVA_PRIORITY_LOW, tsdbDoS3MigrateAsync,
|
||||
tsdbFreeRtnArg, arg, NULL);
|
||||
}
|
||||
if (code) {
|
||||
tsdbFreeRtnArg(arg);
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
code = tsdbAsyncRetentionImpl(tsdb, now, true);
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include "vnd.h"
|
||||
#include "vnodeHash.h"
|
||||
|
||||
typedef struct SVAsync SVAsync;
|
||||
typedef struct SVATask SVATask;
|
||||
typedef struct SVAChannel SVAChannel;
|
||||
|
||||
|
@ -54,7 +55,7 @@ struct SVATask {
|
|||
int32_t priorScore;
|
||||
SVAChannel *channel;
|
||||
int32_t (*execute)(void *);
|
||||
void (*complete)(void *);
|
||||
void (*cancel)(void *);
|
||||
void *arg;
|
||||
EVATaskState state;
|
||||
|
||||
|
@ -67,6 +68,11 @@ struct SVATask {
|
|||
struct SVATask *next;
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
void (*cancel)(void *);
|
||||
void *arg;
|
||||
} SVATaskCancelInfo;
|
||||
|
||||
#define VATASK_PIORITY(task_) ((task_)->priority - ((task_)->priorScore / 4))
|
||||
|
||||
// async channel
|
||||
|
@ -112,6 +118,10 @@ struct SVAsync {
|
|||
SVHashTable *taskTable;
|
||||
};
|
||||
|
||||
SVAsync *vnodeAsyncs[3];
|
||||
#define MIN_ASYNC_ID 1
|
||||
#define MAX_ASYNC_ID (sizeof(vnodeAsyncs) / sizeof(vnodeAsyncs[0]) - 1)
|
||||
|
||||
static int32_t vnodeAsyncTaskDone(SVAsync *async, SVATask *task) {
|
||||
int32_t ret;
|
||||
|
||||
|
@ -160,11 +170,6 @@ static int32_t vnodeAsyncTaskDone(SVAsync *async, SVATask *task) {
|
|||
}
|
||||
async->numTasks--;
|
||||
|
||||
// call complete callback
|
||||
if (task->complete) {
|
||||
task->complete(task->arg);
|
||||
}
|
||||
|
||||
if (task->numWait == 0) {
|
||||
taosThreadCondDestroy(&task->waitCond);
|
||||
taosMemoryFree(task);
|
||||
|
@ -176,7 +181,7 @@ static int32_t vnodeAsyncTaskDone(SVAsync *async, SVATask *task) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vnodeAsyncCancelAllTasks(SVAsync *async) {
|
||||
static int32_t vnodeAsyncCancelAllTasks(SVAsync *async, SArray *cancelArray) {
|
||||
while (async->queue[0].next != &async->queue[0] || async->queue[1].next != &async->queue[1] ||
|
||||
async->queue[2].next != &async->queue[2]) {
|
||||
for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
|
||||
|
@ -184,6 +189,12 @@ static int32_t vnodeAsyncCancelAllTasks(SVAsync *async) {
|
|||
SVATask *task = async->queue[i].next;
|
||||
task->prev->next = task->next;
|
||||
task->next->prev = task->prev;
|
||||
if (task->cancel) {
|
||||
taosArrayPush(cancelArray, &(SVATaskCancelInfo){
|
||||
.cancel = task->cancel,
|
||||
.arg = task->arg,
|
||||
});
|
||||
}
|
||||
vnodeAsyncTaskDone(async, task);
|
||||
}
|
||||
}
|
||||
|
@ -194,6 +205,7 @@ static int32_t vnodeAsyncCancelAllTasks(SVAsync *async) {
|
|||
static void *vnodeAsyncLoop(void *arg) {
|
||||
SVWorker *worker = (SVWorker *)arg;
|
||||
SVAsync *async = worker->async;
|
||||
SArray *cancelArray = taosArrayInit(0, sizeof(SVATaskCancelInfo));
|
||||
|
||||
setThreadName(async->label);
|
||||
|
||||
|
@ -209,12 +221,12 @@ static void *vnodeAsyncLoop(void *arg) {
|
|||
for (;;) {
|
||||
if (async->stop || worker->workerId >= async->numWorkers) {
|
||||
if (async->stop) { // cancel all tasks
|
||||
vnodeAsyncCancelAllTasks(async);
|
||||
vnodeAsyncCancelAllTasks(async, cancelArray);
|
||||
}
|
||||
worker->state = EVA_WORKER_STATE_STOP;
|
||||
async->numLaunchWorkers--;
|
||||
taosThreadMutexUnlock(&async->mutex);
|
||||
return NULL;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
|
||||
|
@ -259,6 +271,12 @@ static void *vnodeAsyncLoop(void *arg) {
|
|||
worker->runningTask->execute(worker->runningTask->arg);
|
||||
}
|
||||
|
||||
_exit:
|
||||
for (int32_t i = 0; i < taosArrayGetSize(cancelArray); i++) {
|
||||
SVATaskCancelInfo *cancel = (SVATaskCancelInfo *)taosArrayGet(cancelArray, i);
|
||||
cancel->cancel(cancel->arg);
|
||||
}
|
||||
taosArrayDestroy(cancelArray);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -294,7 +312,7 @@ static int32_t vnodeAsyncChannelCompare(const void *obj1, const void *obj2) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeAsyncInit(SVAsync **async, char *label) {
|
||||
static int32_t vnodeAsyncInit(SVAsync **async, const char *label) {
|
||||
int32_t ret;
|
||||
|
||||
if (async == NULL) {
|
||||
|
@ -360,7 +378,7 @@ int32_t vnodeAsyncInit(SVAsync **async, char *label) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeAsyncDestroy(SVAsync **async) {
|
||||
static int32_t vnodeAsyncDestroy(SVAsync **async) {
|
||||
if ((*async) == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
@ -433,20 +451,39 @@ static int32_t vnodeAsyncLaunchWorker(SVAsync *async) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
#ifdef BUILD_NO_CALL
|
||||
int32_t vnodeAsync(SVAsync *async, EVAPriority priority, int32_t (*execute)(void *), void (*complete)(void *),
|
||||
void *arg, int64_t *taskId) {
|
||||
return vnodeAsyncC(async, 0, priority, execute, complete, arg, taskId);
|
||||
}
|
||||
#endif
|
||||
int32_t vnodeAsyncOpen(int32_t numOfThreads) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
int32_t vnodeAsyncC(SVAsync *async, int64_t channelId, EVAPriority priority, int32_t (*execute)(void *),
|
||||
void (*complete)(void *), void *arg, int64_t *taskId) {
|
||||
if (async == NULL || execute == NULL || channelId < 0) {
|
||||
// vnode-commit
|
||||
code = vnodeAsyncInit(&vnodeAsyncs[1], "vnode-commit");
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
vnodeAsyncSetWorkers(1, numOfThreads);
|
||||
|
||||
// vnode-merge
|
||||
code = vnodeAsyncInit(&vnodeAsyncs[2], "vnode-merge");
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
vnodeAsyncSetWorkers(2, numOfThreads);
|
||||
|
||||
_exit:
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeAsyncClose() {
|
||||
vnodeAsyncDestroy(&vnodeAsyncs[1]);
|
||||
vnodeAsyncDestroy(&vnodeAsyncs[2]);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*execute)(void *), void (*cancel)(void *),
|
||||
void *arg, SVATaskID *taskID) {
|
||||
if (channelID == NULL || channelID->async < MIN_ASYNC_ID || channelID->async > MAX_ASYNC_ID || execute == NULL ||
|
||||
channelID->id < 0) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
int64_t id;
|
||||
int64_t id;
|
||||
SVAsync *async = vnodeAsyncs[channelID->async];
|
||||
|
||||
// create task object
|
||||
SVATask *task = (SVATask *)taosMemoryCalloc(1, sizeof(SVATask));
|
||||
|
@ -457,7 +494,7 @@ int32_t vnodeAsyncC(SVAsync *async, int64_t channelId, EVAPriority priority, int
|
|||
task->priority = priority;
|
||||
task->priorScore = 0;
|
||||
task->execute = execute;
|
||||
task->complete = complete;
|
||||
task->cancel = cancel;
|
||||
task->arg = arg;
|
||||
task->state = EVA_TASK_STATE_WAITTING;
|
||||
task->numWait = 0;
|
||||
|
@ -466,10 +503,12 @@ int32_t vnodeAsyncC(SVAsync *async, int64_t channelId, EVAPriority priority, int
|
|||
// schedule task
|
||||
taosThreadMutexLock(&async->mutex);
|
||||
|
||||
if (channelId == 0) {
|
||||
if (channelID->id == 0) {
|
||||
task->channel = NULL;
|
||||
} else {
|
||||
SVAChannel channel = {.channelId = channelId};
|
||||
SVAChannel channel = {
|
||||
.channelId = channelID->id,
|
||||
};
|
||||
vHashGet(async->channelTable, &channel, (void **)&task->channel);
|
||||
if (task->channel == NULL) {
|
||||
taosThreadMutexUnlock(&async->mutex);
|
||||
|
@ -540,20 +579,24 @@ int32_t vnodeAsyncC(SVAsync *async, int64_t channelId, EVAPriority priority, int
|
|||
|
||||
taosThreadMutexUnlock(&async->mutex);
|
||||
|
||||
if (taskId != NULL) {
|
||||
*taskId = id;
|
||||
if (taskID != NULL) {
|
||||
taskID->async = channelID->async;
|
||||
taskID->id = id;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeAWait(SVAsync *async, int64_t taskId) {
|
||||
if (async == NULL || taskId <= 0) {
|
||||
int32_t vnodeAWait(SVATaskID *taskID) {
|
||||
if (taskID == NULL || taskID->async < MIN_ASYNC_ID || taskID->async > MAX_ASYNC_ID || taskID->id <= 0) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
SVAsync *async = vnodeAsyncs[taskID->async];
|
||||
SVATask *task = NULL;
|
||||
SVATask task2 = {.taskId = taskId};
|
||||
SVATask task2 = {
|
||||
.taskId = taskID->id,
|
||||
};
|
||||
|
||||
taosThreadMutexLock(&async->mutex);
|
||||
|
||||
|
@ -574,21 +617,27 @@ int32_t vnodeAWait(SVAsync *async, int64_t taskId) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeACancel(SVAsync *async, int64_t taskId) {
|
||||
if (async == NULL) {
|
||||
int32_t vnodeACancel(SVATaskID *taskID) {
|
||||
if (taskID == NULL || taskID->async < MIN_ASYNC_ID || taskID->async > MAX_ASYNC_ID || taskID->id <= 0) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
int32_t ret = 0;
|
||||
SVAsync *async = vnodeAsyncs[taskID->async];
|
||||
SVATask *task = NULL;
|
||||
SVATask task2 = {.taskId = taskId};
|
||||
SVATask task2 = {
|
||||
.taskId = taskID->id,
|
||||
};
|
||||
void (*cancel)(void *) = NULL;
|
||||
void *arg = NULL;
|
||||
|
||||
taosThreadMutexLock(&async->mutex);
|
||||
|
||||
vHashGet(async->taskTable, &task2, (void **)&task);
|
||||
if (task) {
|
||||
if (task->state == EVA_TASK_STATE_WAITTING) {
|
||||
// remove from queue
|
||||
cancel = task->cancel;
|
||||
arg = task->arg;
|
||||
task->next->prev = task->prev;
|
||||
task->prev->next = task->next;
|
||||
vnodeAsyncTaskDone(async, task);
|
||||
|
@ -599,14 +648,18 @@ int32_t vnodeACancel(SVAsync *async, int64_t taskId) {
|
|||
|
||||
taosThreadMutexUnlock(&async->mutex);
|
||||
|
||||
if (cancel) {
|
||||
cancel(arg);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t vnodeAsyncSetWorkers(SVAsync *async, int32_t numWorkers) {
|
||||
if (async == NULL || numWorkers <= 0 || numWorkers > VNODE_ASYNC_MAX_WORKERS) {
|
||||
int32_t vnodeAsyncSetWorkers(int64_t asyncID, int32_t numWorkers) {
|
||||
if (asyncID < MIN_ASYNC_ID || asyncID > MAX_ASYNC_ID || numWorkers <= 0 || numWorkers > VNODE_ASYNC_MAX_WORKERS) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
SVAsync *async = vnodeAsyncs[asyncID];
|
||||
taosThreadMutexLock(&async->mutex);
|
||||
async->numWorkers = numWorkers;
|
||||
if (async->numIdleWorkers > 0) {
|
||||
|
@ -617,11 +670,13 @@ int32_t vnodeAsyncSetWorkers(SVAsync *async, int32_t numWorkers) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeAChannelInit(SVAsync *async, int64_t *channelId) {
|
||||
if (async == NULL || channelId == NULL) {
|
||||
int32_t vnodeAChannelInit(int64_t asyncID, SVAChannelID *channelID) {
|
||||
if (channelID == NULL || asyncID < MIN_ASYNC_ID || asyncID > MAX_ASYNC_ID) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
SVAsync *async = vnodeAsyncs[asyncID];
|
||||
|
||||
// create channel object
|
||||
SVAChannel *channel = (SVAChannel *)taosMemoryMalloc(sizeof(SVAChannel));
|
||||
if (channel == NULL) {
|
||||
|
@ -637,7 +692,7 @@ int32_t vnodeAChannelInit(SVAsync *async, int64_t *channelId) {
|
|||
// register channel
|
||||
taosThreadMutexLock(&async->mutex);
|
||||
|
||||
channel->channelId = *channelId = ++async->nextChannelId;
|
||||
channel->channelId = channelID->id = ++async->nextChannelId;
|
||||
|
||||
// add to hash table
|
||||
int32_t ret = vHashPut(async->channelTable, channel);
|
||||
|
@ -657,16 +712,24 @@ int32_t vnodeAChannelInit(SVAsync *async, int64_t *channelId) {
|
|||
|
||||
taosThreadMutexUnlock(&async->mutex);
|
||||
|
||||
channelID->async = asyncID;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeAChannelDestroy(SVAsync *async, int64_t channelId, bool waitRunning) {
|
||||
if (async == NULL || channelId <= 0) {
|
||||
int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
|
||||
if (channelID == NULL || channelID->async < MIN_ASYNC_ID || channelID->async > MAX_ASYNC_ID || channelID->id <= 0) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
SVAsync *async = vnodeAsyncs[channelID->async];
|
||||
SVAChannel *channel = NULL;
|
||||
SVAChannel channel2 = {.channelId = channelId};
|
||||
SVAChannel channel2 = {
|
||||
.channelId = channelID->id,
|
||||
};
|
||||
SArray *cancelArray = taosArrayInit(0, sizeof(SVATaskCancelInfo));
|
||||
if (cancelArray == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&async->mutex);
|
||||
|
||||
|
@ -684,6 +747,12 @@ int32_t vnodeAChannelDestroy(SVAsync *async, int64_t channelId, bool waitRunning
|
|||
SVATask *task = channel->queue[i].next;
|
||||
task->prev->next = task->next;
|
||||
task->next->prev = task->prev;
|
||||
if (task->cancel) {
|
||||
taosArrayPush(cancelArray, &(SVATaskCancelInfo){
|
||||
.cancel = task->cancel,
|
||||
.arg = task->arg,
|
||||
});
|
||||
}
|
||||
vnodeAsyncTaskDone(async, task);
|
||||
}
|
||||
}
|
||||
|
@ -693,6 +762,12 @@ int32_t vnodeAChannelDestroy(SVAsync *async, int64_t channelId, bool waitRunning
|
|||
if (channel->scheduled) {
|
||||
channel->scheduled->prev->next = channel->scheduled->next;
|
||||
channel->scheduled->next->prev = channel->scheduled->prev;
|
||||
if (channel->scheduled->cancel) {
|
||||
taosArrayPush(cancelArray, &(SVATaskCancelInfo){
|
||||
.cancel = channel->scheduled->cancel,
|
||||
.arg = channel->scheduled->arg,
|
||||
});
|
||||
}
|
||||
vnodeAsyncTaskDone(async, channel->scheduled);
|
||||
}
|
||||
taosMemoryFree(channel);
|
||||
|
@ -713,12 +788,16 @@ int32_t vnodeAChannelDestroy(SVAsync *async, int64_t channelId, bool waitRunning
|
|||
channel->state = EVA_CHANNEL_STATE_CLOSE;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
taosThreadMutexUnlock(&async->mutex);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&async->mutex);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(cancelArray); i++) {
|
||||
SVATaskCancelInfo *cancel = (SVATaskCancelInfo *)taosArrayGet(cancelArray, i);
|
||||
cancel->cancel(cancel->arg);
|
||||
}
|
||||
taosArrayDestroy(cancelArray);
|
||||
|
||||
channelID->async = 0;
|
||||
channelID->id = 0;
|
||||
return 0;
|
||||
}
|
|
@ -289,7 +289,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
|
|||
int64_t lastCommitted = pInfo->info.state.committed;
|
||||
|
||||
// wait last commit task
|
||||
vnodeAWait(vnodeAsyncHandle[0], pVnode->commitTask);
|
||||
vnodeAWait(&pVnode->commitTask);
|
||||
|
||||
if (syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg) != 0) goto _exit;
|
||||
|
||||
|
@ -361,15 +361,14 @@ static void vnodeReturnBufPool(SVnode *pVnode) {
|
|||
|
||||
taosThreadMutexUnlock(&pVnode->mutex);
|
||||
}
|
||||
static int32_t vnodeCommitTask(void *arg) {
|
||||
static int32_t vnodeCommit(void *arg) {
|
||||
int32_t code = 0;
|
||||
|
||||
SCommitInfo *pInfo = (SCommitInfo *)arg;
|
||||
SVnode *pVnode = pInfo->pVnode;
|
||||
|
||||
// commit
|
||||
code = vnodeCommitImpl(pInfo);
|
||||
if (code) {
|
||||
if ((code = vnodeCommitImpl(pInfo))) {
|
||||
vFatal("vgId:%d, failed to commit vnode since %s", TD_VID(pVnode), terrstr());
|
||||
taosMsleep(100);
|
||||
exit(EXIT_FAILURE);
|
||||
|
@ -379,37 +378,34 @@ static int32_t vnodeCommitTask(void *arg) {
|
|||
vnodeReturnBufPool(pVnode);
|
||||
|
||||
_exit:
|
||||
taosMemoryFree(arg);
|
||||
return code;
|
||||
}
|
||||
|
||||
static void vnodeCompleteCommit(void *arg) { taosMemoryFree(arg); }
|
||||
static void vnodeCommitCancel(void *arg) { taosMemoryFree(arg); }
|
||||
|
||||
int vnodeAsyncCommit(SVnode *pVnode) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
SCommitInfo *pInfo = (SCommitInfo *)taosMemoryCalloc(1, sizeof(*pInfo));
|
||||
if (NULL == pInfo) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||
}
|
||||
|
||||
// prepare to commit
|
||||
code = vnodePrepareCommit(pVnode, pInfo);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
goto _exit;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// schedule the task
|
||||
code = vnodeAsyncC(vnodeAsyncHandle[0], pVnode->commitChannel, EVA_PRIORITY_HIGH, vnodeCommitTask,
|
||||
vnodeCompleteCommit, pInfo, &pVnode->commitTask);
|
||||
code =
|
||||
vnodeAsync(&pVnode->commitChannel, EVA_PRIORITY_HIGH, vnodeCommit, vnodeCommitCancel, pInfo, &pVnode->commitTask);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
if (NULL != pInfo) {
|
||||
taosMemoryFree(pInfo);
|
||||
}
|
||||
vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code),
|
||||
pVnode->state.commitID);
|
||||
taosMemoryFree(pInfo);
|
||||
vError("vgId:%d %s failed at line %d since %s" PRId64, TD_VID(pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
vInfo("vgId:%d, vnode async commit done, commitId:%" PRId64 " term:%" PRId64 " applied:%" PRId64, TD_VID(pVnode),
|
||||
pVnode->state.commitID, pVnode->state.applyTerm, pVnode->state.applied);
|
||||
|
@ -419,7 +415,7 @@ _exit:
|
|||
|
||||
int vnodeSyncCommit(SVnode *pVnode) {
|
||||
vnodeAsyncCommit(pVnode);
|
||||
vnodeAWait(vnodeAsyncHandle[0], pVnode->commitTask);
|
||||
vnodeAWait(&pVnode->commitTask);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,23 +22,6 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct SVHashTable SVHashTable;
|
||||
|
||||
struct SVHashTable {
|
||||
uint32_t (*hash)(const void*);
|
||||
int32_t (*compare)(const void*, const void*);
|
||||
int32_t numEntries;
|
||||
uint32_t numBuckets;
|
||||
struct SVHashEntry** buckets;
|
||||
};
|
||||
|
||||
#define vHashNumEntries(ht) ((ht)->numEntries)
|
||||
int32_t vHashInit(SVHashTable** ht, uint32_t (*hash)(const void*), int32_t (*compare)(const void*, const void*));
|
||||
int32_t vHashDestroy(SVHashTable** ht);
|
||||
int32_t vHashPut(SVHashTable* ht, void* obj);
|
||||
int32_t vHashGet(SVHashTable* ht, const void* obj, void** retObj);
|
||||
int32_t vHashDrop(SVHashTable* ht, const void* obj);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -18,8 +18,6 @@
|
|||
|
||||
static volatile int32_t VINIT = 0;
|
||||
|
||||
SVAsync* vnodeAsyncHandle[2];
|
||||
|
||||
int vnodeInit(int nthreads) {
|
||||
int32_t init;
|
||||
|
||||
|
@ -28,13 +26,9 @@ int vnodeInit(int nthreads) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// vnode-commit
|
||||
vnodeAsyncInit(&vnodeAsyncHandle[0], "vnode-commit");
|
||||
vnodeAsyncSetWorkers(vnodeAsyncHandle[0], nthreads);
|
||||
|
||||
// vnode-merge
|
||||
vnodeAsyncInit(&vnodeAsyncHandle[1], "vnode-merge");
|
||||
vnodeAsyncSetWorkers(vnodeAsyncHandle[1], nthreads);
|
||||
if (vnodeAsyncOpen(nthreads) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (walInit() < 0) {
|
||||
return -1;
|
||||
|
@ -48,8 +42,7 @@ void vnodeCleanup() {
|
|||
if (init == 0) return;
|
||||
|
||||
// set stop
|
||||
vnodeAsyncDestroy(&vnodeAsyncHandle[0]);
|
||||
vnodeAsyncDestroy(&vnodeAsyncHandle[1]);
|
||||
vnodeAsyncClose();
|
||||
|
||||
walCleanUp();
|
||||
smaCleanUp();
|
||||
|
|
|
@ -399,7 +399,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
|
|||
taosThreadMutexInit(&pVnode->mutex, NULL);
|
||||
taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
|
||||
|
||||
if (vnodeAChannelInit(vnodeAsyncHandle[0], &pVnode->commitChannel) != 0) {
|
||||
if (vnodeAChannelInit(1, &pVnode->commitChannel) != 0) {
|
||||
vError("vgId:%d, failed to init commit channel", TD_VID(pVnode));
|
||||
goto _err;
|
||||
}
|
||||
|
@ -527,8 +527,8 @@ void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); }
|
|||
|
||||
void vnodeClose(SVnode *pVnode) {
|
||||
if (pVnode) {
|
||||
vnodeAWait(vnodeAsyncHandle[0], pVnode->commitTask);
|
||||
vnodeAChannelDestroy(vnodeAsyncHandle[0], pVnode->commitChannel, true);
|
||||
vnodeAWait(&pVnode->commitTask);
|
||||
vnodeAChannelDestroy(&pVnode->commitChannel, true);
|
||||
vnodeSyncClose(pVnode);
|
||||
vnodeQueryClose(pVnode);
|
||||
tqClose(pVnode->pTq);
|
||||
|
|
|
@ -15,20 +15,15 @@
|
|||
|
||||
#include "vnd.h"
|
||||
|
||||
int32_t vnodeDoRetention(SVnode *pVnode, int64_t now) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
extern int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now);
|
||||
extern int32_t tsdbAsyncS3Migrate(STsdb *tsdb, int64_t now);
|
||||
|
||||
code = tsdbRetention(pVnode->pTsdb, now, pVnode->config.sttTrigger == 1);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) code = smaRetention(pVnode->pSma, now);
|
||||
|
||||
return code;
|
||||
int32_t vnodeAsyncRetention(SVnode *pVnode, int64_t now) {
|
||||
// async retention
|
||||
return tsdbAsyncRetention(pVnode->pTsdb, now);
|
||||
}
|
||||
|
||||
int32_t vnodeDoS3Migrate(SVnode *pVnode, int64_t now) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
code = tsdbS3Migrate(pVnode->pTsdb, now, pVnode->config.sttTrigger == 1);
|
||||
|
||||
return code;
|
||||
int32_t vnodeAsyncS3Migrate(SVnode *pVnode, int64_t now) {
|
||||
// async migration
|
||||
return tsdbAsyncS3Migrate(pVnode->pTsdb, now);
|
||||
}
|
||||
|
|
|
@ -622,13 +622,13 @@ extern int32_t tsdbEnableBgTask(STsdb *pTsdb);
|
|||
static int32_t vnodeCancelAndDisableAllBgTask(SVnode *pVnode) {
|
||||
tsdbDisableAndCancelAllBgTask(pVnode->pTsdb);
|
||||
vnodeSyncCommit(pVnode);
|
||||
vnodeAChannelDestroy(vnodeAsyncHandle[0], pVnode->commitChannel, true);
|
||||
vnodeAChannelDestroy(&pVnode->commitChannel, true);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vnodeEnableBgTask(SVnode *pVnode) {
|
||||
tsdbEnableBgTask(pVnode->pTsdb);
|
||||
vnodeAChannelInit(vnodeAsyncHandle[0], &pVnode->commitChannel);
|
||||
vnodeAChannelInit(1, &pVnode->commitChannel);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -50,7 +50,7 @@ static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t l
|
|||
|
||||
static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token);
|
||||
static int32_t vnodeCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token);
|
||||
static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void* pReq, int32_t len, SRpcMsg* pRsp);
|
||||
static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
|
||||
extern int32_t vnodeProcessKillCompactReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
extern int32_t vnodeQueryCompactProgress(SVnode *pVnode, SRpcMsg *pMsg);
|
||||
|
@ -875,7 +875,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
|
|||
pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
|
||||
}
|
||||
|
||||
extern int32_t vnodeDoRetention(SVnode *pVnode, int64_t now);
|
||||
extern int32_t vnodeAsyncRetention(SVnode *pVnode, int64_t now);
|
||||
|
||||
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
int32_t code = 0;
|
||||
|
@ -889,13 +889,13 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int3
|
|||
|
||||
vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);
|
||||
|
||||
code = vnodeDoRetention(pVnode, trimReq.timestamp);
|
||||
code = vnodeAsyncRetention(pVnode, trimReq.timestamp);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
extern int32_t vnodeDoS3Migrate(SVnode *pVnode, int64_t now);
|
||||
extern int32_t vnodeAsyncS3Migrate(SVnode *pVnode, int64_t now);
|
||||
|
||||
static int32_t vnodeProcessS3MigrateReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
int32_t code = 0;
|
||||
|
@ -909,7 +909,7 @@ static int32_t vnodeProcessS3MigrateReq(SVnode *pVnode, int64_t ver, void *pReq,
|
|||
|
||||
vInfo("vgId:%d, s3migrate vnode request will be processed, time:%d", pVnode->config.vgId, s3migrateReq.timestamp);
|
||||
|
||||
code = vnodeDoS3Migrate(pVnode, s3migrateReq.timestamp);
|
||||
code = vnodeAsyncS3Migrate(pVnode, s3migrateReq.timestamp);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
|
@ -940,13 +940,13 @@ end:
|
|||
return ret;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void* pReq, int32_t len, SRpcMsg* pRsp) {
|
||||
static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
int32_t code = -1;
|
||||
SMetaReader mr = {0};
|
||||
SVDropTtlTableReq ttlReq = {0};
|
||||
SVFetchTtlExpiredTbsRsp rsp = {0};
|
||||
SEncoder encoder = {0};
|
||||
SArray* pNames = NULL;
|
||||
SArray *pNames = NULL;
|
||||
pRsp->msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP;
|
||||
pRsp->code = TSDB_CODE_SUCCESS;
|
||||
pRsp->pCont = NULL;
|
||||
|
@ -959,8 +959,8 @@ static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void*
|
|||
|
||||
ASSERT(ttlReq.nUids == taosArrayGetSize(ttlReq.pTbUids));
|
||||
|
||||
tb_uid_t suid;
|
||||
char ctbName[TSDB_TABLE_NAME_LEN];
|
||||
tb_uid_t suid;
|
||||
char ctbName[TSDB_TABLE_NAME_LEN];
|
||||
SVDropTbReq expiredTb = {.igNotExists = true};
|
||||
metaReaderDoInit(&mr, pVnode->pMeta, 0);
|
||||
rsp.vgId = TD_VID(pVnode);
|
||||
|
@ -974,12 +974,12 @@ static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void*
|
|||
}
|
||||
char buf[TSDB_TABLE_NAME_LEN];
|
||||
for (int32_t i = 0; i < ttlReq.nUids; ++i) {
|
||||
tb_uid_t* uid = taosArrayGet(ttlReq.pTbUids, i);
|
||||
tb_uid_t *uid = taosArrayGet(ttlReq.pTbUids, i);
|
||||
expiredTb.suid = *uid;
|
||||
terrno = metaReaderGetTableEntryByUid(&mr, *uid);
|
||||
if (terrno < 0) goto _end;
|
||||
strncpy(buf, mr.me.name, TSDB_TABLE_NAME_LEN);
|
||||
void* p = taosArrayPush(pNames, buf);
|
||||
void *p = taosArrayPush(pNames, buf);
|
||||
expiredTb.name = p;
|
||||
if (mr.me.type == TSDB_CHILD_TABLE) {
|
||||
expiredTb.suid = mr.me.ctbEntry.suid;
|
||||
|
@ -1153,7 +1153,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
|
|||
if (i < tbNames->size - 1) {
|
||||
taosStringBuilderAppendChar(&sb, ',');
|
||||
}
|
||||
//taosMemoryFreeClear(*key);
|
||||
// taosMemoryFreeClear(*key);
|
||||
}
|
||||
|
||||
size_t len = 0;
|
||||
|
@ -2250,14 +2250,14 @@ static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
extern int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
extern int32_t vnodeAsyncCompact(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
|
||||
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
if (!pVnode->restored) {
|
||||
vInfo("vgId:%d, ignore compact req during restoring. ver:%" PRId64, TD_VID(pVnode), ver);
|
||||
return 0;
|
||||
}
|
||||
return vnodeProcessCompactVnodeReqImpl(pVnode, ver, pReq, len, pRsp);
|
||||
return vnodeAsyncCompact(pVnode, ver, pReq, len, pRsp);
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
|
@ -2364,8 +2364,6 @@ _OVER:
|
|||
}
|
||||
|
||||
#ifndef TD_ENTERPRISE
|
||||
int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
return 0;
|
||||
}
|
||||
int32_t vnodeAsyncCompact(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { return 0; }
|
||||
int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync) { return 0; }
|
||||
#endif
|
||||
|
|
|
@ -177,11 +177,10 @@ int32_t tfsAllocDisk(STfs *pTfs, int32_t expLevel, SDiskID *pDiskId) {
|
|||
continue;
|
||||
}
|
||||
|
||||
return 0;
|
||||
return (terrno = 0);
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_FS_NO_VALID_DISK;
|
||||
return -1;
|
||||
return (terrno = TSDB_CODE_FS_NO_VALID_DISK);
|
||||
}
|
||||
|
||||
const char *tfsGetPrimaryPath(STfs *pTfs) { return TFS_PRIMARY_DISK(pTfs)->path; }
|
||||
|
|
Loading…
Reference in New Issue