fix: remove useless code
This commit is contained in:
parent
aff8e1c9a6
commit
a7c8563c06
|
@ -256,18 +256,7 @@ void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]);
|
|||
// tsdbFS.c ==============================================================================================
|
||||
int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback);
|
||||
int32_t tsdbFSClose(STsdb *pTsdb);
|
||||
int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS);
|
||||
void tsdbFSDestroy(STsdbFS *pFS);
|
||||
int32_t tDFileSetCmprFn(const void *p1, const void *p2);
|
||||
int32_t tsdbFSCommit(STsdb *pTsdb);
|
||||
int32_t tsdbFSRollback(STsdb *pTsdb);
|
||||
int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFS);
|
||||
int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS);
|
||||
void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS);
|
||||
void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t);
|
||||
|
||||
int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet);
|
||||
int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile);
|
||||
// tsdbReaderWriter.c ==============================================================================================
|
||||
// SDataFWriter
|
||||
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);
|
||||
|
@ -737,7 +726,6 @@ struct STsdbReadSnap {
|
|||
SMemTable *pIMem;
|
||||
SQueryNode *pINode;
|
||||
TFileSetArray *pfSetArray;
|
||||
STsdbFS fs;
|
||||
};
|
||||
|
||||
struct SDataFWriter {
|
||||
|
@ -796,16 +784,16 @@ typedef struct {
|
|||
} SSttTableRowsInfo;
|
||||
|
||||
typedef struct SSttBlockLoadInfo {
|
||||
SBlockDataInfo blockData[2]; // buffered block data
|
||||
SArray *aSttBlk;
|
||||
int32_t currentLoadBlockIndex;
|
||||
STSchema *pSchema;
|
||||
int16_t *colIds;
|
||||
int32_t numOfCols;
|
||||
bool checkRemainingRow; // todo: no assign value?
|
||||
bool isLast;
|
||||
bool sttBlockLoaded;
|
||||
SSttTableRowsInfo info;
|
||||
SBlockDataInfo blockData[2]; // buffered block data
|
||||
SArray *aSttBlk;
|
||||
int32_t currentLoadBlockIndex;
|
||||
STSchema *pSchema;
|
||||
int16_t *colIds;
|
||||
int32_t numOfCols;
|
||||
bool checkRemainingRow; // todo: no assign value?
|
||||
bool isLast;
|
||||
bool sttBlockLoaded;
|
||||
SSttTableRowsInfo info;
|
||||
SSttBlockLoadCostInfo cost;
|
||||
} SSttBlockLoadInfo;
|
||||
|
||||
|
@ -894,15 +882,15 @@ typedef struct {
|
|||
_load_tomb_fn loadTombFn;
|
||||
void *pReader;
|
||||
void *idstr;
|
||||
bool rspRows; // response the rows in stt-file, if possible
|
||||
bool rspRows; // response the rows in stt-file, if possible
|
||||
} SMergeTreeConf;
|
||||
|
||||
typedef struct SSttDataInfoForTable {
|
||||
SArray* pTimeWindowList;
|
||||
SArray *pTimeWindowList;
|
||||
int64_t numOfRows;
|
||||
} SSttDataInfoForTable;
|
||||
|
||||
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable* pTableInfo);
|
||||
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable *pTableInfo);
|
||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
||||
bool tMergeTreeNext(SMergeTree *pMTree);
|
||||
void tMergeTreePinSttBlock(SMergeTree *pMTree);
|
||||
|
|
|
@ -95,7 +95,7 @@ typedef struct SQueryNode SQueryNode;
|
|||
#define VNODE_RSMA2_DIR "rsma2"
|
||||
#define VNODE_TQ_STREAM "stream"
|
||||
|
||||
#if SUSPEND_RESUME_TEST // only for test purpose
|
||||
#if SUSPEND_RESUME_TEST // only for test purpose
|
||||
#define VNODE_BUFPOOL_SEGMENTS 1
|
||||
#else
|
||||
#define VNODE_BUFPOOL_SEGMENTS 3
|
||||
|
@ -216,8 +216,6 @@ int32_t tsdbBegin(STsdb* pTsdb);
|
|||
int32_t tsdbCacheCommit(STsdb* pTsdb);
|
||||
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
|
||||
int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync);
|
||||
// int32_t tsdbFinishCommit(STsdb* pTsdb);
|
||||
// int32_t tsdbRollbackCommit(STsdb* pTsdb);
|
||||
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
|
||||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp);
|
||||
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -702,7 +702,7 @@ _exit:
|
|||
}
|
||||
|
||||
// EXPOSED APIS ====================================================================================
|
||||
int32_t tsdbFSCommit(STsdb *pTsdb) {
|
||||
static int32_t tsdbFSCommit(STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
STsdbFS fs = {0};
|
||||
|
@ -738,7 +738,7 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFSRollback(STsdb *pTsdb) {
|
||||
static int32_t tsdbFSRollback(STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
|
@ -833,312 +833,3 @@ int32_t tsdbFSClose(STsdb *pTsdb) {
|
|||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
pFS->pDelFile = NULL;
|
||||
if (pFS->aDFileSet) {
|
||||
taosArrayClear(pFS->aDFileSet);
|
||||
} else {
|
||||
pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet));
|
||||
if (pFS->aDFileSet == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
if (pTsdb->fs.pDelFile) {
|
||||
pFS->pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
|
||||
if (pFS->pDelFile == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
*pFS->pDelFile = *pTsdb->fs.pDelFile;
|
||||
}
|
||||
|
||||
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
|
||||
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
|
||||
SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid};
|
||||
|
||||
// head
|
||||
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
|
||||
if (fSet.pHeadF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
*fSet.pHeadF = *pSet->pHeadF;
|
||||
|
||||
// data
|
||||
fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
|
||||
if (fSet.pDataF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
*fSet.pDataF = *pSet->pDataF;
|
||||
|
||||
// sma
|
||||
fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
|
||||
if (fSet.pSmaF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
*fSet.pSmaF = *pSet->pSmaF;
|
||||
|
||||
// stt
|
||||
for (fSet.nSttF = 0; fSet.nSttF < pSet->nSttF; fSet.nSttF++) {
|
||||
fSet.aSttF[fSet.nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||
if (fSet.aSttF[fSet.nSttF] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
*fSet.aSttF[fSet.nSttF] = *pSet->aSttF[fSet.nSttF];
|
||||
}
|
||||
|
||||
if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (pDelFile) {
|
||||
if (pFS->pDelFile == NULL) {
|
||||
pFS->pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
|
||||
if (pFS->pDelFile == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
*pFS->pDelFile = *pDelFile;
|
||||
} else {
|
||||
if (pFS->pDelFile) {
|
||||
taosMemoryFree(pFS->pDelFile);
|
||||
pFS->pDelFile = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
|
||||
int32_t code = 0;
|
||||
int32_t idx = taosArraySearchIdx(pFS->aDFileSet, pSet, tDFileSetCmprFn, TD_GE);
|
||||
|
||||
if (idx < 0) {
|
||||
idx = taosArrayGetSize(pFS->aDFileSet);
|
||||
} else {
|
||||
SDFileSet *pDFileSet = (SDFileSet *)taosArrayGet(pFS->aDFileSet, idx);
|
||||
int32_t c = tDFileSetCmprFn(pSet, pDFileSet);
|
||||
if (c == 0) {
|
||||
*pDFileSet->pHeadF = *pSet->pHeadF;
|
||||
*pDFileSet->pDataF = *pSet->pDataF;
|
||||
*pDFileSet->pSmaF = *pSet->pSmaF;
|
||||
// stt
|
||||
if (pSet->nSttF > pDFileSet->nSttF) {
|
||||
ASSERT(pSet->nSttF == pDFileSet->nSttF + 1);
|
||||
|
||||
pDFileSet->aSttF[pDFileSet->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||
if (pDFileSet->aSttF[pDFileSet->nSttF] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
*pDFileSet->aSttF[pDFileSet->nSttF] = *pSet->aSttF[pSet->nSttF - 1];
|
||||
pDFileSet->nSttF++;
|
||||
} else if (pSet->nSttF < pDFileSet->nSttF) {
|
||||
ASSERT(pSet->nSttF == 1);
|
||||
for (int32_t iStt = 1; iStt < pDFileSet->nSttF; iStt++) {
|
||||
taosMemoryFree(pDFileSet->aSttF[iStt]);
|
||||
}
|
||||
|
||||
*pDFileSet->aSttF[0] = *pSet->aSttF[0];
|
||||
pDFileSet->nSttF = 1;
|
||||
} else {
|
||||
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||
*pDFileSet->aSttF[iStt] = *pSet->aSttF[iStt];
|
||||
}
|
||||
}
|
||||
|
||||
pDFileSet->diskId = pSet->diskId;
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(pSet->nSttF == 1);
|
||||
SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid, .nSttF = 1};
|
||||
|
||||
// head
|
||||
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
|
||||
if (fSet.pHeadF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
*fSet.pHeadF = *pSet->pHeadF;
|
||||
|
||||
// data
|
||||
fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
|
||||
if (fSet.pDataF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
*fSet.pDataF = *pSet->pDataF;
|
||||
|
||||
// sma
|
||||
fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
|
||||
if (fSet.pSmaF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
*fSet.pSmaF = *pSet->pSmaF;
|
||||
|
||||
// stt
|
||||
fSet.aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||
if (fSet.aSttF[0] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
*fSet.aSttF[0] = *pSet->aSttF[0];
|
||||
|
||||
if (taosArrayInsert(pFS->aDFileSet, idx, &fSet) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFSNew) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
char tfname[TSDB_FILENAME_LEN];
|
||||
|
||||
tsdbGetCurrentFName(pTsdb, NULL, tfname);
|
||||
|
||||
// gnrt CURRENT.t
|
||||
code = tsdbSaveFSToFile(pFSNew, tfname);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) {
|
||||
int32_t code = 0;
|
||||
int32_t nRef;
|
||||
|
||||
pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet));
|
||||
if (pFS->aDFileSet == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
pFS->pDelFile = pTsdb->fs.pDelFile;
|
||||
if (pFS->pDelFile) {
|
||||
nRef = atomic_fetch_add_32(&pFS->pDelFile->nRef, 1);
|
||||
ASSERT(nRef > 0);
|
||||
}
|
||||
|
||||
SDFileSet fSet;
|
||||
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
|
||||
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
|
||||
fSet = *pSet;
|
||||
|
||||
nRef = atomic_fetch_add_32(&pSet->pHeadF->nRef, 1);
|
||||
ASSERT(nRef > 0);
|
||||
|
||||
nRef = atomic_fetch_add_32(&pSet->pDataF->nRef, 1);
|
||||
ASSERT(nRef > 0);
|
||||
|
||||
nRef = atomic_fetch_add_32(&pSet->pSmaF->nRef, 1);
|
||||
ASSERT(nRef > 0);
|
||||
|
||||
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||
nRef = atomic_fetch_add_32(&pSet->aSttF[iStt]->nRef, 1);
|
||||
ASSERT(nRef > 0);
|
||||
}
|
||||
|
||||
if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) {
|
||||
int32_t nRef;
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
|
||||
if (pFS->pDelFile) {
|
||||
nRef = atomic_sub_fetch_32(&pFS->pDelFile->nRef, 1);
|
||||
ASSERT(nRef >= 0);
|
||||
if (nRef == 0) {
|
||||
tsdbDelFileName(pTsdb, pFS->pDelFile, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
taosMemoryFree(pFS->pDelFile);
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t iSet = 0; iSet < taosArrayGetSize(pFS->aDFileSet); iSet++) {
|
||||
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pFS->aDFileSet, iSet);
|
||||
|
||||
// head
|
||||
nRef = atomic_sub_fetch_32(&pSet->pHeadF->nRef, 1);
|
||||
ASSERT(nRef >= 0);
|
||||
if (nRef == 0) {
|
||||
tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
taosMemoryFree(pSet->pHeadF);
|
||||
}
|
||||
|
||||
// data
|
||||
nRef = atomic_sub_fetch_32(&pSet->pDataF->nRef, 1);
|
||||
ASSERT(nRef >= 0);
|
||||
if (nRef == 0) {
|
||||
tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
taosMemoryFree(pSet->pDataF);
|
||||
}
|
||||
|
||||
// sma
|
||||
nRef = atomic_sub_fetch_32(&pSet->pSmaF->nRef, 1);
|
||||
ASSERT(nRef >= 0);
|
||||
if (nRef == 0) {
|
||||
tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
taosMemoryFree(pSet->pSmaF);
|
||||
}
|
||||
|
||||
// stt
|
||||
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||
nRef = atomic_sub_fetch_32(&pSet->aSttF[iStt]->nRef, 1);
|
||||
ASSERT(nRef >= 0);
|
||||
if (nRef == 0) {
|
||||
tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
taosMemoryFree(pSet->aSttF[iStt]);
|
||||
/* code */
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayDestroy(pFS->aDFileSet);
|
||||
}
|
||||
|
|
|
@ -22,8 +22,8 @@
|
|||
#include "tsdbUtil2.h"
|
||||
#include "tsimplehash.h"
|
||||
|
||||
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||
#define getCurrentKeyInSttBlock(_r) ((_r)->currentKey)
|
||||
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||
#define getCurrentKeyInSttBlock(_r) ((_r)->currentKey)
|
||||
|
||||
typedef struct {
|
||||
bool overlapWithNeighborBlock;
|
||||
|
@ -41,7 +41,7 @@ static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i
|
|||
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
|
||||
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
|
||||
static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
|
||||
SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
|
||||
SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
|
||||
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader);
|
||||
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
|
||||
STableBlockScanInfo* pScanInfo);
|
||||
|
@ -67,7 +67,7 @@ static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond
|
|||
static int32_t doBuildDataBlock(STsdbReader* pReader);
|
||||
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
|
||||
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
|
||||
static bool hasDataInSttBlock(STableBlockScanInfo *pInfo);
|
||||
static bool hasDataInSttBlock(STableBlockScanInfo* pInfo);
|
||||
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
|
||||
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
|
||||
static void resetTableListIndex(SReaderStatus* pStatus);
|
||||
|
@ -1138,7 +1138,7 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo
|
|||
return false;
|
||||
}
|
||||
|
||||
int32_t step = asc ? 1 : -1;
|
||||
int32_t step = asc ? 1 : -1;
|
||||
STableDataBlockIdx* pTableDataBlockIdx =
|
||||
taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
|
||||
SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex);
|
||||
|
@ -1316,17 +1316,17 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
int64_t st = taosGetTimestampUs();
|
||||
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
|
||||
int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader);
|
||||
int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader);
|
||||
|
||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||
updateComposedBlockInfo(pReader, el, pBlockScanInfo);
|
||||
|
||||
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64
|
||||
" - %" PRId64 ", uid:%" PRIu64 ", %s",
|
||||
pReader, el, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey,
|
||||
pBlockScanInfo->uid, pReader->idStr);
|
||||
pReader, el, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pBlockScanInfo->uid,
|
||||
pReader->idStr);
|
||||
|
||||
pReader->cost.buildmemBlock += el;
|
||||
return code;
|
||||
|
@ -1390,13 +1390,9 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
|
|||
}
|
||||
}
|
||||
|
||||
static void doPinSttBlock(SSttBlockReader* pSttBlockReader) {
|
||||
tMergeTreePinSttBlock(&pSttBlockReader->mergeTree);
|
||||
}
|
||||
static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBlock(&pSttBlockReader->mergeTree); }
|
||||
|
||||
static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) {
|
||||
tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree);
|
||||
}
|
||||
static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree); }
|
||||
|
||||
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttBlockReader,
|
||||
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader,
|
||||
|
@ -1535,8 +1531,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
||||
pReader->idStr);
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||
}
|
||||
|
||||
if (minKey == k.ts) {
|
||||
|
@ -1585,8 +1580,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
||||
pReader->idStr);
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||
}
|
||||
|
||||
if (minKey == key) {
|
||||
|
@ -1648,7 +1642,7 @@ static int32_t doMergeFileBlockAndLastBlock(SSttBlockReader* pSttBlockReader, ST
|
|||
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange,
|
||||
pReader->idStr);
|
||||
pReader->idStr);
|
||||
|
||||
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1671,7 +1665,7 @@ static int32_t doMergeFileBlockAndLastBlock(SSttBlockReader* pSttBlockReader, ST
|
|||
}
|
||||
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange,
|
||||
pReader->idStr);
|
||||
pReader->idStr);
|
||||
|
||||
// merge with block data if ts == key
|
||||
if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
|
||||
|
@ -1740,7 +1734,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
|
|||
// the following for key == tsLast
|
||||
// ASC: file block ------> stt block
|
||||
// DESC: stt block ------> file block
|
||||
SRow* pTSRow = NULL;
|
||||
SRow* pTSRow = NULL;
|
||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1889,8 +1883,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
return code;
|
||||
}
|
||||
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
||||
pReader->idStr);
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||
}
|
||||
|
||||
if (minKey == ik.ts) {
|
||||
|
@ -1948,8 +1941,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
return code;
|
||||
}
|
||||
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
||||
pReader->idStr);
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||
}
|
||||
|
||||
if (minKey == key) {
|
||||
|
@ -2120,7 +2112,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
|||
};
|
||||
|
||||
SSttDataInfoForTable info = {.pTimeWindowList = taosArrayInit(4, sizeof(STimeWindow))};
|
||||
int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info);
|
||||
int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return false;
|
||||
}
|
||||
|
@ -2138,7 +2130,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
|||
pScanInfo->sttWindow.ekey = INT64_MIN;
|
||||
|
||||
// calculate the time window for data in stt files
|
||||
for(int32_t i = 0; i < taosArrayGetSize(info.pTimeWindowList); ++i) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(info.pTimeWindowList); ++i) {
|
||||
STimeWindow* pWindow = taosArrayGet(info.pTimeWindowList, i);
|
||||
if (pScanInfo->sttWindow.skey > pWindow->skey) {
|
||||
pScanInfo->sttWindow.skey = pWindow->skey;
|
||||
|
@ -2149,8 +2141,9 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
|||
}
|
||||
}
|
||||
|
||||
pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList)? STT_FILE_HAS_DATA:STT_FILE_NO_DATA;
|
||||
pScanInfo->sttKeyInfo.nextProcKey = ASCENDING_TRAVERSE(pReader->info.order)? pScanInfo->sttWindow.skey:pScanInfo->sttWindow.ekey;
|
||||
pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList) ? STT_FILE_HAS_DATA : STT_FILE_NO_DATA;
|
||||
pScanInfo->sttKeyInfo.nextProcKey =
|
||||
ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey;
|
||||
hasData = true;
|
||||
} else {
|
||||
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
|
||||
|
@ -2168,9 +2161,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
|||
return hasData;
|
||||
}
|
||||
|
||||
static bool hasDataInSttBlock(STableBlockScanInfo *pInfo) {
|
||||
return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA;
|
||||
}
|
||||
static bool hasDataInSttBlock(STableBlockScanInfo* pInfo) { return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA; }
|
||||
|
||||
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
|
||||
if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
|
||||
|
@ -2225,10 +2216,11 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
|
|||
}
|
||||
}
|
||||
|
||||
int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
|
||||
bool copied = false;
|
||||
SRow* pTSRow = NULL;
|
||||
int64_t tsLastBlock = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||
int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo,
|
||||
STsdbReader* pReader) {
|
||||
bool copied = false;
|
||||
SRow* pTSRow = NULL;
|
||||
int64_t tsLastBlock = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||
|
||||
SRowMerger* pMerger = &pReader->status.merger;
|
||||
TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||
|
@ -2559,16 +2551,16 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) {
|
|||
pReader->status.bProcMemFirstFileset = false;
|
||||
}
|
||||
|
||||
int32_t fid = pReader->status.pCurrentFileset->fid;
|
||||
int32_t fid = pReader->status.pCurrentFileset->fid;
|
||||
STimeWindow winFid = {0};
|
||||
tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &winFid.skey, &winFid.ekey);
|
||||
|
||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||
pReader->status.bProcMemPreFileset = !(pReader->status.memTableMaxKey < pReader->status.prevFilesetStartKey ||
|
||||
(winFid.skey-1) < pReader->status.memTableMinKey);
|
||||
(winFid.skey - 1) < pReader->status.memTableMinKey);
|
||||
} else {
|
||||
pReader->status.bProcMemPreFileset = !( pReader->status.memTableMaxKey < (winFid.ekey+1) ||
|
||||
pReader->status.prevFilesetEndKey < pReader->status.memTableMinKey);
|
||||
pReader->status.bProcMemPreFileset = !(pReader->status.memTableMaxKey < (winFid.ekey + 1) ||
|
||||
pReader->status.prevFilesetEndKey < pReader->status.memTableMinKey);
|
||||
}
|
||||
|
||||
if (pReader->status.bProcMemPreFileset) {
|
||||
|
@ -2802,7 +2794,7 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
|||
static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, bool asc) {
|
||||
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
|
||||
|
||||
if(pScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA) {
|
||||
if (pScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA) {
|
||||
return true;
|
||||
} else {
|
||||
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
|
||||
|
@ -2923,7 +2915,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
static int32_t buildBlockFromBufferSeqForPreFileset(STsdbReader* pReader, int64_t endKey) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
|
||||
tsdbDebug("seq load data blocks from cache that preceeds fileset %d, %s", pReader->status.pCurrentFileset->fid, pReader->idStr);
|
||||
tsdbDebug("seq load data blocks from cache that preceeds fileset %d, %s", pReader->status.pCurrentFileset->fid,
|
||||
pReader->idStr);
|
||||
|
||||
while (1) {
|
||||
if (pReader->code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -3847,7 +3840,6 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
|
|||
pBlockScanInfo->lastProcKey = row.pBlockData->aTSKEY[row.iRow];
|
||||
}
|
||||
|
||||
|
||||
// no data in buffer, return immediately
|
||||
if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
|
||||
break;
|
||||
|
@ -3945,7 +3937,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (pStatus->fileIter.numOfFiles == 0) {
|
||||
pStatus->loadFromFile = false;
|
||||
// } else if (READER_EXEC_DATA == pReader->info.readMode) {
|
||||
// } else if (READER_EXEC_DATA == pReader->info.readMode) {
|
||||
// DO NOTHING
|
||||
} else {
|
||||
code = initForFirstBlockInFile(pReader, pBlockIter);
|
||||
|
@ -4092,7 +4084,7 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
|
|||
}
|
||||
|
||||
pReader->flag = READER_STATUS_SUSPEND;
|
||||
pReader->info.execMode = pCond->notLoadData? READER_EXEC_ROWS : READER_EXEC_DATA;
|
||||
pReader->info.execMode = pCond->notLoadData ? READER_EXEC_ROWS : READER_EXEC_DATA;
|
||||
|
||||
pReader->pIgnoreTables = pIgnoreTables;
|
||||
tsdbDebug("%p total numOfTable:%d, window:%" PRId64 " - %" PRId64 ", verRange:%" PRId64 " - %" PRId64
|
||||
|
@ -4157,7 +4149,7 @@ void tsdbReaderClose2(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
SReadCostSummary* pCost = &pReader->cost;
|
||||
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
|
||||
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
|
||||
if (pFilesetIter->pSttBlockReader != NULL) {
|
||||
SSttBlockReader* pLReader = pFilesetIter->pSttBlockReader;
|
||||
tMergeTreeClose(&pLReader->mergeTree);
|
||||
|
@ -4204,7 +4196,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
|
|||
SReaderStatus* pStatus = &pReader->status;
|
||||
STableBlockScanInfo* pBlockScanInfo = NULL;
|
||||
|
||||
pReader->status.suspendInvoked = true; // record the suspend status
|
||||
pReader->status.suspendInvoked = true; // record the suspend status
|
||||
|
||||
if (pStatus->loadFromFile) {
|
||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||
|
@ -4227,7 +4219,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
|
|||
// resetDataBlockScanInfo excluding lastKey
|
||||
STableBlockScanInfo** p = NULL;
|
||||
|
||||
int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1:-1;
|
||||
int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1;
|
||||
|
||||
int32_t iter = 0;
|
||||
while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
|
||||
|
@ -4248,7 +4240,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
|
|||
pReader->flag = READER_STATUS_SUSPEND;
|
||||
|
||||
#if SUSPEND_RESUME_TEST
|
||||
tsem_post(&pReader->resumeAfterSuspend);
|
||||
tsem_post(&pReader->resumeAfterSuspend);
|
||||
#endif
|
||||
|
||||
tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0,
|
||||
|
@ -4331,13 +4323,13 @@ _err:
|
|||
}
|
||||
|
||||
static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
|
||||
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
|
||||
|
||||
int32_t fid = pReader->status.pCurrentFileset->fid;
|
||||
STimeWindow win = {0};
|
||||
int32_t fid = pReader->status.pCurrentFileset->fid;
|
||||
STimeWindow win = {0};
|
||||
tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);
|
||||
|
||||
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? win.skey : win.ekey;
|
||||
|
@ -4359,8 +4351,8 @@ static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader) {
|
|||
|
||||
static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
|
||||
|
||||
if (pStatus->loadFromFile) {
|
||||
if (pStatus->bProcMemPreFileset) {
|
||||
|
@ -4375,12 +4367,12 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
|
|||
return code;
|
||||
}
|
||||
|
||||
tsdbTrace("block from file rows: %"PRId64", will process pre-file set buffer: %d. %s",
|
||||
pBlock->info.rows, pStatus->bProcMemFirstFileset, pReader->idStr);
|
||||
tsdbTrace("block from file rows: %" PRId64 ", will process pre-file set buffer: %d. %s", pBlock->info.rows,
|
||||
pStatus->bProcMemFirstFileset, pReader->idStr);
|
||||
if (pStatus->bProcMemPreFileset) {
|
||||
if (pBlock->info.rows > 0) {
|
||||
if (pReader->notifyFn) {
|
||||
int32_t fid = pReader->status.pCurrentFileset->fid;
|
||||
int32_t fid = pReader->status.pCurrentFileset->fid;
|
||||
STsdReaderNotifyInfo info = {0};
|
||||
info.duration.filesetId = fid;
|
||||
pReader->notifyFn(TSD_READER_NOTIFY_NEXT_DURATION_BLOCK, &info, pReader->notifyParam);
|
||||
|
@ -4404,8 +4396,8 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
|
|||
|
||||
static int32_t doTsdbNextDataBlockFilesFirst(STsdbReader* pReader) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
|
||||
|
||||
if (pStatus->loadFromFile) {
|
||||
code = buildBlockFromFiles(pReader);
|
||||
|
@ -4940,7 +4932,7 @@ static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey, int64_t
|
|||
int64_t minKey = INT64_MAX;
|
||||
|
||||
void* pHashIter = tSimpleHashIterate(pStatus->pTableMap, NULL, &iter);
|
||||
while (pHashIter!= NULL) {
|
||||
while (pHashIter != NULL) {
|
||||
STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pHashIter;
|
||||
|
||||
STbData* d = NULL;
|
||||
|
@ -5144,7 +5136,6 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact
|
|||
tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, proactive);
|
||||
}
|
||||
|
||||
tsdbFSUnref(pTsdb, &pSnap->fs);
|
||||
if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
|
||||
if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
|
||||
|
||||
|
@ -5165,9 +5156,7 @@ void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) {
|
|||
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/
|
||||
}
|
||||
|
||||
void tsdbSetFilesetDelimited(STsdbReader* pReader) {
|
||||
pReader->bFilesetDelimited = true;
|
||||
}
|
||||
void tsdbSetFilesetDelimited(STsdbReader* pReader) { pReader->bFilesetDelimited = true; }
|
||||
|
||||
void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param) {
|
||||
pReader->notifyFn = notifyFn;
|
||||
|
|
Loading…
Reference in New Issue