Merge pull request #27800 from taosdata/enh/TD-31890-6
enh: error code handle
This commit is contained in:
commit
19ccc5ca9a
|
@ -280,7 +280,7 @@ void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t);
|
|||
// tsdbReaderWriter.c ==============================================================================================
|
||||
// SDataFReader
|
||||
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet);
|
||||
int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
|
||||
void tsdbDataFReaderClose(SDataFReader **ppReader);
|
||||
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx);
|
||||
int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk);
|
||||
int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk);
|
||||
|
@ -678,8 +678,8 @@ typedef TARRAY2(STFileSet *) TFileSetArray;
|
|||
typedef struct STFileSetRange STFileSetRange;
|
||||
typedef TARRAY2(STFileSetRange *) TFileSetRangeArray; // disjoint ranges
|
||||
|
||||
int32_t tsdbTFileSetRangeClear(STFileSetRange **fsr);
|
||||
void tsdbTFileSetRangeArrayDestroy(TFileSetRangeArray **ppArr);
|
||||
void tsdbTFileSetRangeClear(STFileSetRange **fsr);
|
||||
void tsdbTFileSetRangeArrayDestroy(TFileSetRangeArray **ppArr);
|
||||
|
||||
// fset partition
|
||||
enum {
|
||||
|
|
|
@ -710,7 +710,7 @@ int32_t tsdbCommitCommit(STsdb *tsdb) {
|
|||
for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
|
||||
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
|
||||
if (info->fset) {
|
||||
TAOS_UNUSED(tsdbFinishTaskOnFileSet(tsdb, info->fid));
|
||||
tsdbFinishTaskOnFileSet(tsdb, info->fid);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -741,7 +741,7 @@ int32_t tsdbCommitAbort(STsdb *pTsdb) {
|
|||
for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) {
|
||||
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i);
|
||||
if (info->fset) {
|
||||
TAOS_UNUSED(tsdbFinishTaskOnFileSet(pTsdb, info->fid));
|
||||
tsdbFinishTaskOnFileSet(pTsdb, info->fid);
|
||||
}
|
||||
}
|
||||
(void)taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
|
|
|
@ -117,7 +117,7 @@ static int32_t tsdbDataFileRAWWriterCloseAbort(SDataFileRAWWriter *writer) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbDataFileRAWWriterDoClose(SDataFileRAWWriter *writer) { return 0; }
|
||||
static void tsdbDataFileRAWWriterDoClose(SDataFileRAWWriter *writer) { return; }
|
||||
|
||||
static int32_t tsdbDataFileRAWWriterCloseCommit(SDataFileRAWWriter *writer, TFileOpArray *opArr) {
|
||||
int32_t code = 0;
|
||||
|
@ -200,7 +200,7 @@ int32_t tsdbDataFileRAWWriterClose(SDataFileRAWWriter **writer, bool abort, TFil
|
|||
} else {
|
||||
TAOS_CHECK_GOTO(tsdbDataFileRAWWriterCloseCommit(writer[0], opArr), &lino, _exit);
|
||||
}
|
||||
(void)tsdbDataFileRAWWriterDoClose(writer[0]);
|
||||
tsdbDataFileRAWWriterDoClose(writer[0]);
|
||||
}
|
||||
taosMemoryFree(writer[0]);
|
||||
writer[0] = NULL;
|
||||
|
|
|
@ -847,7 +847,7 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, uint32_t cmpr
|
|||
for (int i = 0; i < brinBlock->numOfRecords; i++) {
|
||||
SBrinRecord record;
|
||||
|
||||
(void)tBrinBlockGet(brinBlock, i, &record);
|
||||
TAOS_CHECK_RETURN(tBrinBlockGet(brinBlock, i, &record));
|
||||
if (i == 0) {
|
||||
brinBlk.minTbid.suid = record.suid;
|
||||
brinBlk.minTbid.uid = record.uid;
|
||||
|
@ -1160,7 +1160,8 @@ static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const ST
|
|||
|
||||
for (; writer->ctx->brinBlockIdx < writer->ctx->brinBlock->numOfRecords; writer->ctx->brinBlockIdx++) {
|
||||
SBrinRecord record;
|
||||
(void)tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, &record);
|
||||
code = tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, &record);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
if (record.uid != writer->ctx->tbid->uid) {
|
||||
writer->ctx->tbHasOldData = false;
|
||||
goto _exit;
|
||||
|
@ -1170,7 +1171,8 @@ static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const ST
|
|||
goto _exit;
|
||||
} else {
|
||||
SBrinRecord record[1];
|
||||
(void)tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, record);
|
||||
code = tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, record);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
if (tsdbRowKeyCmprNullAsLargest(key, &record->lastKey) > 0) { // key > record->lastKey
|
||||
if (writer->blockData->nRow > 0) {
|
||||
TAOS_CHECK_GOTO(tsdbDataFileDoWriteBlockData(writer, writer->blockData), &lino, _exit);
|
||||
|
|
|
@ -747,8 +747,8 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static void tsdbFSSetBlockCommit(STFileSet *fset, bool block);
|
||||
extern int32_t tsdbStopAllCompTask(STsdb *tsdb);
|
||||
static void tsdbFSSetBlockCommit(STFileSet *fset, bool block);
|
||||
extern void tsdbStopAllCompTask(STsdb *tsdb);
|
||||
|
||||
int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
|
||||
STFileSystem *fs = pTsdb->pFS;
|
||||
|
@ -783,12 +783,15 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
|
|||
// destroy all channels
|
||||
for (int32_t i = 0; i < taosArrayGetSize(channelArray); i++) {
|
||||
SVAChannelID *channel = taosArrayGet(channelArray, i);
|
||||
(void)vnodeAChannelDestroy(channel, true);
|
||||
int32_t code = vnodeAChannelDestroy(channel, true);
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||
}
|
||||
}
|
||||
taosArrayDestroy(channelArray);
|
||||
|
||||
#ifdef TD_ENTERPRISE
|
||||
(void)tsdbStopAllCompTask(pTsdb);
|
||||
tsdbStopAllCompTask(pTsdb);
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
@ -802,7 +805,7 @@ void tsdbEnableBgTask(STsdb *pTsdb) {
|
|||
void tsdbCloseFS(STFileSystem **fs) {
|
||||
if (fs[0] == NULL) return;
|
||||
|
||||
(void)tsdbDisableAndCancelAllBgTask((*fs)->tsdb);
|
||||
TAOS_UNUSED(tsdbDisableAndCancelAllBgTask((*fs)->tsdb));
|
||||
close_file_system(fs[0]);
|
||||
destroy_fs(fs);
|
||||
return;
|
||||
|
@ -983,13 +986,12 @@ int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr) {
|
||||
void tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr) {
|
||||
if (fsetArr[0]) {
|
||||
TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
|
||||
taosMemoryFree(fsetArr[0]);
|
||||
fsetArr[0] = NULL;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
|
||||
|
@ -1101,7 +1103,7 @@ _out:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr) { return tsdbFSDestroyCopySnapshot(fsetArr); }
|
||||
void tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr) { tsdbFSDestroyCopySnapshot(fsetArr); }
|
||||
|
||||
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges,
|
||||
TFileSetRangeArray **fsrArr) {
|
||||
|
@ -1157,7 +1159,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
|
|||
(void)taosThreadMutexUnlock(&fs->tsdb->mutex);
|
||||
|
||||
if (code) {
|
||||
(void)tsdbTFileSetRangeClear(&fsr1);
|
||||
tsdbTFileSetRangeClear(&fsr1);
|
||||
TARRAY2_DESTROY(fsrArr[0], tsdbTFileSetRangeClear);
|
||||
fsrArr[0] = NULL;
|
||||
}
|
||||
|
@ -1195,7 +1197,7 @@ void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid) {
|
||||
void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid) {
|
||||
int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;
|
||||
if (sttTrigger == 1) {
|
||||
STFileSet *fset = NULL;
|
||||
|
@ -1208,6 +1210,4 @@ int32_t tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid) {
|
|||
tsdbInfo("vgId:%d finish task on file set:%d", TD_VID(tsdb->pVnode), fid);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -41,14 +41,14 @@ int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback);
|
|||
void tsdbCloseFS(STFileSystem **fs);
|
||||
// snapshot
|
||||
int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr);
|
||||
int32_t tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr);
|
||||
void tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr);
|
||||
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr);
|
||||
int32_t tsdbFSCreateRefSnapshotWithoutLock(STFileSystem *fs, TFileSetArray **fsetArr);
|
||||
void tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr);
|
||||
|
||||
int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pExclude, TFileSetArray **fsetArr,
|
||||
TFileOpArray *fopArr);
|
||||
int32_t tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr);
|
||||
void tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr);
|
||||
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges,
|
||||
TFileSetRangeArray **fsrArr);
|
||||
void tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr);
|
||||
|
@ -62,7 +62,7 @@ int32_t tsdbFSEditAbort(STFileSystem *fs);
|
|||
void tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset);
|
||||
int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid);
|
||||
void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset);
|
||||
int32_t tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid);
|
||||
void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid);
|
||||
// utils
|
||||
int32_t save_fs(const TFileSetArray *arr, const char *fname);
|
||||
void current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype);
|
||||
|
|
|
@ -64,9 +64,16 @@ static int32_t tsdbSttLvlInitRef(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lv
|
|||
|
||||
STFileObj *fobj1;
|
||||
TARRAY2_FOREACH(lvl1->fobjArr, fobj1) {
|
||||
(void)tsdbTFileObjRef(fobj1);
|
||||
code = tsdbTFileObjRef(fobj1);
|
||||
if (code) {
|
||||
tsdbSttLvlClear(lvl);
|
||||
return code;
|
||||
}
|
||||
code = TARRAY2_APPEND(lvl[0]->fobjArr, fobj1);
|
||||
if (code) return code;
|
||||
if (code) {
|
||||
tsdbSttLvlClear(lvl);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -99,7 +106,12 @@ static int32_t tsdbSttLvlFilteredInitEx(STsdb *pTsdb, const SSttLvl *lvl1, int64
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void tsdbSttLvlRemoveFObj(void *data) { (void)tsdbTFileObjRemove(*(STFileObj **)data); }
|
||||
static void tsdbSttLvlRemoveFObj(void *data) {
|
||||
int32_t code = tsdbTFileObjRemove(*(STFileObj **)data);
|
||||
if (code) {
|
||||
tsdbError("failed to remove file obj, code:%d, error:%s", code, tstrerror(code));
|
||||
}
|
||||
}
|
||||
static void tsdbSttLvlRemove(SSttLvl **lvl) {
|
||||
TARRAY2_DESTROY(lvl[0]->fobjArr, tsdbSttLvlRemoveFObj);
|
||||
taosMemoryFree(lvl[0]);
|
||||
|
@ -348,7 +360,8 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) {
|
|||
int32_t idx = TARRAY2_SEARCH_IDX(lvl->fobjArr, &tfobjp, tsdbTFileObjCmpr, TD_EQ);
|
||||
TARRAY2_REMOVE(lvl->fobjArr, idx, tsdbSttLvlClearFObj);
|
||||
} else {
|
||||
(void)tsdbTFileObjUnref(fset->farr[op->of.type]);
|
||||
code = tsdbTFileObjUnref(fset->farr[op->of.type]);
|
||||
if (code) return code;
|
||||
fset->farr[op->of.type] = NULL;
|
||||
}
|
||||
} else {
|
||||
|
@ -391,9 +404,11 @@ int32_t tsdbTFileSetApplyEdit(STsdb *pTsdb, const STFileSet *fset1, STFileSet *f
|
|||
}
|
||||
} else {
|
||||
if (fobj1->f->cid != fobj2->f->cid) {
|
||||
(void)tsdbTFileObjRemove(fobj2);
|
||||
code = tsdbTFileObjRemove(fobj2);
|
||||
if (code) return code;
|
||||
} else {
|
||||
(void)tsdbTFileObjRemoveUpdateLC(fobj2);
|
||||
code = tsdbTFileObjRemoveUpdateLC(fobj2);
|
||||
if (code) return code;
|
||||
}
|
||||
code = tsdbTFileObjInit(pTsdb, fobj1->f, &fset2->farr[ftype]);
|
||||
if (code) return code;
|
||||
|
@ -404,7 +419,8 @@ int32_t tsdbTFileSetApplyEdit(STsdb *pTsdb, const STFileSet *fset1, STFileSet *f
|
|||
if (code) return code;
|
||||
} else {
|
||||
// remove the file
|
||||
(void)tsdbTFileObjRemove(fobj2);
|
||||
code = tsdbTFileObjRemove(fobj2);
|
||||
if (code) return code;
|
||||
fset2->farr[ftype] = NULL;
|
||||
}
|
||||
}
|
||||
|
@ -570,7 +586,11 @@ int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fs
|
|||
for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
|
||||
if (fset1->farr[ftype] == NULL) continue;
|
||||
|
||||
(void)tsdbTFileObjRef(fset1->farr[ftype]);
|
||||
code = tsdbTFileObjRef(fset1->farr[ftype]);
|
||||
if (code) {
|
||||
tsdbTFileSetClear(fset);
|
||||
return code;
|
||||
}
|
||||
fset[0]->farr[ftype] = fset1->farr[ftype];
|
||||
}
|
||||
|
||||
|
@ -595,13 +615,13 @@ int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fs
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbTFileSetRangeClear(STFileSetRange **fsr) {
|
||||
if (!fsr[0]) return 0;
|
||||
void tsdbTFileSetRangeClear(STFileSetRange **fsr) {
|
||||
if (!fsr[0]) return;
|
||||
|
||||
tsdbTFileSetClear(&fsr[0]->fset);
|
||||
taosMemoryFree(fsr[0]);
|
||||
fsr[0] = NULL;
|
||||
return 0;
|
||||
return;
|
||||
}
|
||||
|
||||
void tsdbTFileSetRangeArrayDestroy(TFileSetRangeArray **ppArr) {
|
||||
|
@ -616,7 +636,11 @@ void tsdbTFileSetClear(STFileSet **fset) {
|
|||
if (fset && *fset) {
|
||||
for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
|
||||
if ((*fset)->farr[ftype] == NULL) continue;
|
||||
(void)tsdbTFileObjUnref((*fset)->farr[ftype]);
|
||||
int32_t code = tsdbTFileObjUnref((*fset)->farr[ftype]);
|
||||
if (code) {
|
||||
tsdbError("failed to unref file, fid:%d, ftype:%d", (*fset)->fid, ftype);
|
||||
}
|
||||
(*fset)->farr[ftype] = NULL;
|
||||
}
|
||||
|
||||
TARRAY2_DESTROY((*fset)->lvlArr, tsdbSttLvlClear);
|
||||
|
@ -632,7 +656,10 @@ void tsdbTFileSetRemove(STFileSet *fset) {
|
|||
|
||||
for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
|
||||
if (fset->farr[ftype] != NULL) {
|
||||
(void)tsdbTFileObjRemove(fset->farr[ftype]);
|
||||
int32_t code = tsdbTFileObjRemove(fset->farr[ftype]);
|
||||
if (code) {
|
||||
tsdbError("failed to remove file, fid:%d, ftype:%d", fset->fid, ftype);
|
||||
}
|
||||
fset->farr[ftype] = NULL;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -249,8 +249,8 @@ int32_t tsdbTFileObjRef(STFileObj *fobj) {
|
|||
(void)taosThreadMutexLock(&fobj->mutex);
|
||||
|
||||
if (fobj->ref <= 0 || fobj->state != TSDB_FSTATE_LIVE) {
|
||||
(void)taosThreadMutexUnlock(&fobj->mutex);
|
||||
tsdbError("file %s, fobj:%p ref %d", fobj->fname, fobj, fobj->ref);
|
||||
(void)taosThreadMutexUnlock(&fobj->mutex);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
|
@ -335,8 +335,8 @@ static void tsdbTFileObjRemoveLC(STFileObj *fobj, bool remove_all) {
|
|||
int32_t tsdbTFileObjRemove(STFileObj *fobj) {
|
||||
(void)taosThreadMutexLock(&fobj->mutex);
|
||||
if (fobj->state != TSDB_FSTATE_LIVE || fobj->ref <= 0) {
|
||||
(void)taosThreadMutexUnlock(&fobj->mutex);
|
||||
tsdbError("file %s, fobj:%p ref %d", fobj->fname, fobj, fobj->ref);
|
||||
(void)taosThreadMutexUnlock(&fobj->mutex);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
fobj->state = TSDB_FSTATE_DEAD;
|
||||
|
|
|
@ -153,7 +153,8 @@ static int32_t tsdbDataIterNext(STsdbIter *iter, const TABLEID *tbid) {
|
|||
|
||||
for (; iter->dataData->brinBlockIdx < iter->dataData->brinBlock->numOfRecords; iter->dataData->brinBlockIdx++) {
|
||||
SBrinRecord record[1];
|
||||
(void)tBrinBlockGet(iter->dataData->brinBlock, iter->dataData->brinBlockIdx, record);
|
||||
code = tBrinBlockGet(iter->dataData->brinBlock, iter->dataData->brinBlockIdx, record);
|
||||
if (code) return code;
|
||||
|
||||
if (iter->filterByVersion && (record->maxVer < iter->range[0] || record->minVer > iter->range[1])) {
|
||||
continue;
|
||||
|
@ -224,7 +225,7 @@ static int32_t tsdbMemTableIterNext(STsdbIter *iter, const TABLEID *tbid) {
|
|||
|
||||
iter->row->row = row[0];
|
||||
|
||||
(void)tsdbTbDataIterNext(iter->memtData->tbIter);
|
||||
TAOS_UNUSED(tsdbTbDataIterNext(iter->memtData->tbIter));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
|
|
|
@ -403,7 +403,11 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
|
|||
pMemTable->aBucket[idx] = pTbData;
|
||||
pMemTable->nTbData++;
|
||||
|
||||
(void)tRBTreePut(pMemTable->tbDataTree, pTbData->rbtn);
|
||||
if (tRBTreePut(pMemTable->tbDataTree, pTbData->rbtn) == NULL) {
|
||||
taosWUnLockLatch(&pMemTable->latch);
|
||||
code = TSDB_CODE_INTERNAL_ERROR;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
taosWUnLockLatch(&pMemTable->latch);
|
||||
|
||||
|
|
|
@ -77,9 +77,8 @@ static int32_t tsdbMergerClose(SMerger *merger) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbMergeFileSetEndCloseReader(SMerger *merger) {
|
||||
static void tsdbMergeFileSetEndCloseReader(SMerger *merger) {
|
||||
TARRAY2_CLEAR(merger->sttReaderArr, tsdbSttFileReaderClose);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
|
||||
|
@ -219,7 +218,7 @@ _exit:
|
|||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(merger->tsdb->pVnode), __func__, __FILE__, lino,
|
||||
tstrerror(code));
|
||||
(void)tsdbMergeFileSetEndCloseReader(merger);
|
||||
tsdbMergeFileSetEndCloseReader(merger);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -271,7 +270,9 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
|
|||
|
||||
TAOS_CHECK_GOTO(tfsAllocDisk(merger->tsdb->pVnode->pTfs, level, &did), &lino, _exit);
|
||||
|
||||
(void)tfsMkdirRecurAt(merger->tsdb->pVnode->pTfs, merger->tsdb->path, did);
|
||||
code = tfsMkdirRecurAt(merger->tsdb->pVnode->pTfs, merger->tsdb->path, did);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
SFSetWriterConfig config = {
|
||||
.tsdb = merger->tsdb,
|
||||
.toSttOnly = true,
|
||||
|
@ -354,7 +355,7 @@ static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
|
|||
|
||||
TAOS_CHECK_GOTO(tsdbMergeFileSetEndCloseIter(merger), &lino, _exit);
|
||||
|
||||
TAOS_CHECK_GOTO(tsdbMergeFileSetEndCloseReader(merger), &lino, _exit);
|
||||
tsdbMergeFileSetEndCloseReader(merger);
|
||||
|
||||
// edit file system
|
||||
TAOS_CHECK_GOTO(tsdbFSEditBegin(merger->tsdb->pFS, merger->fopArr, TSDB_FEDIT_MERGE), &lino, _exit);
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
#include "tsdbFS2.h"
|
||||
|
||||
extern int32_t tsdbOpenCompMonitor(STsdb *tsdb);
|
||||
extern int32_t tsdbCloseCompMonitor(STsdb *tsdb);
|
||||
extern void tsdbCloseCompMonitor(STsdb *tsdb);
|
||||
|
||||
void tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) {
|
||||
STsdbKeepCfg *pKeepCfg = &pTsdb->keepCfg;
|
||||
|
@ -65,19 +65,23 @@ int32_t tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *
|
|||
|
||||
// create dir
|
||||
if (pVnode->pTfs) {
|
||||
(void)tfsMkdir(pVnode->pTfs, pTsdb->path);
|
||||
code = tfsMkdir(pVnode->pTfs, pTsdb->path);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else {
|
||||
(void)taosMkDir(pTsdb->path);
|
||||
code = taosMkDir(pTsdb->path);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
// open tsdb
|
||||
TAOS_CHECK_GOTO(tsdbOpenFS(pTsdb, &pTsdb->pFS, rollback), &lino, _exit);
|
||||
code = tsdbOpenFS(pTsdb, &pTsdb->pFS, rollback);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (pTsdb->pFS->fsstate == TSDB_FS_STATE_INCOMPLETE && force == false) {
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_NEED_RETRY, &lino, _exit);
|
||||
}
|
||||
|
||||
TAOS_CHECK_GOTO(tsdbOpenCache(pTsdb), &lino, _exit);
|
||||
code = tsdbOpenCache(pTsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
#ifdef TD_ENTERPRISE
|
||||
TAOS_CHECK_GOTO(tsdbOpenCompMonitor(pTsdb), &lino, _exit);
|
||||
|
@ -112,7 +116,7 @@ int32_t tsdbClose(STsdb **pTsdb) {
|
|||
tsdbCloseFS(&(*pTsdb)->pFS);
|
||||
tsdbCloseCache(*pTsdb);
|
||||
#ifdef TD_ENTERPRISE
|
||||
(void)tsdbCloseCompMonitor(*pTsdb);
|
||||
tsdbCloseCompMonitor(*pTsdb);
|
||||
#endif
|
||||
(void)taosThreadMutexDestroy(&(*pTsdb)->mutex);
|
||||
taosMemoryFreeClear(*pTsdb);
|
||||
|
|
|
@ -160,7 +160,8 @@ static int32_t tsdbWriteFilePage(STsdbFD *pFD, int32_t encryptAlgorithm, char *e
|
|||
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
||||
}
|
||||
|
||||
(void)taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage);
|
||||
code = taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (encryptAlgorithm == DND_CA_SM4) {
|
||||
// if(tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_TSDB) == DND_CS_TSDB){
|
||||
|
@ -635,9 +636,8 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
|
||||
int32_t code = 0;
|
||||
if (*ppReader == NULL) return code;
|
||||
void tsdbDataFReaderClose(SDataFReader **ppReader) {
|
||||
if (*ppReader == NULL) return;
|
||||
|
||||
// head
|
||||
tsdbCloseFile(&(*ppReader)->pHeadFD);
|
||||
|
@ -660,7 +660,6 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
|
|||
}
|
||||
taosMemoryFree(*ppReader);
|
||||
*ppReader = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
|
||||
|
|
|
@ -255,7 +255,8 @@ static int32_t tsdbDoRetention(SRTNer *rtner) {
|
|||
SDiskID did;
|
||||
|
||||
TAOS_CHECK_GOTO(tfsAllocDisk(rtner->tsdb->pVnode->pTfs, expLevel, &did), &lino, _exit);
|
||||
(void)tfsMkdirRecurAt(rtner->tsdb->pVnode->pTfs, rtner->tsdb->path, did);
|
||||
code = tfsMkdirRecurAt(rtner->tsdb->pVnode->pTfs, rtner->tsdb->path, did);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// data
|
||||
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = fset->farr[ftype], 1); ++ftype) {
|
||||
|
@ -337,7 +338,7 @@ static int32_t tsdbRetention(void *arg) {
|
|||
_exit:
|
||||
if (rtner.fset) {
|
||||
(void)taosThreadMutexLock(&pTsdb->mutex);
|
||||
(void)tsdbFinishTaskOnFileSet(pTsdb, rtnArg->fid);
|
||||
tsdbFinishTaskOnFileSet(pTsdb, rtnArg->fid);
|
||||
(void)taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
}
|
||||
|
||||
|
|
|
@ -102,7 +102,11 @@ static int32_t tsdbTFileSetToFSetPartition(STFileSet* fset, STsdbFSetPartition**
|
|||
}
|
||||
count++;
|
||||
SVersionRange vr = {.minVer = f->minVer, .maxVer = f->maxVer};
|
||||
(void)TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn);
|
||||
code = TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn);
|
||||
if (code) {
|
||||
tsdbFSetPartitionClear(&p);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
typ = TSDB_FSET_RANGE_TYP_STT;
|
||||
|
@ -120,12 +124,20 @@ static int32_t tsdbTFileSetToFSetPartition(STFileSet* fset, STsdbFSetPartition**
|
|||
}
|
||||
count++;
|
||||
SVersionRange vr = {.minVer = f->minVer, .maxVer = f->maxVer};
|
||||
(void)TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn);
|
||||
code = TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn);
|
||||
if (code) {
|
||||
tsdbFSetPartitionClear(&p);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (corrupt && count == 0) {
|
||||
SVersionRange vr = {.minVer = VERSION_MIN, .maxVer = fset->maxVerValid};
|
||||
(void)TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn);
|
||||
code = TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn);
|
||||
if (code) {
|
||||
tsdbFSetPartitionClear(&p);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
ppSP[0] = p;
|
||||
return 0;
|
||||
|
@ -182,7 +194,11 @@ int32_t tsdbFSetPartListToRangeDiff(STsdbFSetPartList* pList, TFileSetRangeArray
|
|||
r->sver = maxVerValid + 1;
|
||||
r->ever = VERSION_MAX;
|
||||
tsdbDebug("range diff fid:%" PRId64 ", sver:%" PRId64 ", ever:%" PRId64, part->fid, r->sver, r->ever);
|
||||
(void)TARRAY2_SORT_INSERT(pDiff, r, tsdbTFileSetRangeCmprFn);
|
||||
code = TARRAY2_SORT_INSERT(pDiff, r, tsdbTFileSetRangeCmprFn);
|
||||
if (code) {
|
||||
taosMemoryFree(r);
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
ppRanges[0] = pDiff;
|
||||
|
||||
|
|
|
@ -1127,7 +1127,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) {
|
|||
tsdbDataFileReaderClose(&writer[0]->ctx->dataReader);
|
||||
|
||||
TARRAY2_DESTROY(writer[0]->fopArr, NULL);
|
||||
TAOS_UNUSED(tsdbFSDestroyCopyRangedSnapshot(&writer[0]->fsetArr));
|
||||
tsdbFSDestroyCopyRangedSnapshot(&writer[0]->fsetArr);
|
||||
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(writer[0]->buffers); ++i) {
|
||||
tBufferDestroy(writer[0]->buffers + i);
|
||||
|
|
|
@ -157,10 +157,9 @@ static int32_t tsdbSnapRAWReadFileSetOpenIter(STsdbSnapRAWReader* reader) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapRAWReadFileSetCloseIter(STsdbSnapRAWReader* reader) {
|
||||
static void tsdbSnapRAWReadFileSetCloseIter(STsdbSnapRAWReader* reader) {
|
||||
reader->dataIter->count = 0;
|
||||
reader->dataIter->idx = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int64_t tsdbSnapRAWReadPeek(SDataFileRAWReader* reader) {
|
||||
|
@ -260,7 +259,7 @@ _exit:
|
|||
}
|
||||
|
||||
static int32_t tsdbSnapRAWReadEnd(STsdbSnapRAWReader* reader) {
|
||||
(void)tsdbSnapRAWReadFileSetCloseIter(reader);
|
||||
tsdbSnapRAWReadFileSetCloseIter(reader);
|
||||
tsdbSnapRAWReadFileSetCloseReader(reader);
|
||||
reader->ctx->fset = NULL;
|
||||
return 0;
|
||||
|
@ -410,7 +409,9 @@ static int32_t tsdbSnapRAWWriteFileSetBegin(STsdbSnapRAWWriter* writer, int32_t
|
|||
int32_t level = tsdbFidLevel(fid, &writer->tsdb->keepCfg, taosGetTimestampSec());
|
||||
code = tfsAllocDisk(writer->tsdb->pVnode->pTfs, level, &writer->ctx->did);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
(void)tfsMkdirRecurAt(writer->tsdb->pVnode->pTfs, writer->tsdb->path, writer->ctx->did);
|
||||
|
||||
code = tfsMkdirRecurAt(writer->tsdb->pVnode->pTfs, writer->tsdb->path, writer->ctx->did);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbSnapRAWWriteFileSetOpenWriter(writer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
@ -489,7 +490,7 @@ int32_t tsdbSnapRAWWriterClose(STsdbSnapRAWWriter** writer, int8_t rollback) {
|
|||
}
|
||||
|
||||
TARRAY2_DESTROY(writer[0]->fopArr, NULL);
|
||||
(void)tsdbFSDestroyCopySnapshot(&writer[0]->fsetArr);
|
||||
tsdbFSDestroyCopySnapshot(&writer[0]->fsetArr);
|
||||
|
||||
taosMemoryFree(writer[0]);
|
||||
writer[0] = NULL;
|
||||
|
|
|
@ -590,11 +590,13 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
|
|||
statisBlk.cmprAlg = writer->config->cmprAlg;
|
||||
statisBlk.numOfPKs = statisBlock->numOfPKs;
|
||||
|
||||
(void)tStatisBlockGet(statisBlock, 0, &record);
|
||||
code = tStatisBlockGet(statisBlock, 0, &record);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
statisBlk.minTbid.suid = record.suid;
|
||||
statisBlk.minTbid.uid = record.uid;
|
||||
|
||||
(void)tStatisBlockGet(statisBlock, statisBlock->numOfRecords - 1, &record);
|
||||
code = tStatisBlockGet(statisBlock, statisBlock->numOfRecords - 1, &record);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
statisBlk.maxTbid.suid = record.suid;
|
||||
statisBlk.maxTbid.uid = record.uid;
|
||||
|
||||
|
|
|
@ -356,7 +356,7 @@ static int32_t tsdbUpgradeFileSet(STsdb *tsdb, SDFileSet *pDFileSet, TFileSetArr
|
|||
TAOS_CHECK_GOTO(tsdbUpgradeStt(tsdb, pDFileSet, reader, fset), &lino, _exit);
|
||||
}
|
||||
|
||||
(void)tsdbDataFReaderClose(&reader);
|
||||
tsdbDataFReaderClose(&reader);
|
||||
|
||||
TAOS_CHECK_GOTO(TARRAY2_APPEND(fileSetArray, fset), &lino, _exit);
|
||||
|
||||
|
|
|
@ -106,7 +106,7 @@ _exit:
|
|||
#endif
|
||||
|
||||
void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)) {
|
||||
(void)tGetItemFn(pMapData->pData + pMapData->aOffset[idx], pItem);
|
||||
TAOS_UNUSED(tGetItemFn(pMapData->pData + pMapData->aOffset[idx], pItem));
|
||||
}
|
||||
|
||||
#ifdef BUILD_NO_CALL
|
||||
|
|
Loading…
Reference in New Issue