Merge pull request #22461 from taosdata/fix/liaohj
refactor: do some internal refactcor
This commit is contained in:
commit
80506b564b
|
@ -708,14 +708,21 @@ typedef struct {
|
|||
TSDBROW row;
|
||||
} SRowInfo;
|
||||
|
||||
typedef struct SSttBlockLoadCostInfo {
|
||||
int64_t loadBlocks;
|
||||
int64_t loadStatisBlocks;
|
||||
double blockElapsedTime;
|
||||
double statisElapsedTime;
|
||||
} SSttBlockLoadCostInfo;
|
||||
|
||||
typedef struct SSttBlockLoadInfo {
|
||||
SBlockData blockData[2];
|
||||
SBlockData blockData[2]; // buffered block data
|
||||
int32_t statisBlockIndex; // buffered statistics block index
|
||||
void *statisBlock; // buffered statistics block data
|
||||
void *pSttStatisBlkArray;
|
||||
SArray *aSttBlk;
|
||||
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
|
||||
int32_t currentLoadBlockIndex;
|
||||
int32_t loadBlocks;
|
||||
double elapsedTime;
|
||||
STSchema *pSchema;
|
||||
int16_t *colIds;
|
||||
int32_t numOfCols;
|
||||
|
@ -723,12 +730,7 @@ typedef struct SSttBlockLoadInfo {
|
|||
bool isLast;
|
||||
bool sttBlockLoaded;
|
||||
|
||||
// keep the last access position, this position may be used to reduce the binary times for
|
||||
// starting last block data for a new table
|
||||
struct {
|
||||
int32_t blockIndex;
|
||||
int32_t rowIndex;
|
||||
} prevEndPos;
|
||||
SSttBlockLoadCostInfo cost;
|
||||
} SSttBlockLoadInfo;
|
||||
|
||||
typedef struct SMergeTree {
|
||||
|
@ -831,9 +833,9 @@ void tMergeTreeClose(SMergeTree *pMTree);
|
|||
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfStt);
|
||||
SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols);
|
||||
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
||||
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el);
|
||||
void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo *pLoadCost);
|
||||
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
||||
void *destroySttBlockReader(SArray *pLDataIterArray, int64_t *blocks, double *el);
|
||||
void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost);
|
||||
|
||||
// tsdbCache ==============================================================================================
|
||||
typedef enum {
|
||||
|
|
|
@ -1760,10 +1760,7 @@ typedef struct {
|
|||
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
|
||||
tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
|
||||
int32_t code = 0;
|
||||
|
||||
int64_t loadBlocks = 0;
|
||||
double elapse = 0;
|
||||
pr->pLDataIterArray = destroySttBlockReader(pr->pLDataIterArray, &loadBlocks, &elapse);
|
||||
pr->pLDataIterArray = destroySttBlockReader(pr->pLDataIterArray, NULL);
|
||||
pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
|
||||
|
||||
SMergeTreeConf conf = {
|
||||
|
|
|
@ -125,10 +125,7 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf
|
|||
pReader->pTableList = pTableIdList;
|
||||
pReader->numOfTables = numOfTables;
|
||||
pReader->lastTs = INT64_MIN;
|
||||
|
||||
int64_t blocks;
|
||||
double elapse;
|
||||
pReader->pLDataIterArray = destroySttBlockReader(pReader->pLDataIterArray, &blocks, &elapse);
|
||||
pReader->pLDataIterArray = destroySttBlockReader(pReader->pLDataIterArray, NULL);
|
||||
pReader->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -208,9 +205,7 @@ void* tsdbCacherowsReaderClose(void* pReader) {
|
|||
taosMemoryFree(p->pCurrSchema);
|
||||
|
||||
if (p->pLDataIterArray) {
|
||||
int64_t loadBlocks = 0;
|
||||
double elapse = 0;
|
||||
destroySttBlockReader(p->pLDataIterArray, &loadBlocks, &elapse);
|
||||
destroySttBlockReader(p->pLDataIterArray, NULL);
|
||||
}
|
||||
|
||||
if (p->pFileReader) {
|
||||
|
|
|
@ -530,6 +530,7 @@ static int32_t tsdbCloseCommitter(SCommitter2 *committer, int32_t eno) {
|
|||
ASSERT(committer->tombIterMerger == NULL);
|
||||
TARRAY2_DESTROY(committer->dataIterArray, NULL);
|
||||
TARRAY2_DESTROY(committer->tombIterArray, NULL);
|
||||
TARRAY2_DESTROY(committer->sttReaderArray, NULL);
|
||||
TARRAY2_DESTROY(committer->fopArray, NULL);
|
||||
TARRAY2_DESTROY(committer->sttReaderArray, NULL);
|
||||
tsdbFSDestroyCopySnapshot(&committer->fsetArr);
|
||||
|
|
|
@ -91,41 +91,39 @@ void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
|||
|
||||
taosArrayClear(pLoadInfo[i].aSttBlk);
|
||||
|
||||
pLoadInfo[i].elapsedTime = 0;
|
||||
pLoadInfo[i].loadBlocks = 0;
|
||||
pLoadInfo[i].cost.loadBlocks = 0;
|
||||
pLoadInfo[i].cost.blockElapsedTime = 0;
|
||||
pLoadInfo[i].cost.statisElapsedTime = 0;
|
||||
pLoadInfo[i].cost.loadStatisBlocks = 0;
|
||||
pLoadInfo[i].statisBlockIndex = -1;
|
||||
tStatisBlockDestroy(pLoadInfo[i].statisBlock);
|
||||
|
||||
pLoadInfo[i].sttBlockLoaded = false;
|
||||
}
|
||||
}
|
||||
|
||||
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el) {
|
||||
void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo* pLoadCost) {
|
||||
for (int32_t i = 0; i < 1; ++i) {
|
||||
*el += pLoadInfo[i].elapsedTime;
|
||||
*blocks += pLoadInfo[i].loadBlocks;
|
||||
pLoadCost->blockElapsedTime += pLoadInfo[i].cost.blockElapsedTime;
|
||||
pLoadCost->loadBlocks += pLoadInfo[i].cost.loadBlocks;
|
||||
pLoadCost->loadStatisBlocks += pLoadInfo[i].cost.loadStatisBlocks;
|
||||
pLoadCost->statisElapsedTime += pLoadInfo[i].cost.statisElapsedTime;
|
||||
}
|
||||
}
|
||||
|
||||
static void freeTombBlock(void *param) {
|
||||
STombBlock **pTombBlock = (STombBlock **)param;
|
||||
tTombBlockDestroy(*pTombBlock);
|
||||
taosMemoryFree(*pTombBlock);
|
||||
}
|
||||
|
||||
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
||||
if (pLoadInfo == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < 1; ++i) {
|
||||
pLoadInfo[i].currentLoadBlockIndex = 1;
|
||||
pLoadInfo[i].blockIndex[0] = -1;
|
||||
pLoadInfo[i].blockIndex[1] = -1;
|
||||
pLoadInfo->currentLoadBlockIndex = 1;
|
||||
pLoadInfo->blockIndex[0] = -1;
|
||||
pLoadInfo->blockIndex[1] = -1;
|
||||
|
||||
tBlockDataDestroy(&pLoadInfo[i].blockData[0]);
|
||||
tBlockDataDestroy(&pLoadInfo[i].blockData[1]);
|
||||
|
||||
taosArrayDestroy(pLoadInfo[i].aSttBlk);
|
||||
}
|
||||
tBlockDataDestroy(&pLoadInfo->blockData[0]);
|
||||
tBlockDataDestroy(&pLoadInfo->blockData[1]);
|
||||
|
||||
taosArrayDestroy(pLoadInfo->aSttBlk);
|
||||
taosMemoryFree(pLoadInfo);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -136,7 +134,7 @@ static void destroyLDataIter(SLDataIter *pIter) {
|
|||
taosMemoryFree(pIter);
|
||||
}
|
||||
|
||||
void *destroySttBlockReader(SArray *pLDataIterArray, int64_t *blocks, double *el) {
|
||||
void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo* pLoadCost) {
|
||||
if (pLDataIterArray == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -146,8 +144,13 @@ void *destroySttBlockReader(SArray *pLDataIterArray, int64_t *blocks, double *el
|
|||
SArray *pList = taosArrayGetP(pLDataIterArray, i);
|
||||
for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) {
|
||||
SLDataIter *pIter = taosArrayGetP(pList, j);
|
||||
*el += pIter->pBlockLoadInfo->elapsedTime;
|
||||
*blocks += pIter->pBlockLoadInfo->loadBlocks;
|
||||
if (pLoadCost != NULL) {
|
||||
pLoadCost->loadBlocks += pIter->pBlockLoadInfo->cost.loadBlocks;
|
||||
pLoadCost->loadStatisBlocks += pIter->pBlockLoadInfo->cost.loadStatisBlocks;
|
||||
pLoadCost->blockElapsedTime += pIter->pBlockLoadInfo->cost.blockElapsedTime;
|
||||
pLoadCost->statisElapsedTime += pIter->pBlockLoadInfo->cost.statisElapsedTime;
|
||||
}
|
||||
|
||||
destroyLDataIter(pIter);
|
||||
}
|
||||
taosArrayDestroy(pList);
|
||||
|
@ -195,12 +198,12 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
|
|||
}
|
||||
|
||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||
pInfo->elapsedTime += el;
|
||||
pInfo->loadBlocks += 1;
|
||||
pInfo->cost.blockElapsedTime += el;
|
||||
pInfo->cost.loadBlocks += 1;
|
||||
|
||||
tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64
|
||||
tsdbDebug("read last block, total load:%"PRId64", trigger by uid:%" PRIu64
|
||||
", last file index:%d, last block index:%d, entry:%d, rows:%d, %p, elapsed time:%.2f ms, %s",
|
||||
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow,
|
||||
pInfo->cost.loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow,
|
||||
pBlock, el, idStr);
|
||||
|
||||
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
|
||||
|
@ -363,8 +366,9 @@ static int32_t suidComparFn(const void *target, const void *p2) {
|
|||
}
|
||||
}
|
||||
|
||||
static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint64_t suid, uint64_t uid,
|
||||
static bool existsFromSttBlkStatis(SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid, uint64_t uid,
|
||||
SSttFileReader *pReader) {
|
||||
const TStatisBlkArray *pStatisBlkArray = pBlockLoadInfo->pSttStatisBlkArray;
|
||||
if (TARRAY2_SIZE(pStatisBlkArray) <= 0) {
|
||||
return true;
|
||||
}
|
||||
|
@ -387,23 +391,40 @@ static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint6
|
|||
return false;
|
||||
}
|
||||
|
||||
STbStatisBlock block = {0};
|
||||
tsdbSttFileReadStatisBlock(pReader, p, &block);
|
||||
// if (pBlockLoadInfo->statisBlock == NULL) {
|
||||
// pBlockLoadInfo->statisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock));
|
||||
//
|
||||
// int64_t st = taosGetTimestampMs();
|
||||
// tsdbSttFileReadStatisBlock(pReader, p, pBlockLoadInfo->statisBlock);
|
||||
// pBlockLoadInfo->statisBlockIndex = i;
|
||||
//
|
||||
// double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||
// pBlockLoadInfo->cost.loadStatisBlocks += 1;
|
||||
// pBlockLoadInfo->cost.statisElapsedTime += el;
|
||||
// } else if (pBlockLoadInfo->statisBlockIndex != i) {
|
||||
// tStatisBlockDestroy(pBlockLoadInfo->statisBlock);
|
||||
//
|
||||
// int64_t st = taosGetTimestampMs();
|
||||
// tsdbSttFileReadStatisBlock(pReader, p, pBlockLoadInfo->statisBlock);
|
||||
// pBlockLoadInfo->statisBlockIndex = i;
|
||||
//
|
||||
// double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||
// pBlockLoadInfo->cost.loadStatisBlocks += 1;
|
||||
// pBlockLoadInfo->cost.statisElapsedTime += el;
|
||||
// }
|
||||
|
||||
int32_t index = tarray2SearchIdx(block.suid, &suid, sizeof(int64_t), suidComparFn, TD_EQ);
|
||||
STbStatisBlock* pBlock = pBlockLoadInfo->statisBlock;
|
||||
int32_t index = tarray2SearchIdx(pBlock->suid, &suid, sizeof(int64_t), suidComparFn, TD_EQ);
|
||||
if (index == -1) {
|
||||
tStatisBlockDestroy(&block);
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t j = index;
|
||||
if (block.uid->data[j] == uid) {
|
||||
tStatisBlockDestroy(&block);
|
||||
if (pBlock->uid->data[j] == uid) {
|
||||
return true;
|
||||
} else if (block.uid->data[j] > uid) {
|
||||
while (j >= 0 && block.suid->data[j] == suid) {
|
||||
if (block.uid->data[j] == uid) {
|
||||
tStatisBlockDestroy(&block);
|
||||
} else if (pBlock->uid->data[j] > uid) {
|
||||
while (j >= 0 && pBlock->suid->data[j] == suid) {
|
||||
if (pBlock->uid->data[j] == uid) {
|
||||
return true;
|
||||
} else {
|
||||
j -= 1;
|
||||
|
@ -411,9 +432,8 @@ static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint6
|
|||
}
|
||||
} else {
|
||||
j = index + 1;
|
||||
while (j < block.suid->size && block.suid->data[j] == suid) {
|
||||
if (block.uid->data[j] == uid) {
|
||||
tStatisBlockDestroy(&block);
|
||||
while (j < pBlock->suid->size && pBlock->suid->data[j] == suid) {
|
||||
if (pBlock->uid->data[j] == uid) {
|
||||
return true;
|
||||
} else {
|
||||
j += 1;
|
||||
|
@ -421,14 +441,47 @@ static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint6
|
|||
}
|
||||
}
|
||||
|
||||
tStatisBlockDestroy(&block);
|
||||
i += 1;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward,
|
||||
static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *pIter, int64_t suid,
|
||||
_load_tomb_fn loadTombFn, void *pReader1, const char *idStr) {
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
const TSttBlkArray *pSttBlkArray = NULL;
|
||||
pBlockLoadInfo->sttBlockLoaded = true;
|
||||
|
||||
// load the stt block info for each stt-block
|
||||
int32_t code = tsdbSttFileReadSttBlk(pIter->pReader, &pSttBlkArray);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("load stt blk failed, code:%s, %s", tstrerror(code), idStr);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = extractSttBlockInfo(pIter, pSttBlkArray, pBlockLoadInfo, suid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("load stt block info failed, code:%s, %s", tstrerror(code), idStr);
|
||||
return code;
|
||||
}
|
||||
|
||||
// load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file
|
||||
code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pBlockLoadInfo->pSttStatisBlkArray);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), idStr);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo);
|
||||
|
||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||
tsdbDebug("load the stt file info completed, elapsed time:%.2fms, %s", el, idStr);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward,
|
||||
uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange,
|
||||
SSttBlockLoadInfo *pBlockLoadInfo, const char *idStr, bool strictTimeRange,
|
||||
_load_tomb_fn loadTombFn, void *pReader1) {
|
||||
|
@ -444,6 +497,7 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader
|
|||
pIter->pReader = pSttFileReader;
|
||||
pIter->pBlockLoadInfo = pBlockLoadInfo;
|
||||
|
||||
// open stt file failed, ignore and continue
|
||||
if (pIter->pReader == NULL) {
|
||||
tsdbError("stt file reader is null, %s", idStr);
|
||||
pIter->pSttBlk = NULL;
|
||||
|
@ -452,43 +506,18 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader
|
|||
}
|
||||
|
||||
if (!pBlockLoadInfo->sttBlockLoaded) {
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
const TSttBlkArray *pSttBlkArray = NULL;
|
||||
pBlockLoadInfo->sttBlockLoaded = true;
|
||||
|
||||
// load the stt block info for each stt-block
|
||||
code = tsdbSttFileReadSttBlk(pIter->pReader, &pSttBlkArray);
|
||||
code = doLoadSttFilesBlk(pBlockLoadInfo, pIter, suid, loadTombFn, pReader1, idStr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("load stt blk failed, code:%s, %s", tstrerror(code), idStr);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = extractSttBlockInfo(pIter, pSttBlkArray, pBlockLoadInfo, suid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("load stt block info failed, code:%s, %s", tstrerror(code), idStr);
|
||||
return code;
|
||||
}
|
||||
|
||||
// load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file
|
||||
code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pBlockLoadInfo->pSttStatisBlkArray);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), idStr);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo);
|
||||
|
||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||
tsdbDebug("load the stt file info completed, elapsed time:%.2fms, %s", el, idStr);
|
||||
}
|
||||
|
||||
// bool exists = existsFromSttBlkStatis(pBlockLoadInfo->pSttStatisBlkArray, suid, uid, pIter->pReader);
|
||||
// if (!exists) {
|
||||
// pIter->iSttBlk = -1;
|
||||
// pIter->pSttBlk = NULL;
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// }
|
||||
// bool exists = existsFromSttBlkStatis(pBlockLoadInfo, suid, uid, pIter->pReader);
|
||||
// if (!exists) {
|
||||
// pIter->iSttBlk = -1;
|
||||
// pIter->pSttBlk = NULL;
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// }
|
||||
|
||||
// find the start block, actually we could load the position to avoid repeatly searching for the start position when
|
||||
// the skey is updated.
|
||||
|
|
|
@ -63,7 +63,7 @@ typedef struct STableBlockScanInfo {
|
|||
SIterInfo iiter; // imem buffer skip list iterator
|
||||
SArray* delSkyline; // delete info for this table
|
||||
int32_t fileDelIndex; // file block delete index
|
||||
int32_t lastBlockDelIndex; // delete index for last block
|
||||
int32_t sttBlockDelIndex; // delete index for last block
|
||||
bool iterInit; // whether to initialize the in-memory skip list iterator or not
|
||||
} STableBlockScanInfo;
|
||||
|
||||
|
@ -87,13 +87,13 @@ typedef struct SIOCostSummary {
|
|||
double headFileLoadTime;
|
||||
int64_t smaDataLoad;
|
||||
double smaLoadTime;
|
||||
int64_t lastBlockLoad;
|
||||
double lastBlockLoadTime;
|
||||
int64_t sttStatisBlockLoad;
|
||||
int64_t sttBlockLoad;
|
||||
double sttBlockLoadTime;
|
||||
int64_t composedBlocks;
|
||||
double buildComposedBlockTime;
|
||||
double createScanInfoList;
|
||||
// double getTbFromMemTime;
|
||||
// double getTbFromIMemTime;
|
||||
SSttBlockLoadCostInfo sttCost;
|
||||
double initDelSkylineIterTime;
|
||||
} SIOCostSummary;
|
||||
|
||||
|
@ -586,8 +586,8 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SIOCostSummary* pSum = &pReader->cost;
|
||||
getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pReader->cost.lastBlockLoadTime);
|
||||
SIOCostSummary* pCost = &pReader->cost;
|
||||
getSttBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pCost->sttCost);
|
||||
|
||||
pIter->pLastBlockReader->uid = 0;
|
||||
tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
|
||||
|
@ -1976,7 +1976,7 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
|
|||
pLastBlockReader->currentKey = key;
|
||||
pScanInfo->lastKeyInStt = key;
|
||||
|
||||
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, key, ver, pLastBlockReader->order,
|
||||
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, pLastBlockReader->order,
|
||||
pVerRange)) {
|
||||
return true;
|
||||
}
|
||||
|
@ -3018,7 +3018,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
|
|||
pBlockScanInfo->iter.index = index;
|
||||
pBlockScanInfo->iiter.index = index;
|
||||
pBlockScanInfo->fileDelIndex = index;
|
||||
pBlockScanInfo->lastBlockDelIndex = index;
|
||||
pBlockScanInfo->sttBlockDelIndex = index;
|
||||
|
||||
return code;
|
||||
|
||||
|
@ -4029,7 +4029,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
|
|||
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||
} else {
|
||||
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
|
||||
pScanInfo->lastBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt,
|
||||
pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt,
|
||||
idStr);
|
||||
break;
|
||||
}
|
||||
|
@ -4697,7 +4697,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
|||
SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
|
||||
tMergeTreeClose(&pLReader->mergeTree);
|
||||
|
||||
getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);
|
||||
getSttBlockLoadInfo(pLReader->pInfo, &pCost->sttCost);
|
||||
|
||||
pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
|
||||
taosMemoryFree(pLReader);
|
||||
|
@ -4711,7 +4711,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
|||
", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, createTime:%.2f ms,initDelSkylineIterTime:%.2f "
|
||||
"ms, %s",
|
||||
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks,
|
||||
pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, pCost->lastBlockLoadTime, pCost->composedBlocks,
|
||||
pCost->blockLoadTime, pCost->buildmemBlock, pCost->sttBlockLoad, pCost->sttBlockLoadTime, pCost->composedBlocks,
|
||||
pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList,
|
||||
pCost->initDelSkylineIterTime, pReader->idStr);
|
||||
|
||||
|
|
|
@ -168,13 +168,11 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SCostSummary* pSum = &pReader->cost;
|
||||
SCostSummary* pCost = &pReader->cost;
|
||||
|
||||
pIter->pLastBlockReader->uid = 0;
|
||||
tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
|
||||
|
||||
pReader->status.pLDataIterArray =
|
||||
destroySttBlockReader(pReader->status.pLDataIterArray, &pSum->lastBlockLoad, &pSum->lastBlockLoadTime);
|
||||
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost);
|
||||
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
|
||||
|
||||
// check file the time range of coverage
|
||||
|
@ -1389,7 +1387,7 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
|
|||
pLastBlockReader->currentKey = key;
|
||||
pScanInfo->lastKeyInStt = key;
|
||||
|
||||
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, key, ver, pLastBlockReader->order,
|
||||
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, pLastBlockReader->order,
|
||||
pVerRange)) {
|
||||
return true;
|
||||
}
|
||||
|
@ -2408,7 +2406,7 @@ int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) {
|
|||
|
||||
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost) {
|
||||
int32_t code = 0;
|
||||
int32_t newDelDataInFile = taosArrayGetSize(pBlockScanInfo->pfileDelData);
|
||||
int32_t newDelDataInFile = taosArrayGetSize(pBlockScanInfo->pFileDelData);
|
||||
if (newDelDataInFile == 0 &&
|
||||
((pBlockScanInfo->delSkyline != NULL) || (TARRAY_SIZE(pBlockScanInfo->pMemDelData) == 0))) {
|
||||
return code;
|
||||
|
@ -2422,7 +2420,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t orde
|
|||
pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
|
||||
}
|
||||
|
||||
SArray* pSource = pBlockScanInfo->pfileDelData;
|
||||
SArray* pSource = pBlockScanInfo->pFileDelData;
|
||||
if (pSource == NULL) {
|
||||
pSource = pBlockScanInfo->pMemDelData;
|
||||
} else {
|
||||
|
@ -2431,13 +2429,13 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t orde
|
|||
|
||||
code = tsdbBuildDeleteSkyline(pSource, 0, taosArrayGetSize(pSource) - 1, pBlockScanInfo->delSkyline);
|
||||
|
||||
taosArrayClear(pBlockScanInfo->pfileDelData);
|
||||
taosArrayClear(pBlockScanInfo->pFileDelData);
|
||||
int32_t index = getInitialDelIndex(pBlockScanInfo->delSkyline, order);
|
||||
|
||||
pBlockScanInfo->iter.index = index;
|
||||
pBlockScanInfo->iiter.index = index;
|
||||
pBlockScanInfo->fileDelIndex = index;
|
||||
pBlockScanInfo->lastBlockDelIndex = index;
|
||||
pBlockScanInfo->sttBlockDelIndex = index;
|
||||
|
||||
double el = taosGetTimestampUs() - st;
|
||||
pCost->createSkylineIterTime = el / 1000.0;
|
||||
|
@ -3411,7 +3409,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
|
|||
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||
} else {
|
||||
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
|
||||
pScanInfo->lastBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt,
|
||||
pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt,
|
||||
idStr);
|
||||
break;
|
||||
}
|
||||
|
@ -4067,18 +4065,20 @@ void tsdbReaderClose2(STsdbReader* pReader) {
|
|||
taosMemoryFree(pLReader);
|
||||
}
|
||||
|
||||
destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);
|
||||
destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost);
|
||||
taosMemoryFreeClear(pReader->status.uidList.tableUidList);
|
||||
|
||||
tsdbDebug(
|
||||
"%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
|
||||
" SMA-time:%.2f ms, fileBlocks:%" PRId64
|
||||
", fileBlocks-load-time:%.2f ms, "
|
||||
"build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
|
||||
"build in-memory-block-time:%.2f ms, sttBlocks:%" PRId64 ", sttBlocks-time:%.2f ms, sttStatisBlock:%" PRId64
|
||||
", stt-statis-Block-time:%.2f ms, composed-blocks:%" PRId64
|
||||
", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, createTime:%.2f ms,createSkylineIterTime:%.2f "
|
||||
"ms, initLastBlockReader:%.2fms, %s",
|
||||
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks,
|
||||
pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, pCost->lastBlockLoadTime, pCost->composedBlocks,
|
||||
pCost->blockLoadTime, pCost->buildmemBlock, pCost->sttCost.loadBlocks, pCost->sttCost.blockElapsedTime,
|
||||
pCost->sttCost.loadStatisBlocks, pCost->sttCost.statisElapsedTime, pCost->composedBlocks,
|
||||
pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList,
|
||||
pCost->createSkylineIterTime, pCost->initLastBlockReader, pReader->idStr);
|
||||
|
||||
|
@ -4092,9 +4092,8 @@ void tsdbReaderClose2(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
|
||||
int32_t code = 0;
|
||||
|
||||
// save reader's base state & reset top state to be reconstructed from base state
|
||||
int32_t code = 0;
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
STableBlockScanInfo* pBlockScanInfo = NULL;
|
||||
|
||||
|
@ -4110,9 +4109,9 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
tsdbDataFileReaderClose(&pReader->pFileReader);
|
||||
int64_t loadBlocks = 0;
|
||||
double elapse = 0;
|
||||
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &loadBlocks, &elapse);
|
||||
|
||||
SCostSummary* pCost = &pReader->cost;
|
||||
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost);
|
||||
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
|
||||
// resetDataBlockScanInfo excluding lastKey
|
||||
STableBlockScanInfo** p = NULL;
|
||||
|
@ -4134,7 +4133,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
|
||||
pInfo->pfileDelData = taosArrayDestroy(pInfo->pfileDelData);
|
||||
pInfo->pFileDelData = taosArrayDestroy(pInfo->pFileDelData);
|
||||
}
|
||||
} else {
|
||||
// resetDataBlockScanInfo excluding lastKey
|
||||
|
@ -4209,7 +4208,6 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) {
|
|||
}
|
||||
|
||||
tsdbReaderSuspend2(pReader);
|
||||
|
||||
tsdbReleaseReader(pReader);
|
||||
|
||||
return code;
|
||||
|
@ -4222,8 +4220,7 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) {
|
|||
}
|
||||
|
||||
int32_t tsdbReaderResume2(STsdbReader* pReader) {
|
||||
int32_t code = 0;
|
||||
|
||||
int32_t code = 0;
|
||||
STableBlockScanInfo** pBlockScanInfo = pReader->status.pTableIter;
|
||||
|
||||
// restore reader's state
|
||||
|
@ -4290,7 +4287,6 @@ static bool tsdbReadRowsCountOnly(STsdbReader* pReader) {
|
|||
pBlock->info.rows = pReader->rowsNum;
|
||||
pBlock->info.id.uid = 0;
|
||||
pBlock->info.dataLoad = 0;
|
||||
|
||||
pReader->rowsNum = 0;
|
||||
|
||||
return pBlock->info.rows > 0;
|
||||
|
|
|
@ -221,7 +221,7 @@ void clearBlockScanInfo(STableBlockScanInfo* p) {
|
|||
p->delSkyline = taosArrayDestroy(p->delSkyline);
|
||||
p->pBlockList = taosArrayDestroy(p->pBlockList);
|
||||
p->pMemDelData = taosArrayDestroy(p->pMemDelData);
|
||||
p->pfileDelData = taosArrayDestroy(p->pfileDelData);
|
||||
p->pFileDelData = taosArrayDestroy(p->pFileDelData);
|
||||
}
|
||||
|
||||
void destroyAllBlockScanInfo(SSHashObj* pTableMap) {
|
||||
|
@ -238,7 +238,7 @@ void destroyAllBlockScanInfo(SSHashObj* pTableMap) {
|
|||
static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) {
|
||||
// reset the index in last block when handing a new file
|
||||
taosArrayClear(pScanInfo->pBlockList);
|
||||
taosArrayClear(pScanInfo->pfileDelData); // del data from each file set
|
||||
taosArrayClear(pScanInfo->pFileDelData); // del data from each file set
|
||||
}
|
||||
|
||||
void cleanupInfoFoxNextFileset(SSHashObj* pTableMap) {
|
||||
|
@ -502,14 +502,14 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_
|
|||
|
||||
if (newTable) {
|
||||
(*pScanInfo) = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
|
||||
if ((*pScanInfo)->pfileDelData == NULL) {
|
||||
(*pScanInfo)->pfileDelData = taosArrayInit(4, sizeof(SDelData));
|
||||
if ((*pScanInfo)->pFileDelData == NULL) {
|
||||
(*pScanInfo)->pFileDelData = taosArrayInit(4, sizeof(SDelData));
|
||||
}
|
||||
}
|
||||
|
||||
if (record.version <= pReader->info.verRange.maxVer) {
|
||||
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
|
||||
taosArrayPush((*pScanInfo)->pfileDelData, &delData);
|
||||
taosArrayPush((*pScanInfo)->pFileDelData, &delData);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -556,8 +556,8 @@ static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STs
|
|||
uint64_t uid = pReader->status.uidList.tableUidList[j];
|
||||
|
||||
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
|
||||
if (pScanInfo->pfileDelData == NULL) {
|
||||
pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData));
|
||||
if (pScanInfo->pFileDelData == NULL) {
|
||||
pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData));
|
||||
}
|
||||
|
||||
ETombBlkCheckEnum ret = 0;
|
||||
|
|
|
@ -65,12 +65,12 @@ typedef struct STableBlockScanInfo {
|
|||
TSKEY lastKeyInStt; // last accessed key in stt
|
||||
SArray* pBlockList; // block data index list, SArray<SBrinRecord>
|
||||
SArray* pMemDelData; // SArray<SDelData>
|
||||
SArray* pfileDelData; // SArray<SDelData> from each file set
|
||||
SArray* pFileDelData; // SArray<SDelData> from each file set
|
||||
SIterInfo iter; // mem buffer skip list iterator
|
||||
SIterInfo iiter; // imem buffer skip list iterator
|
||||
SArray* delSkyline; // delete info for this table
|
||||
int32_t fileDelIndex; // file block delete index
|
||||
int32_t lastBlockDelIndex; // delete index for last block
|
||||
int32_t sttBlockDelIndex; // delete index for last block
|
||||
bool iterInit; // whether to initialize the in-memory skip list iterator or not
|
||||
} STableBlockScanInfo;
|
||||
|
||||
|
@ -88,8 +88,7 @@ typedef struct SCostSummary {
|
|||
double headFileLoadTime;
|
||||
int64_t smaDataLoad;
|
||||
double smaLoadTime;
|
||||
int64_t lastBlockLoad;
|
||||
double lastBlockLoadTime;
|
||||
SSttBlockLoadCostInfo sttCost;
|
||||
int64_t composedBlocks;
|
||||
double buildComposedBlockTime;
|
||||
double createScanInfoList;
|
||||
|
|
Loading…
Reference in New Issue