enh: remove void with error code handle
This commit is contained in:
parent
77a5ab8721
commit
63b64a90c0
|
@ -243,7 +243,7 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
|
|||
void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive);
|
||||
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid);
|
||||
int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode);
|
||||
int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive);
|
||||
void tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive);
|
||||
// STbDataIter
|
||||
int32_t tsdbTbDataIterCreate(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter **ppIter);
|
||||
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
|
||||
|
@ -679,7 +679,7 @@ typedef struct STFileSetRange STFileSetRange;
|
|||
typedef TARRAY2(STFileSetRange *) TFileSetRangeArray; // disjoint ranges
|
||||
|
||||
int32_t tsdbTFileSetRangeClear(STFileSetRange **fsr);
|
||||
int32_t tsdbTFileSetRangeArrayDestroy(TFileSetRangeArray **ppArr);
|
||||
void tsdbTFileSetRangeArrayDestroy(TFileSetRangeArray **ppArr);
|
||||
|
||||
// fset partition
|
||||
enum {
|
||||
|
@ -1069,6 +1069,8 @@ typedef enum {
|
|||
ETsdbFsState tsdbSnapGetFsState(SVnode *pVnode);
|
||||
int32_t tsdbSnapPrepDescription(SVnode *pVnode, SSnapshot *pSnap);
|
||||
|
||||
void tsdbRemoveFile(const char *path);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -130,7 +130,7 @@ void vnodeBufPoolRef(SVBufPool* pPool);
|
|||
void vnodeBufPoolUnRef(SVBufPool* pPool, bool proactive);
|
||||
int vnodeDecodeInfo(uint8_t* pData, SVnodeInfo* pInfo);
|
||||
|
||||
int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, SQueryNode* pQNode);
|
||||
void vnodeBufPoolRegisterQuery(SVBufPool* pPool, SQueryNode* pQNode);
|
||||
void vnodeBufPoolDeregisterQuery(SVBufPool* pPool, SQueryNode* pQNode, bool proactive);
|
||||
|
||||
// meta
|
||||
|
@ -223,7 +223,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
|
|||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp);
|
||||
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows);
|
||||
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
|
||||
void tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
|
||||
int64_t tsdbGetEarliestTs(STsdb* pTsdb);
|
||||
|
||||
// tq
|
||||
|
|
|
@ -1015,29 +1015,6 @@ static int32_t tsdbCacheUpdateValue(SValue *pOld, SValue *pNew) {
|
|||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
#ifdef BUILD_NO_CALL
|
||||
static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal *pColVal) {
|
||||
// update rowkey
|
||||
pLastCol->rowKey.ts = pRowKey->ts;
|
||||
pLastCol->rowKey.numOfPKs = pRowKey->numOfPKs;
|
||||
for (int8_t i = 0; i < pRowKey->numOfPKs; i++) {
|
||||
SValue *pPKValue = &pLastCol->rowKey.pks[i];
|
||||
SValue *pNewPKValue = &pRowKey->pks[i];
|
||||
|
||||
(void)tsdbCacheUpdateValue(pPKValue, pNewPKValue);
|
||||
}
|
||||
|
||||
// update colval
|
||||
pLastCol->colVal.cid = pColVal->cid;
|
||||
pLastCol->colVal.flag = pColVal->flag;
|
||||
(void)tsdbCacheUpdateValue(&pLastCol->colVal.value, &pColVal->value);
|
||||
|
||||
if (!pLastCol->dirty) {
|
||||
pLastCol->dirty = 1;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus cacheStatus) {
|
||||
// update rowkey
|
||||
pLastCol->rowKey.ts = TSKEY_MIN;
|
||||
|
@ -2538,7 +2515,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
|||
if (!state->pBrinBlock) {
|
||||
state->pBrinBlock = &state->brinBlock;
|
||||
} else {
|
||||
(void)tBrinBlockClear(&state->brinBlock);
|
||||
tBrinBlockClear(&state->brinBlock);
|
||||
}
|
||||
|
||||
TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlock(state->pr->pFileReader, pBrinBlk, &state->brinBlock), &lino, _err);
|
||||
|
@ -2550,7 +2527,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
|||
if (SFSNEXTROW_BRINBLOCK == state->state) {
|
||||
_next_brinrecord:
|
||||
if (state->iBrinRecord < 0) { // empty brin block, goto _next_brinindex
|
||||
(void)tBrinBlockClear(&state->brinBlock);
|
||||
tBrinBlockClear(&state->brinBlock);
|
||||
goto _next_brinindex;
|
||||
}
|
||||
|
||||
|
@ -2809,7 +2786,7 @@ int32_t clearNextRowFromFS(void *iter) {
|
|||
}
|
||||
|
||||
if (state->pBrinBlock) {
|
||||
(void)tBrinBlockDestroy(state->pBrinBlock);
|
||||
tBrinBlockDestroy(state->pBrinBlock);
|
||||
state->pBrinBlock = NULL;
|
||||
}
|
||||
|
||||
|
@ -2842,7 +2819,7 @@ static void clearLastFileSet(SFSNextRowIter *state) {
|
|||
}
|
||||
|
||||
if (state->pr->pFileReader) {
|
||||
(void)tsdbDataFileReaderClose(&state->pr->pFileReader);
|
||||
tsdbDataFileReaderClose(&state->pr->pFileReader);
|
||||
state->pr->pFileReader = NULL;
|
||||
|
||||
state->pr->pCurFileSet = NULL;
|
||||
|
@ -2927,9 +2904,7 @@ _err:
|
|||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
|
||||
int32_t code = 0;
|
||||
|
||||
static void nextRowIterClose(CacheNextRowIter *pIter) {
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
if (pIter->input[i].nextRowClearFn) {
|
||||
(void)pIter->input[i].nextRowClearFn(pIter->input[i].iter);
|
||||
|
@ -2943,9 +2918,6 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
|
|||
if (pIter->pMemDelData) {
|
||||
taosArrayDestroy(pIter->pMemDelData);
|
||||
}
|
||||
|
||||
_err:
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
// iterate next row non deleted backward ts, version (from high to low)
|
||||
|
@ -3260,7 +3232,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
|||
}
|
||||
*ppLastArray = pColArray;
|
||||
|
||||
(void)nextRowIterClose(&iter);
|
||||
nextRowIterClose(&iter);
|
||||
taosArrayDestroy(aColArray);
|
||||
|
||||
TAOS_RETURN(code);
|
||||
|
@ -3381,7 +3353,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
|
|||
}
|
||||
*ppLastArray = pColArray;
|
||||
|
||||
(void)nextRowIterClose(&iter);
|
||||
nextRowIterClose(&iter);
|
||||
taosArrayDestroy(aColArray);
|
||||
|
||||
TAOS_RETURN(code);
|
||||
|
|
|
@ -387,7 +387,7 @@ void tsdbCacherowsReaderClose(void* pReader) {
|
|||
}
|
||||
|
||||
if (p->pFileReader) {
|
||||
(void) tsdbDataFileReaderClose(&p->pFileReader);
|
||||
tsdbDataFileReaderClose(&p->pFileReader);
|
||||
p->pFileReader = NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -251,8 +251,8 @@ _exit:
|
|||
}
|
||||
|
||||
static int32_t tsdbCommitCloseIter(SCommitter2 *committer) {
|
||||
TAOS_UNUSED(tsdbIterMergerClose(&committer->tombIterMerger));
|
||||
TAOS_UNUSED(tsdbIterMergerClose(&committer->dataIterMerger));
|
||||
tsdbIterMergerClose(&committer->tombIterMerger);
|
||||
tsdbIterMergerClose(&committer->dataIterMerger);
|
||||
TARRAY2_CLEAR(committer->tombIterArray, tsdbIterClose);
|
||||
TARRAY2_CLEAR(committer->dataIterArray, tsdbIterClose);
|
||||
return 0;
|
||||
|
@ -669,7 +669,7 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) {
|
|||
(void)taosThreadMutexLock(&tsdb->mutex);
|
||||
tsdb->imem = NULL;
|
||||
(void)taosThreadMutexUnlock(&tsdb->mutex);
|
||||
TAOS_UNUSED(tsdbUnrefMemTable(imem, NULL, true));
|
||||
tsdbUnrefMemTable(imem, NULL, true);
|
||||
} else {
|
||||
SCommitter2 committer = {0};
|
||||
|
||||
|
@ -717,7 +717,7 @@ int32_t tsdbCommitCommit(STsdb *tsdb) {
|
|||
(void)taosThreadMutexUnlock(&tsdb->mutex);
|
||||
|
||||
TAOS_UNUSED(tsdbCommitInfoDestroy(tsdb));
|
||||
TAOS_UNUSED(tsdbUnrefMemTable(pMemTable, NULL, true));
|
||||
tsdbUnrefMemTable(pMemTable, NULL, true);
|
||||
}
|
||||
|
||||
_exit:
|
||||
|
|
|
@ -150,9 +150,9 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbDataFileReaderClose(SDataFileReader **reader) {
|
||||
void tsdbDataFileReaderClose(SDataFileReader **reader) {
|
||||
if (reader[0] == NULL) {
|
||||
return 0;
|
||||
return;
|
||||
}
|
||||
|
||||
TARRAY2_DESTROY(reader[0]->tombBlkArray, NULL);
|
||||
|
@ -170,7 +170,6 @@ int32_t tsdbDataFileReaderClose(SDataFileReader **reader) {
|
|||
|
||||
taosMemoryFree(reader[0]);
|
||||
reader[0] = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbDataFileReadBrinBlk(SDataFileReader *reader, const TBrinBlkArray **brinBlkArray) {
|
||||
|
@ -230,7 +229,7 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB
|
|||
|
||||
// decode brin block
|
||||
SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer);
|
||||
(void)tBrinBlockClear(brinBlock);
|
||||
tBrinBlockClear(brinBlock);
|
||||
brinBlock->numOfPKs = brinBlk->numOfPKs;
|
||||
brinBlock->numOfRecords = brinBlk->numRec;
|
||||
for (int32_t i = 0; i < 10; i++) { // int64_t
|
||||
|
@ -677,20 +676,20 @@ static int32_t tsdbDataFileWriterCloseAbort(SDataFileWriter *writer) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbDataFileWriterDoClose(SDataFileWriter *writer) {
|
||||
static void tsdbDataFileWriterDoClose(SDataFileWriter *writer) {
|
||||
if (writer->ctx->reader) {
|
||||
(void)tsdbDataFileReaderClose(&writer->ctx->reader);
|
||||
tsdbDataFileReaderClose(&writer->ctx->reader);
|
||||
}
|
||||
|
||||
tTombBlockDestroy(writer->tombBlock);
|
||||
TARRAY2_DESTROY(writer->tombBlkArray, NULL);
|
||||
tBlockDataDestroy(writer->blockData);
|
||||
(void)tBrinBlockDestroy(writer->brinBlock);
|
||||
tBrinBlockDestroy(writer->brinBlock);
|
||||
TARRAY2_DESTROY(writer->brinBlkArray, NULL);
|
||||
|
||||
tTombBlockDestroy(writer->ctx->tombBlock);
|
||||
tBlockDataDestroy(writer->ctx->blockData);
|
||||
(void)tBrinBlockDestroy(writer->ctx->brinBlock);
|
||||
tBrinBlockDestroy(writer->ctx->brinBlock);
|
||||
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(writer->local); ++i) {
|
||||
tBufferDestroy(writer->local + i);
|
||||
|
@ -698,7 +697,6 @@ static int32_t tsdbDataFileWriterDoClose(SDataFileWriter *writer) {
|
|||
|
||||
tDestroyTSchema(writer->skmRow->pTSchema);
|
||||
tDestroyTSchema(writer->skmTb->pTSchema);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbDataFileWriterDoOpenReader(SDataFileWriter *writer) {
|
||||
|
@ -819,10 +817,9 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbWriterUpdVerRange(SVersionRange *range, int64_t minVer, int64_t maxVer) {
|
||||
void tsdbWriterUpdVerRange(SVersionRange *range, int64_t minVer, int64_t maxVer) {
|
||||
range->minVer = TMIN(range->minVer, minVer);
|
||||
range->maxVer = TMAX(range->maxVer, maxVer);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, uint32_t cmprAlg, int64_t *fileSize,
|
||||
|
@ -869,7 +866,7 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, uint32_t cmpr
|
|||
}
|
||||
}
|
||||
|
||||
(void)tsdbWriterUpdVerRange(range, brinBlk.minVer, brinBlk.maxVer);
|
||||
tsdbWriterUpdVerRange(range, brinBlk.minVer, brinBlk.maxVer);
|
||||
|
||||
// write to file
|
||||
for (int32_t i = 0; i < 10; ++i) {
|
||||
|
@ -930,7 +927,7 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, uint32_t cmpr
|
|||
// append to brinBlkArray
|
||||
TAOS_CHECK_RETURN(TARRAY2_APPEND_PTR(brinBlkArray, &brinBlk));
|
||||
|
||||
(void)tBrinBlockClear(brinBlock);
|
||||
tBrinBlockClear(brinBlock);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -1032,7 +1029,7 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData
|
|||
}
|
||||
}
|
||||
|
||||
(void)tsdbWriterUpdVerRange(&writer->ctx->range, record->minVer, record->maxVer);
|
||||
tsdbWriterUpdVerRange(&writer->ctx->range, record->minVer, record->maxVer);
|
||||
|
||||
code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, bData->suid != 0 ? bData->suid : bData->uid,
|
||||
&cmprInfo.pColCmpr);
|
||||
|
@ -1383,7 +1380,7 @@ int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAl
|
|||
}
|
||||
}
|
||||
|
||||
(void)tsdbWriterUpdVerRange(range, tombBlk.minVer, tombBlk.maxVer);
|
||||
tsdbWriterUpdVerRange(range, tombBlk.minVer, tombBlk.maxVer);
|
||||
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->buffers); i++) {
|
||||
tBufferClear(buffer0);
|
||||
|
@ -1615,10 +1612,9 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbTFileUpdVerRange(STFile *f, SVersionRange range) {
|
||||
void tsdbTFileUpdVerRange(STFile *f, SVersionRange range) {
|
||||
f->minVer = TMIN(f->minVer, range.minVer);
|
||||
f->maxVer = TMAX(f->maxVer, range.maxVer);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArray *opArr) {
|
||||
|
@ -1658,8 +1654,8 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
|
|||
.fid = writer->config->fid,
|
||||
.nf = writer->files[ftype],
|
||||
};
|
||||
(void)tsdbTFileUpdVerRange(&op.nf, ofRange);
|
||||
(void)tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
|
||||
tsdbTFileUpdVerRange(&op.nf, ofRange);
|
||||
tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
|
||||
TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
|
||||
|
||||
// .data
|
||||
|
@ -1670,7 +1666,7 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
|
|||
.fid = writer->config->fid,
|
||||
.nf = writer->files[ftype],
|
||||
};
|
||||
(void)tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
|
||||
tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
|
||||
TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
|
||||
} else if (writer->config->files[ftype].file.size != writer->files[ftype].size) {
|
||||
op = (STFileOp){
|
||||
|
@ -1679,7 +1675,7 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
|
|||
.of = writer->config->files[ftype].file,
|
||||
.nf = writer->files[ftype],
|
||||
};
|
||||
(void)tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
|
||||
tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
|
||||
TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
|
||||
}
|
||||
|
||||
|
@ -1691,7 +1687,7 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
|
|||
.fid = writer->config->fid,
|
||||
.nf = writer->files[ftype],
|
||||
};
|
||||
(void)tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
|
||||
tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
|
||||
TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
|
||||
} else if (writer->config->files[ftype].file.size != writer->files[ftype].size) {
|
||||
op = (STFileOp){
|
||||
|
@ -1700,7 +1696,7 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
|
|||
.of = writer->config->files[ftype].file,
|
||||
.nf = writer->files[ftype],
|
||||
};
|
||||
(void)tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
|
||||
tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
|
||||
TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
|
||||
}
|
||||
}
|
||||
|
@ -1734,8 +1730,8 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
|
|||
.fid = writer->config->fid,
|
||||
.nf = writer->files[ftype],
|
||||
};
|
||||
(void)tsdbTFileUpdVerRange(&op.nf, ofRange);
|
||||
(void)tsdbTFileUpdVerRange(&op.nf, writer->ctx->tombRange);
|
||||
tsdbTFileUpdVerRange(&op.nf, ofRange);
|
||||
tsdbTFileUpdVerRange(&op.nf, writer->ctx->tombRange);
|
||||
TAOS_CHECK_GOTO(TARRAY2_APPEND(opArr, op), &lino, _exit);
|
||||
}
|
||||
int32_t encryptAlgorithm = writer->config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
|
||||
|
@ -1822,7 +1818,7 @@ int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, TFileOpArr
|
|||
} else {
|
||||
TAOS_CHECK_GOTO(tsdbDataFileWriterCloseCommit(writer[0], opArr), &lino, _exit);
|
||||
}
|
||||
(void)tsdbDataFileWriterDoClose(writer[0]);
|
||||
tsdbDataFileWriterDoClose(writer[0]);
|
||||
}
|
||||
taosMemoryFree(writer[0]);
|
||||
writer[0] = NULL;
|
||||
|
|
|
@ -51,7 +51,7 @@ typedef struct SDataFileReaderConfig {
|
|||
|
||||
int32_t tsdbDataFileReaderOpen(const char *fname[/* TSDB_FTYPE_MAX */], const SDataFileReaderConfig *config,
|
||||
SDataFileReader **reader);
|
||||
int32_t tsdbDataFileReaderClose(SDataFileReader **reader);
|
||||
void tsdbDataFileReaderClose(SDataFileReader **reader);
|
||||
// .head
|
||||
int32_t tsdbDataFileReadBrinBlk(SDataFileReader *reader, const TBrinBlkArray **brinBlkArray);
|
||||
int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinBlk, SBrinBlock *brinBlock);
|
||||
|
@ -115,8 +115,8 @@ int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t
|
|||
char *encryptKey);
|
||||
|
||||
// utils
|
||||
int32_t tsdbWriterUpdVerRange(SVersionRange *range, int64_t minVer, int64_t maxVer);
|
||||
int32_t tsdbTFileUpdVerRange(STFile *f, SVersionRange range);
|
||||
void tsdbWriterUpdVerRange(SVersionRange *range, int64_t minVer, int64_t maxVer);
|
||||
void tsdbTFileUpdVerRange(STFile *f, SVersionRange range);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -340,21 +340,21 @@ static int32_t tsdbRemoveFileSet(STsdb *pTsdb, SDFileSet *pSet) {
|
|||
int32_t nRef = atomic_sub_fetch_32(&pSet->pHeadF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
tsdbRemoveFile(fname);
|
||||
taosMemoryFree(pSet->pHeadF);
|
||||
}
|
||||
|
||||
nRef = atomic_sub_fetch_32(&pSet->pDataF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
tsdbRemoveFile(fname);
|
||||
taosMemoryFree(pSet->pDataF);
|
||||
}
|
||||
|
||||
nRef = atomic_sub_fetch_32(&pSet->pSmaF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
tsdbRemoveFile(fname);
|
||||
taosMemoryFree(pSet->pSmaF);
|
||||
}
|
||||
|
||||
|
@ -362,7 +362,7 @@ static int32_t tsdbRemoveFileSet(STsdb *pTsdb, SDFileSet *pSet) {
|
|||
nRef = atomic_sub_fetch_32(&pSet->aSttF[iStt]->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
tsdbRemoveFile(fname);
|
||||
taosMemoryFree(pSet->aSttF[iStt]);
|
||||
}
|
||||
}
|
||||
|
@ -449,7 +449,7 @@ static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSe
|
|||
nRef = atomic_sub_fetch_32(&pHeadF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pHeadF, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
tsdbRemoveFile(fname);
|
||||
taosMemoryFree(pHeadF);
|
||||
}
|
||||
} else {
|
||||
|
@ -472,7 +472,7 @@ static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSe
|
|||
nRef = atomic_sub_fetch_32(&pDataF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pDataF, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
tsdbRemoveFile(fname);
|
||||
taosMemoryFree(pDataF);
|
||||
}
|
||||
} else {
|
||||
|
@ -493,7 +493,7 @@ static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSe
|
|||
nRef = atomic_sub_fetch_32(&pSmaF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSmaF, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
tsdbRemoveFile(fname);
|
||||
taosMemoryFree(pSmaF);
|
||||
}
|
||||
} else {
|
||||
|
@ -517,7 +517,7 @@ static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSe
|
|||
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
tsdbRemoveFile(fname);
|
||||
taosMemoryFree(pSttFile);
|
||||
}
|
||||
pSetOld->aSttF[iStt] = NULL;
|
||||
|
@ -538,7 +538,7 @@ static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSe
|
|||
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
tsdbRemoveFile(fname);
|
||||
taosMemoryFree(pSttFile);
|
||||
}
|
||||
|
||||
|
@ -563,7 +563,7 @@ static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSe
|
|||
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
tsdbRemoveFile(fname);
|
||||
taosMemoryFree(pSttFile);
|
||||
}
|
||||
}
|
||||
|
@ -619,7 +619,7 @@ static int32_t tsdbFSApplyChange(STsdb *pTsdb, STsdbFS *pFS) {
|
|||
nRef = atomic_sub_fetch_32(&pDelFile->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbDelFileName(pTsdb, pDelFile, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
tsdbRemoveFile(fname);
|
||||
taosMemoryFree(pDelFile);
|
||||
}
|
||||
}
|
||||
|
@ -629,7 +629,7 @@ static int32_t tsdbFSApplyChange(STsdb *pTsdb, STsdbFS *pFS) {
|
|||
nRef = atomic_sub_fetch_32(&pTsdb->fs.pDelFile->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbDelFileName(pTsdb, pTsdb->fs.pDelFile, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
tsdbRemoveFile(fname);
|
||||
taosMemoryFree(pTsdb->fs.pDelFile);
|
||||
}
|
||||
pTsdb->fs.pDelFile = NULL;
|
||||
|
@ -741,7 +741,7 @@ static int32_t tsdbFSRollback(STsdb *pTsdb) {
|
|||
|
||||
char current_t[TSDB_FILENAME_LEN] = {0};
|
||||
tsdbGetCurrentFName(pTsdb, NULL, current_t);
|
||||
(void)taosRemoveFile(current_t);
|
||||
tsdbRemoveFile(current_t);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
|
|
@ -20,8 +20,6 @@
|
|||
|
||||
#define BLOCK_COMMIT_FACTOR 3
|
||||
|
||||
extern void remove_file(const char *fname);
|
||||
|
||||
typedef struct STFileHashEntry {
|
||||
struct STFileHashEntry *next;
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
|
@ -55,25 +53,22 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t destroy_fs(STFileSystem **fs) {
|
||||
if (fs[0] == NULL) return 0;
|
||||
static void destroy_fs(STFileSystem **fs) {
|
||||
if (fs[0] == NULL) return;
|
||||
|
||||
TARRAY2_DESTROY(fs[0]->fSetArr, NULL);
|
||||
TARRAY2_DESTROY(fs[0]->fSetArrTmp, NULL);
|
||||
(void)tsem_destroy(&fs[0]->canEdit);
|
||||
taosMemoryFree(fs[0]);
|
||||
fs[0] = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype) {
|
||||
void current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype) {
|
||||
int32_t offset = 0;
|
||||
|
||||
vnodeGetPrimaryDir(pTsdb->path, pTsdb->pVnode->diskPrimary, pTsdb->pVnode->pTfs, fname, TSDB_FILENAME_LEN);
|
||||
offset = strlen(fname);
|
||||
snprintf(fname + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, gCurrentFname[ftype]);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t save_json(const cJSON *json, const char *fname) {
|
||||
|
@ -300,11 +295,11 @@ static int32_t commit_edit(STFileSystem *fs) {
|
|||
char current[TSDB_FILENAME_LEN];
|
||||
char current_t[TSDB_FILENAME_LEN];
|
||||
|
||||
(void)current_fname(fs->tsdb, current, TSDB_FCURRENT);
|
||||
current_fname(fs->tsdb, current, TSDB_FCURRENT);
|
||||
if (fs->etype == TSDB_FEDIT_COMMIT) {
|
||||
(void)current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
|
||||
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
|
||||
} else {
|
||||
(void)current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
|
||||
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
|
||||
}
|
||||
|
||||
int32_t code;
|
||||
|
@ -335,9 +330,9 @@ static int32_t abort_edit(STFileSystem *fs) {
|
|||
char fname[TSDB_FILENAME_LEN];
|
||||
|
||||
if (fs->etype == TSDB_FEDIT_COMMIT) {
|
||||
(void)current_fname(fs->tsdb, fname, TSDB_FCURRENT_C);
|
||||
current_fname(fs->tsdb, fname, TSDB_FCURRENT_C);
|
||||
} else {
|
||||
(void)current_fname(fs->tsdb, fname, TSDB_FCURRENT_M);
|
||||
current_fname(fs->tsdb, fname, TSDB_FCURRENT_M);
|
||||
}
|
||||
|
||||
int32_t code;
|
||||
|
@ -421,7 +416,7 @@ static int32_t tsdbFSCreateFileObjHash(STFileSystem *fs, STFileHash *hash) {
|
|||
}
|
||||
|
||||
// vnode.json
|
||||
(void)current_fname(fs->tsdb, fname, TSDB_FCURRENT);
|
||||
current_fname(fs->tsdb, fname, TSDB_FCURRENT);
|
||||
code = tsdbFSAddEntryToFileObjHash(hash, fname);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
|
@ -538,7 +533,7 @@ static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) {
|
|||
if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL &&
|
||||
strncmp(file->aname + strlen(file->aname) - 3, ".cp", 3) &&
|
||||
strncmp(file->aname + strlen(file->aname) - 5, ".data", 5)) {
|
||||
remove_file(file->aname);
|
||||
tsdbRemoveFile(file->aname);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -605,9 +600,9 @@ static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
|
|||
char cCurrent[TSDB_FILENAME_LEN];
|
||||
char mCurrent[TSDB_FILENAME_LEN];
|
||||
|
||||
(void)current_fname(pTsdb, fCurrent, TSDB_FCURRENT);
|
||||
(void)current_fname(pTsdb, cCurrent, TSDB_FCURRENT_C);
|
||||
(void)current_fname(pTsdb, mCurrent, TSDB_FCURRENT_M);
|
||||
current_fname(pTsdb, fCurrent, TSDB_FCURRENT);
|
||||
current_fname(pTsdb, cCurrent, TSDB_FCURRENT_C);
|
||||
current_fname(pTsdb, mCurrent, TSDB_FCURRENT_M);
|
||||
|
||||
if (taosCheckExistFile(fCurrent)) { // current.json exists
|
||||
code = load_fs(pTsdb, fCurrent, fs->fSetArr);
|
||||
|
@ -746,14 +741,14 @@ int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback) {
|
|||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
(void)destroy_fs(fs);
|
||||
destroy_fs(fs);
|
||||
} else {
|
||||
tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbFSSetBlockCommit(STFileSet *fset, bool block);
|
||||
static void tsdbFSSetBlockCommit(STFileSet *fset, bool block);
|
||||
extern int32_t tsdbStopAllCompTask(STsdb *tsdb);
|
||||
|
||||
int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
|
||||
|
@ -779,7 +774,7 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
|
|||
}
|
||||
fset->channel = (SVAChannelID){0};
|
||||
fset->mergeScheduled = false;
|
||||
(void)tsdbFSSetBlockCommit(fset, false);
|
||||
tsdbFSSetBlockCommit(fset, false);
|
||||
fset->channelOpened = false;
|
||||
}
|
||||
}
|
||||
|
@ -811,7 +806,7 @@ int32_t tsdbCloseFS(STFileSystem **fs) {
|
|||
|
||||
(void)tsdbDisableAndCancelAllBgTask((*fs)->tsdb);
|
||||
(void)close_file_system(fs[0]);
|
||||
(void)destroy_fs(fs);
|
||||
destroy_fs(fs);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -834,9 +829,9 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e
|
|||
char current_t[TSDB_FILENAME_LEN];
|
||||
|
||||
if (etype == TSDB_FEDIT_COMMIT) {
|
||||
(void)current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
|
||||
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
|
||||
} else {
|
||||
(void)current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
|
||||
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
|
||||
}
|
||||
|
||||
(void)tsem_wait(&fs->canEdit);
|
||||
|
@ -860,7 +855,7 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbFSSetBlockCommit(STFileSet *fset, bool block) {
|
||||
static void tsdbFSSetBlockCommit(STFileSet *fset, bool block) {
|
||||
if (block) {
|
||||
fset->blockCommit = true;
|
||||
} else {
|
||||
|
@ -869,13 +864,12 @@ static int32_t tsdbFSSetBlockCommit(STFileSet *fset, bool block) {
|
|||
(void)taosThreadCondSignal(&fset->canCommit);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid) {
|
||||
(void)taosThreadMutexLock(&tsdb->mutex);
|
||||
STFileSet *fset;
|
||||
(void)tsdbFSGetFSet(tsdb->pFS, fid, &fset);
|
||||
tsdbFSGetFSet(tsdb->pFS, fid, &fset);
|
||||
if (fset) {
|
||||
while (fset->blockCommit) {
|
||||
fset->numWaitCommit++;
|
||||
|
@ -902,13 +896,13 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
|
|||
STFileSet *fset;
|
||||
TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) {
|
||||
if (TARRAY2_SIZE(fset->lvlArr) == 0) {
|
||||
(void)tsdbFSSetBlockCommit(fset, false);
|
||||
tsdbFSSetBlockCommit(fset, false);
|
||||
continue;
|
||||
}
|
||||
|
||||
SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr);
|
||||
if (lvl->level != 0) {
|
||||
(void)tsdbFSSetBlockCommit(fset, false);
|
||||
tsdbFSSetBlockCommit(fset, false);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -933,9 +927,9 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
|
|||
}
|
||||
|
||||
if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR) {
|
||||
(void)tsdbFSSetBlockCommit(fset, true);
|
||||
tsdbFSSetBlockCommit(fset, true);
|
||||
} else {
|
||||
(void)tsdbFSSetBlockCommit(fset, false);
|
||||
tsdbFSSetBlockCommit(fset, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -956,12 +950,11 @@ int32_t tsdbFSEditAbort(STFileSystem *fs) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset) {
|
||||
void tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset) {
|
||||
STFileSet tfset = {.fid = fid};
|
||||
STFileSet *pset = &tfset;
|
||||
STFileSet **fsetPtr = TARRAY2_SEARCH(fs->fSetArr, &pset, tsdbTFileSetCmprFn, TD_EQ);
|
||||
fset[0] = (fsetPtr == NULL) ? NULL : fsetPtr[0];
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
|
||||
|
@ -1180,12 +1173,12 @@ _out:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr) { return tsdbTFileSetRangeArrayDestroy(fsrArr); }
|
||||
void tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr) { tsdbTFileSetRangeArrayDestroy(fsrArr); }
|
||||
|
||||
int32_t tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset) {
|
||||
int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;
|
||||
|
||||
(void)tsdbFSGetFSet(tsdb->pFS, fid, fset);
|
||||
tsdbFSGetFSet(tsdb->pFS, fid, fset);
|
||||
if (sttTrigger == 1 && (*fset)) {
|
||||
for (;;) {
|
||||
if ((*fset)->taskRunning) {
|
||||
|
@ -1193,7 +1186,7 @@ int32_t tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset) {
|
|||
|
||||
(void)taosThreadCondWait(&(*fset)->beginTask, &tsdb->mutex);
|
||||
|
||||
(void)tsdbFSGetFSet(tsdb->pFS, fid, fset);
|
||||
tsdbFSGetFSet(tsdb->pFS, fid, fset);
|
||||
|
||||
(*fset)->numWaitTask--;
|
||||
} else {
|
||||
|
@ -1211,7 +1204,7 @@ int32_t tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid) {
|
|||
int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;
|
||||
if (sttTrigger == 1) {
|
||||
STFileSet *fset = NULL;
|
||||
(void)tsdbFSGetFSet(tsdb->pFS, fid, &fset);
|
||||
tsdbFSGetFSet(tsdb->pFS, fid, &fset);
|
||||
if (fset != NULL && fset->taskRunning) {
|
||||
fset->taskRunning = false;
|
||||
if (fset->numWaitTask > 0) {
|
||||
|
|
|
@ -51,7 +51,7 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pEx
|
|||
int32_t tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr);
|
||||
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges,
|
||||
TFileSetRangeArray **fsrArr);
|
||||
int32_t tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr);
|
||||
void tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr);
|
||||
// txn
|
||||
int64_t tsdbFSAllocEid(STFileSystem *fs);
|
||||
void tsdbFSUpdateEid(STFileSystem *fs, int64_t cid);
|
||||
|
@ -59,13 +59,13 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e
|
|||
int32_t tsdbFSEditCommit(STFileSystem *fs);
|
||||
int32_t tsdbFSEditAbort(STFileSystem *fs);
|
||||
// other
|
||||
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset);
|
||||
void tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset);
|
||||
int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid);
|
||||
int32_t tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset);
|
||||
int32_t tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid);
|
||||
// utils
|
||||
int32_t save_fs(const TFileSetArray *arr, const char *fname);
|
||||
int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype);
|
||||
void current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype);
|
||||
|
||||
/* Exposed Structs */
|
||||
struct STFileSystem {
|
||||
|
|
|
@ -27,13 +27,12 @@ int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl) {
|
|||
|
||||
static void tsdbSttLvlClearFObj(void *data) { TAOS_UNUSED(tsdbTFileObjUnref(*(STFileObj **)data)); }
|
||||
|
||||
int32_t tsdbSttLvlClear(SSttLvl **lvl) {
|
||||
void tsdbSttLvlClear(SSttLvl **lvl) {
|
||||
if (lvl[0] != NULL) {
|
||||
TARRAY2_DESTROY(lvl[0]->fobjArr, tsdbSttLvlClearFObj);
|
||||
taosMemoryFree(lvl[0]);
|
||||
lvl[0] = NULL;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbSttLvlInitEx(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lvl) {
|
||||
|
@ -45,13 +44,13 @@ static int32_t tsdbSttLvlInitEx(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lvl
|
|||
STFileObj *fobj;
|
||||
code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj);
|
||||
if (code) {
|
||||
(void)tsdbSttLvlClear(lvl);
|
||||
tsdbSttLvlClear(lvl);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = TARRAY2_APPEND(lvl[0]->fobjArr, fobj);
|
||||
if (code) {
|
||||
(void)tsdbSttLvlClear(lvl);
|
||||
tsdbSttLvlClear(lvl);
|
||||
taosMemoryFree(fobj);
|
||||
return code;
|
||||
}
|
||||
|
@ -83,7 +82,7 @@ static int32_t tsdbSttLvlFilteredInitEx(STsdb *pTsdb, const SSttLvl *lvl1, int64
|
|||
STFileObj *fobj;
|
||||
code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj);
|
||||
if (code) {
|
||||
(void)tsdbSttLvlClear(lvl);
|
||||
tsdbSttLvlClear(lvl);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -204,7 +203,7 @@ static int32_t tsdbJsonToSttLvl(STsdb *pTsdb, const cJSON *json, SSttLvl **lvl)
|
|||
|
||||
item1 = cJSON_GetObjectItem(json, "files");
|
||||
if (!cJSON_IsArray(item1)) {
|
||||
(void)tsdbSttLvlClear(lvl);
|
||||
tsdbSttLvlClear(lvl);
|
||||
return TSDB_CODE_FILE_CORRUPTED;
|
||||
}
|
||||
|
||||
|
@ -212,14 +211,14 @@ static int32_t tsdbJsonToSttLvl(STsdb *pTsdb, const cJSON *json, SSttLvl **lvl)
|
|||
STFile tf;
|
||||
code = tsdbJsonToTFile(item2, TSDB_FTYPE_STT, &tf);
|
||||
if (code) {
|
||||
(void)tsdbSttLvlClear(lvl);
|
||||
tsdbSttLvlClear(lvl);
|
||||
return code;
|
||||
}
|
||||
|
||||
STFileObj *fobj;
|
||||
code = tsdbTFileObjInit(pTsdb, &tf, &fobj);
|
||||
if (code) {
|
||||
(void)tsdbSttLvlClear(lvl);
|
||||
tsdbSttLvlClear(lvl);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -605,13 +604,12 @@ int32_t tsdbTFileSetRangeClear(STFileSetRange **fsr) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbTFileSetRangeArrayDestroy(TFileSetRangeArray **ppArr) {
|
||||
void tsdbTFileSetRangeArrayDestroy(TFileSetRangeArray **ppArr) {
|
||||
if (ppArr && ppArr[0]) {
|
||||
TARRAY2_DESTROY(ppArr[0], tsdbTFileSetRangeClear);
|
||||
taosMemoryFree(ppArr[0]);
|
||||
ppArr[0] = NULL;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tsdbTFileSetClear(STFileSet **fset) {
|
||||
|
|
|
@ -57,7 +57,6 @@ int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset);
|
|||
// cmpr
|
||||
int32_t tsdbTFileSetCmprFn(const STFileSet **fset1, const STFileSet **fset2);
|
||||
// edit
|
||||
int32_t tsdbSttLvlClear(SSttLvl **lvl);
|
||||
int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op);
|
||||
int32_t tsdbTFileSetApplyEdit(STsdb *pTsdb, const STFileSet *fset1, STFileSet *fset);
|
||||
// max commit id
|
||||
|
@ -68,7 +67,7 @@ SSttLvl *tsdbTFileSetGetSttLvl(STFileSet *fset, int32_t level);
|
|||
bool tsdbTFileSetIsEmpty(const STFileSet *fset);
|
||||
// stt
|
||||
int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl);
|
||||
int32_t tsdbSttLvlClear(SSttLvl **lvl);
|
||||
void tsdbSttLvlClear(SSttLvl **lvl);
|
||||
// open channel
|
||||
int32_t tsdbTFileSetOpenChannel(STFileSet *fset);
|
||||
|
||||
|
|
|
@ -43,10 +43,14 @@ static const struct {
|
|||
[TSDB_FTYPE_STT] = {"stt", stt_to_json, stt_from_json},
|
||||
};
|
||||
|
||||
void remove_file(const char *fname) {
|
||||
(void)taosRemoveFile(fname);
|
||||
void tsdbRemoveFile(const char *fname) {
|
||||
int32_t code = taosRemoveFile(fname);
|
||||
if (code) {
|
||||
tsdbError("failed to remove file:%s, code:%d, error:%s", fname, code, tstrerror(code));
|
||||
} else {
|
||||
tsdbInfo("file:%s is removed", fname);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t tfile_to_json(const STFile *file, cJSON *json) {
|
||||
/* did.level */
|
||||
|
@ -269,7 +273,7 @@ int32_t tsdbTFileObjUnref(STFileObj *fobj) {
|
|||
tsdbTrace("unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef);
|
||||
if (nRef == 0) {
|
||||
if (fobj->state == TSDB_FSTATE_DEAD) {
|
||||
remove_file(fobj->fname);
|
||||
tsdbRemoveFile(fobj->fname);
|
||||
}
|
||||
taosMemoryFree(fobj);
|
||||
}
|
||||
|
@ -279,7 +283,7 @@ int32_t tsdbTFileObjUnref(STFileObj *fobj) {
|
|||
|
||||
static void tsdbTFileObjRemoveLC(STFileObj *fobj, bool remove_all) {
|
||||
if (fobj->f->type != TSDB_FTYPE_DATA || fobj->f->lcn < 1) {
|
||||
remove_file(fobj->fname);
|
||||
tsdbRemoveFile(fobj->fname);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -295,7 +299,7 @@ static void tsdbTFileObjRemoveLC(STFileObj *fobj, bool remove_all) {
|
|||
}
|
||||
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - lc_path), "%d.data", fobj->f->lcn);
|
||||
|
||||
remove_file(lc_path);
|
||||
tsdbRemoveFile(lc_path);
|
||||
|
||||
} else {
|
||||
// delete by data file prefix
|
||||
|
@ -324,7 +328,7 @@ static void tsdbTFileObjRemoveLC(STFileObj *fobj, bool remove_all) {
|
|||
}
|
||||
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - lc_path), "%d.data", fobj->f->lcn);
|
||||
|
||||
remove_file(lc_path);
|
||||
tsdbRemoveFile(lc_path);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -430,7 +430,7 @@ static int32_t tsdbMemTombIterOpen(STsdbIter *iter) {
|
|||
}
|
||||
|
||||
static int32_t tsdbDataIterClose(STsdbIter *iter) {
|
||||
(void)tBrinBlockDestroy(iter->dataData->brinBlock);
|
||||
tBrinBlockDestroy(iter->dataData->brinBlock);
|
||||
tBlockDataDestroy(iter->dataData->blockData);
|
||||
return 0;
|
||||
}
|
||||
|
@ -699,12 +699,11 @@ int32_t tsdbIterMergerOpen(const TTsdbIterArray *iterArray, SIterMerger **merger
|
|||
return tsdbIterMergerNext(merger[0]);
|
||||
}
|
||||
|
||||
int32_t tsdbIterMergerClose(SIterMerger **merger) {
|
||||
void tsdbIterMergerClose(SIterMerger **merger) {
|
||||
if (merger[0]) {
|
||||
taosMemoryFree(merger[0]);
|
||||
merger[0] = NULL;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbIterMergerNext(SIterMerger *merger) {
|
||||
|
|
|
@ -59,7 +59,7 @@ int32_t tsdbIterNext(STsdbIter *iter);
|
|||
|
||||
// SIterMerger ===============
|
||||
int32_t tsdbIterMergerOpen(const TTsdbIterArray *iterArray, SIterMerger **merger, bool isTomb);
|
||||
int32_t tsdbIterMergerClose(SIterMerger **merger);
|
||||
void tsdbIterMergerClose(SIterMerger **merger);
|
||||
int32_t tsdbIterMergerNext(SIterMerger *merger);
|
||||
int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid);
|
||||
|
||||
|
|
|
@ -731,15 +731,13 @@ int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode) {
|
|||
tsdbError("vgId:%d, memtable ref count is invalid, ref:%d", TD_VID(pMemTable->pTsdb->pVnode), nRef);
|
||||
}
|
||||
|
||||
(void)vnodeBufPoolRegisterQuery(pMemTable->pPool, pQNode);
|
||||
vnodeBufPoolRegisterQuery(pMemTable->pPool, pQNode);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive) {
|
||||
int32_t code = 0;
|
||||
|
||||
void tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive) {
|
||||
if (pNode) {
|
||||
vnodeBufPoolDeregisterQuery(pMemTable->pPool, pNode, proactive);
|
||||
}
|
||||
|
@ -747,8 +745,6 @@ int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactiv
|
|||
if (atomic_sub_fetch_32(&pMemTable->nRef, 1) == 0) {
|
||||
tsdbMemTableDestroy(pMemTable, proactive);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) {
|
||||
|
|
|
@ -339,9 +339,9 @@ static int32_t tsdbMergeFileSetEndCloseWriter(SMerger *merger) {
|
|||
}
|
||||
|
||||
static int32_t tsdbMergeFileSetEndCloseIter(SMerger *merger) {
|
||||
(void)tsdbIterMergerClose(&merger->tombIterMerger);
|
||||
tsdbIterMergerClose(&merger->tombIterMerger);
|
||||
TARRAY2_CLEAR(merger->tombIterArr, tsdbIterClose);
|
||||
(void)tsdbIterMergerClose(&merger->dataIterMerger);
|
||||
tsdbIterMergerClose(&merger->dataIterMerger);
|
||||
TARRAY2_CLEAR(merger->dataIterArr, tsdbIterClose);
|
||||
return 0;
|
||||
}
|
||||
|
@ -463,7 +463,7 @@ static int32_t tsdbMergeGetFSet(SMerger *merger) {
|
|||
STFileSet *fset;
|
||||
|
||||
(void)taosThreadMutexLock(&merger->tsdb->mutex);
|
||||
(void)tsdbFSGetFSet(merger->tsdb->pFS, merger->fid, &fset);
|
||||
tsdbFSGetFSet(merger->tsdb->pFS, merger->fid, &fset);
|
||||
if (fset == NULL) {
|
||||
(void)taosThreadMutexUnlock(&merger->tsdb->mutex);
|
||||
return 0;
|
||||
|
|
|
@ -537,7 +537,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
|||
}
|
||||
|
||||
_end:
|
||||
ret = tStatisBlockDestroy(&block);
|
||||
tStatisBlockDestroy(&block);
|
||||
if (code != 0) {
|
||||
tsdbError("%s error happens at:%s line number: %d, code:%s", id, __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
extern int32_t tsdbOpenCompMonitor(STsdb *tsdb);
|
||||
extern int32_t tsdbCloseCompMonitor(STsdb *tsdb);
|
||||
|
||||
int32_t tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) {
|
||||
void tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) {
|
||||
STsdbKeepCfg *pKeepCfg = &pTsdb->keepCfg;
|
||||
pKeepCfg->precision = pCfg->precision;
|
||||
pKeepCfg->days = pCfg->days;
|
||||
|
@ -27,7 +27,6 @@ int32_t tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) {
|
|||
pKeepCfg->keep1 = pCfg->keep1;
|
||||
pKeepCfg->keep2 = pCfg->keep2;
|
||||
pKeepCfg->keepTimeOffset = pCfg->keepTimeOffset;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int64_t tsdbGetEarliestTs(STsdb *pTsdb) {
|
||||
|
@ -59,7 +58,7 @@ int32_t tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *
|
|||
pTsdb->pVnode = pVnode;
|
||||
(void)taosThreadMutexInit(&pTsdb->mutex, NULL);
|
||||
if (!pKeepCfg) {
|
||||
(void)tsdbSetKeepCfg(pTsdb, &pVnode->config.tsdbCfg);
|
||||
tsdbSetKeepCfg(pTsdb, &pVnode->config.tsdbCfg);
|
||||
} else {
|
||||
memcpy(&pTsdb->keepCfg, pKeepCfg, sizeof(STsdbKeepCfg));
|
||||
}
|
||||
|
|
|
@ -308,7 +308,7 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
|
|||
|
||||
while (1) {
|
||||
if (pReader->pFileReader != NULL) {
|
||||
(void) tsdbDataFileReaderClose(&pReader->pFileReader);
|
||||
tsdbDataFileReaderClose(&pReader->pFileReader);
|
||||
}
|
||||
|
||||
pReader->status.pCurrentFileset = pIter->pFilesetList->data[pIter->index];
|
||||
|
@ -4861,7 +4861,7 @@ void tsdbReaderClose2(STsdbReader* pReader) {
|
|||
clearBlockScanInfoBuf(&pReader->blockInfoBuf);
|
||||
|
||||
if (pReader->pFileReader != NULL) {
|
||||
(void) tsdbDataFileReaderClose(&pReader->pFileReader);
|
||||
tsdbDataFileReaderClose(&pReader->pFileReader);
|
||||
}
|
||||
|
||||
SReadCostSummary* pCost = &pReader->cost;
|
||||
|
@ -4915,7 +4915,7 @@ static int32_t doSuspendCurrentReader(STsdbReader* pCurrentReader) {
|
|||
SReaderStatus* pStatus = &pCurrentReader->status;
|
||||
|
||||
if (pStatus->loadFromFile) {
|
||||
(void) tsdbDataFileReaderClose(&pCurrentReader->pFileReader);
|
||||
tsdbDataFileReaderClose(&pCurrentReader->pFileReader);
|
||||
|
||||
SReadCostSummary* pCost = &pCurrentReader->cost;
|
||||
destroySttBlockReader(pStatus->pLDataIterArray, &pCost->sttCost);
|
||||
|
@ -5563,7 +5563,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
|||
memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
|
||||
|
||||
pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||
(void) tsdbDataFileReaderClose(&pReader->pFileReader);
|
||||
tsdbDataFileReaderClose(&pReader->pFileReader);
|
||||
|
||||
int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap);
|
||||
|
||||
|
@ -5906,7 +5906,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
|||
code = terrno;
|
||||
|
||||
if (pTsdb->mem && pSnap->pNode) {
|
||||
(void) tsdbUnrefMemTable(pTsdb->mem, pSnap->pNode, true); // unref the previous refed mem
|
||||
tsdbUnrefMemTable(pTsdb->mem, pSnap->pNode, true); // unref the previous refed mem
|
||||
}
|
||||
|
||||
(void) taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
|
@ -5924,11 +5924,11 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
|||
code = tsdbFSCreateRefSnapshotWithoutLock(pTsdb->pFS, &pSnap->pfSetArray);
|
||||
if (code) {
|
||||
if (pSnap->pNode) {
|
||||
(void) tsdbUnrefMemTable(pTsdb->mem, pSnap->pNode, true); // unref the previous refed mem
|
||||
tsdbUnrefMemTable(pTsdb->mem, pSnap->pNode, true); // unref the previous refed mem
|
||||
}
|
||||
|
||||
if (pSnap->pINode) {
|
||||
(void) tsdbUnrefMemTable(pTsdb->imem, pSnap->pINode, true);
|
||||
tsdbUnrefMemTable(pTsdb->imem, pSnap->pINode, true);
|
||||
}
|
||||
|
||||
(void) taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
|
@ -5957,11 +5957,11 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact
|
|||
|
||||
if (pSnap) {
|
||||
if (pSnap->pMem) {
|
||||
(void) tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode, proactive);
|
||||
tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode, proactive);
|
||||
}
|
||||
|
||||
if (pSnap->pIMem) {
|
||||
(void) tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, proactive);
|
||||
tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, proactive);
|
||||
}
|
||||
|
||||
if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
|
||||
|
|
|
@ -490,7 +490,7 @@ int32_t getNextBrinRecord(SBrinRecordIter* pIter, SBrinRecord** pRecord) {
|
|||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
(void)tBrinBlockClear(&pIter->block);
|
||||
tBrinBlockClear(&pIter->block);
|
||||
int32_t code = tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("failed to read brinBlock from file, code:%s", tstrerror(code));
|
||||
|
@ -507,7 +507,7 @@ int32_t getNextBrinRecord(SBrinRecordIter* pIter, SBrinRecord** pRecord) {
|
|||
return code;
|
||||
}
|
||||
|
||||
void clearBrinBlockIter(SBrinRecordIter* pIter) { (void)tBrinBlockDestroy(&pIter->block); }
|
||||
void clearBrinBlockIter(SBrinRecordIter* pIter) { tBrinBlockDestroy(&pIter->block); }
|
||||
|
||||
// initialize the file block access order
|
||||
// sort the file blocks according to the offset of each data block in the files
|
||||
|
@ -1038,7 +1038,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
|
|||
}
|
||||
|
||||
if (index >= pStatisBlock->numOfRecords) {
|
||||
code = tStatisBlockDestroy(pStatisBlock);
|
||||
tStatisBlockDestroy(pStatisBlock);
|
||||
taosMemoryFreeClear(pStatisBlock);
|
||||
*pNumOfRows = num;
|
||||
return code;
|
||||
|
@ -1049,7 +1049,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
|
|||
while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex < numOfTables) {
|
||||
p = &pStatisBlkArray->data[i];
|
||||
if (p->minTbid.suid > suid) {
|
||||
code = tStatisBlockDestroy(pStatisBlock);
|
||||
tStatisBlockDestroy(pStatisBlock);
|
||||
taosMemoryFreeClear(pStatisBlock);
|
||||
*pNumOfRows = num;
|
||||
return code;
|
||||
|
@ -1072,7 +1072,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
|
|||
}
|
||||
}
|
||||
|
||||
int32_t ret = tStatisBlockDestroy(pStatisBlock);
|
||||
tStatisBlockDestroy(pStatisBlock);
|
||||
taosMemoryFreeClear(pStatisBlock);
|
||||
*pNumOfRows = num;
|
||||
return code;
|
||||
|
@ -1085,7 +1085,8 @@ _err:
|
|||
|
||||
// load next stt statistics block
|
||||
static int32_t loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock,
|
||||
const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j) {
|
||||
const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i,
|
||||
int32_t* j) {
|
||||
if ((*j) >= numOfRows) {
|
||||
(*i) += 1;
|
||||
(*j) = 0;
|
||||
|
|
|
@ -125,9 +125,12 @@ void tsdbCloseFile(STsdbFD **ppFD) {
|
|||
STsdbFD *pFD = *ppFD;
|
||||
if (pFD) {
|
||||
taosMemoryFree(pFD->pBuf);
|
||||
// if (!pFD->s3File) {
|
||||
(void)taosCloseFile(&pFD->pFD);
|
||||
//}
|
||||
int32_t code = taosCloseFile(&pFD->pFD);
|
||||
if (code) {
|
||||
tsdbError("failed to close file: %s, code:%d reason:%s", pFD->path, code, tstrerror(code));
|
||||
} else {
|
||||
tsdbTrace("close file: %s", pFD->path);
|
||||
}
|
||||
taosMemoryFree(pFD);
|
||||
*ppFD = NULL;
|
||||
}
|
||||
|
|
|
@ -191,7 +191,7 @@ int32_t tsdbFSetPartListToRangeDiff(STsdbFSetPartList* pList, TFileSetRangeArray
|
|||
|
||||
_err:
|
||||
if (pDiff) {
|
||||
(void)tsdbTFileSetRangeArrayDestroy(&pDiff);
|
||||
tsdbTFileSetRangeArrayDestroy(&pDiff);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ struct STsdbSnapReader {
|
|||
|
||||
static int32_t tsdbSnapReadFileSetCloseReader(STsdbSnapReader* reader) {
|
||||
TARRAY2_CLEAR(reader->sttReaderArr, tsdbSttFileReaderClose);
|
||||
TAOS_UNUSED(tsdbDataFileReaderClose(&reader->dataReader));
|
||||
tsdbDataFileReaderClose(&reader->dataReader);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -191,8 +191,8 @@ _exit:
|
|||
}
|
||||
|
||||
static int32_t tsdbSnapReadFileSetCloseIter(STsdbSnapReader* reader) {
|
||||
TAOS_UNUSED(tsdbIterMergerClose(&reader->dataIterMerger));
|
||||
TAOS_UNUSED(tsdbIterMergerClose(&reader->tombIterMerger));
|
||||
tsdbIterMergerClose(&reader->dataIterMerger);
|
||||
tsdbIterMergerClose(&reader->tombIterMerger);
|
||||
TARRAY2_CLEAR(reader->dataIterArr, tsdbIterClose);
|
||||
TARRAY2_CLEAR(reader->tombIterArr, tsdbIterClose);
|
||||
return 0;
|
||||
|
@ -430,7 +430,7 @@ _exit:
|
|||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at %s:%d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode),
|
||||
__func__, __FILE__, lino, tstrerror(code), sver, ever, type);
|
||||
TAOS_UNUSED(tsdbTFileSetRangeArrayDestroy(&reader[0]->fsrArr));
|
||||
tsdbTFileSetRangeArrayDestroy(&reader[0]->fsrArr);
|
||||
taosMemoryFree(reader[0]);
|
||||
reader[0] = NULL;
|
||||
} else {
|
||||
|
@ -452,14 +452,14 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** reader) {
|
|||
TAOS_UNUSED(tTombBlockDestroy(reader[0]->tombBlock));
|
||||
tBlockDataDestroy(reader[0]->blockData);
|
||||
|
||||
TAOS_UNUSED(tsdbIterMergerClose(&reader[0]->dataIterMerger));
|
||||
TAOS_UNUSED(tsdbIterMergerClose(&reader[0]->tombIterMerger));
|
||||
tsdbIterMergerClose(&reader[0]->dataIterMerger);
|
||||
tsdbIterMergerClose(&reader[0]->tombIterMerger);
|
||||
TARRAY2_DESTROY(reader[0]->dataIterArr, tsdbIterClose);
|
||||
TARRAY2_DESTROY(reader[0]->tombIterArr, tsdbIterClose);
|
||||
TARRAY2_DESTROY(reader[0]->sttReaderArr, tsdbSttFileReaderClose);
|
||||
TAOS_UNUSED(tsdbDataFileReaderClose(&reader[0]->dataReader));
|
||||
tsdbDataFileReaderClose(&reader[0]->dataReader);
|
||||
|
||||
TAOS_UNUSED(tsdbFSDestroyRefRangedSnapshot(&reader[0]->fsrArr));
|
||||
tsdbFSDestroyRefRangedSnapshot(&reader[0]->fsrArr);
|
||||
tDestroyTSchema(reader[0]->skmTb->pTSchema);
|
||||
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->buffers); ++i) {
|
||||
|
@ -691,7 +691,7 @@ _exit:
|
|||
|
||||
static int32_t tsdbSnapWriteFileSetCloseReader(STsdbSnapWriter* writer) {
|
||||
TARRAY2_CLEAR(writer->ctx->sttReaderArr, tsdbSttFileReaderClose);
|
||||
TAOS_UNUSED(tsdbDataFileReaderClose(&writer->ctx->dataReader));
|
||||
tsdbDataFileReaderClose(&writer->ctx->dataReader);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -767,8 +767,8 @@ _exit:
|
|||
}
|
||||
|
||||
static int32_t tsdbSnapWriteFileSetCloseIter(STsdbSnapWriter* writer) {
|
||||
TAOS_UNUSED(tsdbIterMergerClose(&writer->ctx->dataIterMerger));
|
||||
TAOS_UNUSED(tsdbIterMergerClose(&writer->ctx->tombIterMerger));
|
||||
tsdbIterMergerClose(&writer->ctx->dataIterMerger);
|
||||
tsdbIterMergerClose(&writer->ctx->tombIterMerger);
|
||||
TARRAY2_CLEAR(writer->ctx->dataIterArr, tsdbIterClose);
|
||||
TARRAY2_CLEAR(writer->ctx->tombIterArr, tsdbIterClose);
|
||||
return 0;
|
||||
|
@ -1119,12 +1119,12 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) {
|
|||
(void)taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
|
||||
}
|
||||
|
||||
TAOS_UNUSED(tsdbIterMergerClose(&writer[0]->ctx->tombIterMerger));
|
||||
TAOS_UNUSED(tsdbIterMergerClose(&writer[0]->ctx->dataIterMerger));
|
||||
tsdbIterMergerClose(&writer[0]->ctx->tombIterMerger);
|
||||
tsdbIterMergerClose(&writer[0]->ctx->dataIterMerger);
|
||||
TARRAY2_DESTROY(writer[0]->ctx->tombIterArr, tsdbIterClose);
|
||||
TARRAY2_DESTROY(writer[0]->ctx->dataIterArr, tsdbIterClose);
|
||||
TARRAY2_DESTROY(writer[0]->ctx->sttReaderArr, tsdbSttFileReaderClose);
|
||||
TAOS_UNUSED(tsdbDataFileReaderClose(&writer[0]->ctx->dataReader));
|
||||
tsdbDataFileReaderClose(&writer[0]->ctx->dataReader);
|
||||
|
||||
TARRAY2_DESTROY(writer[0]->fopArr, NULL);
|
||||
TAOS_UNUSED(tsdbFSDestroyCopyRangedSnapshot(&writer[0]->fsetArr));
|
||||
|
|
|
@ -524,7 +524,7 @@ static int32_t tsdbFileDoWriteSttBlockData(STsdbFD *fd, SBlockData *blockData, S
|
|||
if (sttBlk->maxVer < blockData->aVersion[iRow]) sttBlk->maxVer = blockData->aVersion[iRow];
|
||||
}
|
||||
|
||||
(void)tsdbWriterUpdVerRange(range, sttBlk->minVer, sttBlk->maxVer);
|
||||
tsdbWriterUpdVerRange(range, sttBlk->minVer, sttBlk->maxVer);
|
||||
TAOS_CHECK_RETURN(tBlockDataCompress(blockData, info, buffers, buffers + 4));
|
||||
|
||||
sttBlk->bInfo.offset = *fileSize;
|
||||
|
@ -837,7 +837,7 @@ static void tsdbSttFWriterDoClose(SSttFileWriter *writer) {
|
|||
tDestroyTSchema(writer->skmRow->pTSchema);
|
||||
tDestroyTSchema(writer->skmTb->pTSchema);
|
||||
tTombBlockDestroy(writer->tombBlock);
|
||||
(void)tStatisBlockDestroy(writer->staticBlock);
|
||||
tStatisBlockDestroy(writer->staticBlock);
|
||||
tBlockDataDestroy(writer->blockData);
|
||||
TARRAY2_DESTROY(writer->tombBlkArray, NULL);
|
||||
TARRAY2_DESTROY(writer->statisBlkArray, NULL);
|
||||
|
@ -874,7 +874,7 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *o
|
|||
.fid = writer->config->fid,
|
||||
.nf = writer->file[0],
|
||||
};
|
||||
(void)tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
|
||||
tsdbTFileUpdVerRange(&op.nf, writer->ctx->range);
|
||||
|
||||
TAOS_CHECK_GOTO(TARRAY2_APPEND(opArray, op), &lino, _exit);
|
||||
|
||||
|
@ -890,7 +890,7 @@ static int32_t tsdbSttFWriterCloseAbort(SSttFileWriter *writer) {
|
|||
char fname[TSDB_FILENAME_LEN];
|
||||
tsdbTFileName(writer->config->tsdb, writer->file, fname);
|
||||
tsdbCloseFile(&writer->fd);
|
||||
(void)taosRemoveFile(fname);
|
||||
tsdbRemoveFile(fname);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -23,8 +23,6 @@
|
|||
#include "tsdbDataFileRW.h"
|
||||
#include "tsdbFS2.h"
|
||||
#include "tsdbSttFileRW.h"
|
||||
// extern int32_t save_fs(const TFileSetArray *arr, const char *fname);
|
||||
// extern int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype);
|
||||
|
||||
static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) {
|
||||
int32_t code = 0;
|
||||
|
@ -167,7 +165,7 @@ _exit:
|
|||
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
|
||||
}
|
||||
TARRAY2_DESTROY(ctx->brinBlkArray, NULL);
|
||||
(void)tBrinBlockDestroy(ctx->brinBlock);
|
||||
tBrinBlockDestroy(ctx->brinBlock);
|
||||
tBlockDataDestroy(ctx->blockData);
|
||||
tMapDataClear(ctx->mDataBlk);
|
||||
taosArrayDestroy(ctx->aBlockIdx);
|
||||
|
@ -321,7 +319,7 @@ static int32_t tsdbUpgradeStt(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *r
|
|||
if (TARRAY2_SIZE(lvl->fobjArr) > 0) {
|
||||
TAOS_CHECK_GOTO(TARRAY2_APPEND(fset->lvlArr, lvl), &lino, _exit);
|
||||
} else {
|
||||
(void)tsdbSttLvlClear(&lvl);
|
||||
tsdbSttLvlClear(&lvl);
|
||||
}
|
||||
|
||||
_exit:
|
||||
|
@ -612,7 +610,7 @@ static int32_t tsdbUpgradeFileSystem(STsdb *tsdb, int8_t rollback) {
|
|||
|
||||
// save new file system
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
(void)current_fname(tsdb, fname, TSDB_FCURRENT);
|
||||
current_fname(tsdb, fname, TSDB_FCURRENT);
|
||||
TAOS_CHECK_GOTO(save_fs(fileSetArray, fname), &lino, _exit);
|
||||
|
||||
_exit:
|
||||
|
@ -632,6 +630,6 @@ int32_t tsdbCheckAndUpgradeFileSystem(STsdb *tsdb, int8_t rollback) {
|
|||
|
||||
TAOS_CHECK_RETURN(tsdbUpgradeFileSystem(tsdb, rollback));
|
||||
|
||||
(void)taosRemoveFile(fname);
|
||||
tsdbRemoveFile(fname);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -84,12 +84,12 @@ int32_t tStatisBlockInit(STbStatisBlock *statisBlock) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
TAOS_UNUSED(tStatisBlockDestroy(statisBlock));
|
||||
tStatisBlockDestroy(statisBlock);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tStatisBlockDestroy(STbStatisBlock *statisBlock) {
|
||||
void tStatisBlockDestroy(STbStatisBlock *statisBlock) {
|
||||
statisBlock->numOfPKs = 0;
|
||||
statisBlock->numOfRecords = 0;
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) {
|
||||
|
@ -99,7 +99,6 @@ int32_t tStatisBlockDestroy(STbStatisBlock *statisBlock) {
|
|||
TAOS_UNUSED(tValueColumnDestroy(&statisBlock->firstKeyPKs[i]));
|
||||
TAOS_UNUSED(tValueColumnDestroy(&statisBlock->lastKeyPKs[i]));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tStatisBlockClear(STbStatisBlock *statisBlock) {
|
||||
|
@ -244,12 +243,12 @@ int32_t tBrinBlockInit(SBrinBlock *brinBlock) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
(void)tBrinBlockDestroy(brinBlock);
|
||||
tBrinBlockDestroy(brinBlock);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tBrinBlockDestroy(SBrinBlock *brinBlock) {
|
||||
void tBrinBlockDestroy(SBrinBlock *brinBlock) {
|
||||
brinBlock->numOfPKs = 0;
|
||||
brinBlock->numOfRecords = 0;
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) {
|
||||
|
@ -259,10 +258,9 @@ int32_t tBrinBlockDestroy(SBrinBlock *brinBlock) {
|
|||
TAOS_UNUSED(tValueColumnDestroy(&brinBlock->firstKeyPKs[i]));
|
||||
TAOS_UNUSED(tValueColumnDestroy(&brinBlock->lastKeyPKs[i]));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tBrinBlockClear(SBrinBlock *brinBlock) {
|
||||
void tBrinBlockClear(SBrinBlock *brinBlock) {
|
||||
brinBlock->numOfPKs = 0;
|
||||
brinBlock->numOfRecords = 0;
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) {
|
||||
|
@ -272,7 +270,6 @@ int32_t tBrinBlockClear(SBrinBlock *brinBlock) {
|
|||
TAOS_UNUSED(tValueColumnClear(&brinBlock->firstKeyPKs[i]));
|
||||
TAOS_UNUSED(tValueColumnClear(&brinBlock->lastKeyPKs[i]));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) {
|
||||
|
|
|
@ -112,7 +112,7 @@ typedef struct {
|
|||
#define STATIS_BLOCK_SIZE(db) ((db)->numOfRecords)
|
||||
|
||||
int32_t tStatisBlockInit(STbStatisBlock *statisBlock);
|
||||
int32_t tStatisBlockDestroy(STbStatisBlock *statisBlock);
|
||||
void tStatisBlockDestroy(STbStatisBlock *statisBlock);
|
||||
int32_t tStatisBlockClear(STbStatisBlock *statisBlock);
|
||||
int32_t tStatisBlockPut(STbStatisBlock *statisBlock, SRowInfo *row, int32_t maxRecords);
|
||||
int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecord *record);
|
||||
|
@ -179,8 +179,8 @@ typedef TARRAY2(SBrinBlk) TBrinBlkArray;
|
|||
#define BRIN_BLOCK_SIZE(db) ((db)->numOfRecords)
|
||||
|
||||
int32_t tBrinBlockInit(SBrinBlock *brinBlock);
|
||||
int32_t tBrinBlockDestroy(SBrinBlock *brinBlock);
|
||||
int32_t tBrinBlockClear(SBrinBlock *brinBlock);
|
||||
void tBrinBlockDestroy(SBrinBlock *brinBlock);
|
||||
void tBrinBlockClear(SBrinBlock *brinBlock);
|
||||
int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record);
|
||||
int32_t tBrinBlockGet(SBrinBlock *brinBlock, int32_t idx, SBrinRecord *record);
|
||||
|
||||
|
|
|
@ -286,7 +286,7 @@ _exit:
|
|||
return;
|
||||
}
|
||||
|
||||
int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) {
|
||||
void vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) {
|
||||
(void)taosThreadMutexLock(&pPool->mutex);
|
||||
|
||||
pQNode->pNext = pPool->qList.pNext;
|
||||
|
@ -296,7 +296,6 @@ int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) {
|
|||
pPool->nQuery++;
|
||||
|
||||
(void)taosThreadMutexUnlock(&pPool->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode, bool proactive) {
|
||||
|
|
|
@ -210,7 +210,7 @@ static void vnodeSnapReaderDestroyTsdbRanges(SVSnapReader *pReader) {
|
|||
for (int32_t j = 0; j < TSDB_RETENTION_MAX; ++j) {
|
||||
TFileSetRangeArray **ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, tsdbTyps[j]);
|
||||
if (ppRanges == NULL) continue;
|
||||
(void)tsdbTFileSetRangeArrayDestroy(ppRanges);
|
||||
tsdbTFileSetRangeArrayDestroy(ppRanges);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -648,7 +648,7 @@ static void vnodeSnapWriterDestroyTsdbRanges(SVSnapWriter *pWriter) {
|
|||
for (int32_t j = 0; j < TSDB_RETENTION_MAX; ++j) {
|
||||
TFileSetRangeArray **ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, tsdbTyps[j]);
|
||||
if (ppRanges == NULL) continue;
|
||||
(void)tsdbTFileSetRangeArrayDestroy(ppRanges);
|
||||
tsdbTFileSetRangeArrayDestroy(ppRanges);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -754,8 +754,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||
vTrace("message in vnode query queue is processing");
|
||||
if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME) &&
|
||||
!syncIsReadyForRead(pVnode->sync)) {
|
||||
if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME) && !syncIsReadyForRead(pVnode->sync)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
|
||||
return 0;
|
||||
}
|
||||
|
@ -1477,7 +1476,8 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock,
|
|||
tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
|
||||
if (blkIter.row == NULL) return 0;
|
||||
|
||||
int32_t code = metaGetTbTSchemaNotNull(pMeta, msgIter->suid, TD_ROW_SVER(blkIter.row), 1, &pSchema); // TODO: use the real schema
|
||||
int32_t code = metaGetTbTSchemaNotNull(pMeta, msgIter->suid, TD_ROW_SVER(blkIter.row), 1,
|
||||
&pSchema); // TODO: use the real schema
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
printf("%s:%d no valid schema\n", tags, __LINE__);
|
||||
return code;
|
||||
|
@ -2166,7 +2166,7 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pRe
|
|||
}
|
||||
|
||||
if (tsdbChanged) {
|
||||
(void)tsdbSetKeepCfg(pVnode->pTsdb, &pVnode->config.tsdbCfg);
|
||||
tsdbSetKeepCfg(pVnode->pTsdb, &pVnode->config.tsdbCfg);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
Loading…
Reference in New Issue