enh(tsdb): extract rows for the given tables in all stt files.
This commit is contained in:
parent
5467bba86b
commit
24d9337632
|
@ -258,8 +258,6 @@ typedef struct SQueryTableDataCond {
|
||||||
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
|
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
|
||||||
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock);
|
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock);
|
||||||
|
|
||||||
int32_t tEncodeDataBlocks(void** buf, const SArray* blocks);
|
|
||||||
void* tDecodeDataBlocks(const void* buf, SArray** blocks);
|
|
||||||
void colDataDestroy(SColumnInfoData* pColData);
|
void colDataDestroy(SColumnInfoData* pColData);
|
||||||
|
|
||||||
//======================================================================================================================
|
//======================================================================================================================
|
||||||
|
@ -294,7 +292,7 @@ typedef struct STableBlockDistInfo {
|
||||||
int32_t defMaxRows;
|
int32_t defMaxRows;
|
||||||
int32_t firstSeekTimeUs;
|
int32_t firstSeekTimeUs;
|
||||||
uint32_t numOfInmemRows;
|
uint32_t numOfInmemRows;
|
||||||
uint32_t numOfSmallBlocks;
|
uint32_t numOfSttRows;
|
||||||
uint32_t numOfVgroups;
|
uint32_t numOfVgroups;
|
||||||
int32_t blockRowsHisto[20];
|
int32_t blockRowsHisto[20];
|
||||||
} STableBlockDistInfo;
|
} STableBlockDistInfo;
|
||||||
|
|
|
@ -153,26 +153,6 @@ typedef struct STsdbReader STsdbReader;
|
||||||
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
|
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
|
||||||
#define CACHESCAN_RETRIEVE_LAST 0x8
|
#define CACHESCAN_RETRIEVE_LAST 0x8
|
||||||
|
|
||||||
int32_t tsdbReaderOpen(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
|
|
||||||
SSDataBlock *pResBlock, void **ppReader, const char *idstr, bool countOnly,
|
|
||||||
SHashObj **pIgnoreTables);
|
|
||||||
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
|
|
||||||
void tsdbReaderSetId(STsdbReader *pReader, const char *idstr);
|
|
||||||
void tsdbReaderClose(STsdbReader *pReader);
|
|
||||||
int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext);
|
|
||||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave, bool *hasNullSMA);
|
|
||||||
void tsdbReleaseDataBlock(STsdbReader *pReader);
|
|
||||||
SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
|
||||||
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
|
|
||||||
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
|
||||||
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
|
|
||||||
void *tsdbGetIdx(SMeta *pMeta);
|
|
||||||
void *tsdbGetIvtIdx(SMeta *pMeta);
|
|
||||||
uint64_t tsdbGetReaderMaxVersion(STsdbReader *pReader);
|
|
||||||
void tsdbReaderSetCloseFlag(STsdbReader *pReader);
|
|
||||||
int64_t tsdbGetLastTimestamp(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr);
|
|
||||||
|
|
||||||
//======================================================================================================================
|
|
||||||
int32_t tsdbReaderOpen2(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
|
int32_t tsdbReaderOpen2(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
|
||||||
SSDataBlock *pResBlock, void **ppReader, const char *idstr, bool countOnly,
|
SSDataBlock *pResBlock, void **ppReader, const char *idstr, bool countOnly,
|
||||||
SHashObj **pIgnoreTables);
|
SHashObj **pIgnoreTables);
|
||||||
|
@ -190,7 +170,6 @@ void *tsdbGetIdx2(SMeta *pMeta);
|
||||||
void *tsdbGetIvtIdx2(SMeta *pMeta);
|
void *tsdbGetIvtIdx2(SMeta *pMeta);
|
||||||
uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader);
|
uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader);
|
||||||
void tsdbReaderSetCloseFlag(STsdbReader *pReader);
|
void tsdbReaderSetCloseFlag(STsdbReader *pReader);
|
||||||
int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr);
|
|
||||||
//======================================================================================================================
|
//======================================================================================================================
|
||||||
|
|
||||||
int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables);
|
int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables);
|
||||||
|
|
|
@ -885,10 +885,9 @@ void tMergeTreeUnpinSttBlock(SMergeTree *pMTree);
|
||||||
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
|
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
|
||||||
void tMergeTreeClose(SMergeTree *pMTree);
|
void tMergeTreeClose(SMergeTree *pMTree);
|
||||||
|
|
||||||
SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols);
|
SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols);
|
||||||
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
|
||||||
void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo *pLoadCost);
|
void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo *pLoadCost);
|
||||||
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
||||||
void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost);
|
void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost);
|
||||||
|
|
||||||
// tsdbCache ==============================================================================================
|
// tsdbCache ==============================================================================================
|
||||||
|
|
|
@ -22,7 +22,7 @@
|
||||||
static void tLDataIterClose2(SLDataIter *pIter);
|
static void tLDataIterClose2(SLDataIter *pIter);
|
||||||
|
|
||||||
// SLDataIter =================================================
|
// SLDataIter =================================================
|
||||||
SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) {
|
SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) {
|
||||||
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo));
|
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo));
|
||||||
if (pLoadInfo == NULL) {
|
if (pLoadInfo == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -61,7 +61,7 @@ void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo* pL
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
||||||
if (pLoadInfo == NULL) {
|
if (pLoadInfo == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -78,14 +78,19 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
||||||
pInfo->sttBlockIndex = -1;
|
pInfo->sttBlockIndex = -1;
|
||||||
pInfo->pin = false;
|
pInfo->pin = false;
|
||||||
|
|
||||||
|
if (pLoadInfo->statisBlock != NULL) {
|
||||||
|
tStatisBlockDestroy(pLoadInfo->statisBlock);
|
||||||
|
taosMemoryFreeClear(pLoadInfo->statisBlock);
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pLoadInfo->aSttBlk);
|
taosArrayDestroy(pLoadInfo->aSttBlk);
|
||||||
taosMemoryFree(pLoadInfo);
|
taosMemoryFree(pLoadInfo);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroyLDataIter(SLDataIter *pIter) {
|
void destroyLDataIter(SLDataIter *pIter) {
|
||||||
tLDataIterClose2(pIter);
|
tLDataIterClose2(pIter);
|
||||||
destroyLastBlockLoadInfo(pIter->pBlockLoadInfo);
|
destroySttBlockLoadInfo(pIter->pBlockLoadInfo);
|
||||||
taosMemoryFree(pIter);
|
taosMemoryFree(pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -732,25 +737,6 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR
|
||||||
return -1 * tLDataIterCmprFn(p1, p2);
|
return -1 * tLDataIterCmprFn(p1, p2);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void adjustValidLDataIters(SArray *pLDIterList, int32_t numOfFileObj) {
|
|
||||||
int32_t size = taosArrayGetSize(pLDIterList);
|
|
||||||
|
|
||||||
if (size < numOfFileObj) {
|
|
||||||
int32_t inc = numOfFileObj - size;
|
|
||||||
for (int32_t k = 0; k < inc; ++k) {
|
|
||||||
SLDataIter *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
|
|
||||||
taosArrayPush(pLDIterList, &pIter);
|
|
||||||
}
|
|
||||||
} else if (size > numOfFileObj) { // remove unused LDataIter
|
|
||||||
int32_t inc = size - numOfFileObj;
|
|
||||||
|
|
||||||
for (int i = 0; i < inc; ++i) {
|
|
||||||
SLDataIter *pIter = taosArrayPop(pLDIterList);
|
|
||||||
destroyLDataIter(pIter);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
|
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
@ -773,19 +759,13 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// add the list/iter placeholder
|
// add the list/iter placeholder
|
||||||
while (taosArrayGetSize(pConf->pSttFileBlockIterArray) < numOfLevels) {
|
adjustLDataIters(pConf->pSttFileBlockIterArray, pConf->pCurrentFileset);
|
||||||
SArray *pList = taosArrayInit(4, POINTER_BYTES);
|
|
||||||
taosArrayPush(pConf->pSttFileBlockIterArray, &pList);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t j = 0; j < numOfLevels; ++j) {
|
for (int32_t j = 0; j < numOfLevels; ++j) {
|
||||||
SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j];
|
SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j];
|
||||||
SArray *pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j);
|
SArray *pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j);
|
||||||
|
|
||||||
int32_t numOfFileObj = TARRAY2_SIZE(pSttLevel->fobjArr);
|
for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) { // open all last file
|
||||||
adjustValidLDataIters(pList, numOfFileObj);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfFileObj; ++i) { // open all last file
|
|
||||||
SLDataIter *pIter = taosArrayGetP(pList, i);
|
SLDataIter *pIter = taosArrayGetP(pList, i);
|
||||||
|
|
||||||
SSttFileReader *pSttFileReader = pIter->pReader;
|
SSttFileReader *pSttFileReader = pIter->pReader;
|
||||||
|
@ -805,7 +785,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pLoadInfo == NULL) {
|
if (pLoadInfo == NULL) {
|
||||||
pLoadInfo = tCreateOneLastBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols);
|
pLoadInfo = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols);
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(pIter, 0, sizeof(SLDataIter));
|
memset(pIter, 0, sizeof(SLDataIter));
|
||||||
|
|
|
@ -4730,7 +4730,18 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SMergeTreeConf conf = {
|
||||||
|
.pReader = pReader,
|
||||||
|
.pSchema = pReader->info.pSchema,
|
||||||
|
.pCols = pReader->suppInfo.colId,
|
||||||
|
.numOfCols = pReader->suppInfo.numOfCols,
|
||||||
|
.suid = pReader->info.suid,
|
||||||
|
};
|
||||||
|
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
pTableBlockInfo->numOfSttRows +=
|
||||||
|
tsdbGetRowsInSttFiles(pStatus->pCurrentFileset, pStatus->pLDataIterArray, pReader->pTsdb, &conf, pReader->idStr);
|
||||||
|
|
||||||
STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
|
STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
|
||||||
pTableBlockInfo->defMinRows = pc->minRows;
|
pTableBlockInfo->defMinRows = pc->minRows;
|
||||||
|
@ -4767,10 +4778,6 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT
|
||||||
pTableBlockInfo->minRows = numOfRows;
|
pTableBlockInfo->minRows = numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (numOfRows < defaultRows) {
|
|
||||||
pTableBlockInfo->numOfSmallBlocks += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pTableBlockInfo->totalSize += pBlockInfo->record.blockSize;
|
pTableBlockInfo->totalSize += pBlockInfo->record.blockSize;
|
||||||
|
|
||||||
int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows, numOfBuckets);
|
int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows, numOfBuckets);
|
||||||
|
@ -4786,10 +4793,9 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT
|
||||||
pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
|
pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
|
||||||
hasNext = (pBlockIter->numOfBlocks > 0);
|
hasNext = (pBlockIter->numOfBlocks > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
|
|
||||||
// pReader->pFileGroup->fid, pReader->idStr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// record the data in stt files
|
||||||
tsdbReleaseReader(pReader);
|
tsdbReleaseReader(pReader);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -488,6 +488,8 @@ typedef enum {
|
||||||
BLK_CHECK_QUIT = 0x2,
|
BLK_CHECK_QUIT = 0x2,
|
||||||
} ETombBlkCheckEnum;
|
} ETombBlkCheckEnum;
|
||||||
|
|
||||||
|
static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, const SSttBlockLoadInfo* pBlockLoadInfo,
|
||||||
|
const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j);
|
||||||
static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_t numOfTables, int32_t* j,
|
static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_t numOfTables, int32_t* j,
|
||||||
ETombBlkCheckEnum* pRet) {
|
ETombBlkCheckEnum* pRet) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -659,3 +661,177 @@ void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemT
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t getNumOfRowsInSttBlock(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid,
|
||||||
|
const uint64_t* pUidList, int32_t numOfTables) {
|
||||||
|
int32_t num = 0;
|
||||||
|
|
||||||
|
const TStatisBlkArray *pStatisBlkArray = pBlockLoadInfo->pSttStatisBlkArray;
|
||||||
|
if (TARRAY2_SIZE(pStatisBlkArray) <= 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t i = 0;
|
||||||
|
while((i < TARRAY2_SIZE(pStatisBlkArray)) && (pStatisBlkArray->data[i].minTbid.suid < suid)) {
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i >= TARRAY2_SIZE(pStatisBlkArray)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStatisBlk *p = &pStatisBlkArray->data[i];
|
||||||
|
if (pBlockLoadInfo->statisBlock == NULL) {
|
||||||
|
pBlockLoadInfo->statisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock));
|
||||||
|
tStatisBlockInit(pBlockLoadInfo->statisBlock);
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t st = taosGetTimestampMs();
|
||||||
|
tsdbSttFileReadStatisBlock(pSttFileReader, p, pBlockLoadInfo->statisBlock);
|
||||||
|
pBlockLoadInfo->statisBlockIndex = i;
|
||||||
|
|
||||||
|
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||||
|
pBlockLoadInfo->cost.loadStatisBlocks += 1;
|
||||||
|
pBlockLoadInfo->cost.statisElapsedTime += el;
|
||||||
|
|
||||||
|
STbStatisBlock *pBlock = pBlockLoadInfo->statisBlock;
|
||||||
|
|
||||||
|
int32_t index = 0;
|
||||||
|
while (index < TARRAY2_SIZE(pBlock->suid) && pBlock->suid->data[index] < suid) {
|
||||||
|
++index;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (index >= TARRAY2_SIZE(pBlock->suid)) {
|
||||||
|
return num;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t j = index;
|
||||||
|
int32_t uidIndex = 0;
|
||||||
|
while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex <= numOfTables) {
|
||||||
|
p = &pStatisBlkArray->data[i];
|
||||||
|
if (p->minTbid.suid > suid) {
|
||||||
|
return num;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t uid = pUidList[uidIndex];
|
||||||
|
|
||||||
|
if (pBlock->uid->data[j] == uid) {
|
||||||
|
num += pBlock->count->data[j];
|
||||||
|
uidIndex += 1;
|
||||||
|
j += 1;
|
||||||
|
loadNextStatisticsBlock(pSttFileReader, pBlockLoadInfo, pStatisBlkArray, pBlock->suid->size, &i, &j);
|
||||||
|
} else if (pBlock->uid->data[j] < uid) {
|
||||||
|
j += 1;
|
||||||
|
loadNextStatisticsBlock(pSttFileReader, pBlockLoadInfo, pStatisBlkArray, pBlock->suid->size, &i, &j);
|
||||||
|
} else {
|
||||||
|
uidIndex += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return num;
|
||||||
|
}
|
||||||
|
|
||||||
|
// load next stt statistics block
|
||||||
|
static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, const SSttBlockLoadInfo* pBlockLoadInfo,
|
||||||
|
const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j) {
|
||||||
|
if ((*j) >= numOfRows) {
|
||||||
|
(*i) += 1;
|
||||||
|
(*j) = 0;
|
||||||
|
if ((*i) < TARRAY2_SIZE(pStatisBlkArray)) {
|
||||||
|
tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pBlockLoadInfo->statisBlock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) {
|
||||||
|
int32_t size = taosArrayGetSize(pLDIterList);
|
||||||
|
|
||||||
|
if (size < numOfFileObj) {
|
||||||
|
int32_t inc = numOfFileObj - size;
|
||||||
|
for (int32_t k = 0; k < inc; ++k) {
|
||||||
|
SLDataIter *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
|
||||||
|
taosArrayPush(pLDIterList, &pIter);
|
||||||
|
}
|
||||||
|
} else if (size > numOfFileObj) { // remove unused LDataIter
|
||||||
|
int32_t inc = size - numOfFileObj;
|
||||||
|
|
||||||
|
for (int i = 0; i < inc; ++i) {
|
||||||
|
SLDataIter *pIter = taosArrayPop(pLDIterList);
|
||||||
|
destroyLDataIter(pIter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t adjustLDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet) {
|
||||||
|
int32_t numOfLevels = pFileSet->lvlArr->size;
|
||||||
|
|
||||||
|
// add the list/iter placeholder
|
||||||
|
while (taosArrayGetSize(pSttFileBlockIterArray) < numOfLevels) {
|
||||||
|
SArray* pList = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
taosArrayPush(pSttFileBlockIterArray, &pList);
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int32_t j = 0; j < numOfLevels; ++j) {
|
||||||
|
SSttLvl* pSttLevel = pFileSet->lvlArr->data[j];
|
||||||
|
SArray* pList = taosArrayGetP(pSttFileBlockIterArray, j);
|
||||||
|
doAdjustValidDataIters(pList, TARRAY2_SIZE(pSttLevel->fobjArr));
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArray, STsdb* pTsdb, SMergeTreeConf* pConf,
|
||||||
|
const char* pstr) {
|
||||||
|
int32_t numOfRows = 0;
|
||||||
|
|
||||||
|
// no data exists, go to end
|
||||||
|
int32_t numOfLevels = pFileSet->lvlArr->size;
|
||||||
|
if (numOfLevels == 0) {
|
||||||
|
return numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
// add the list/iter placeholder
|
||||||
|
adjustLDataIters(pSttFileBlockIterArray, pFileSet);
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < numOfLevels; ++j) {
|
||||||
|
SSttLvl* pSttLevel = pFileSet->lvlArr->data[j];
|
||||||
|
SArray* pList = taosArrayGetP(pSttFileBlockIterArray, j);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { // open all last file
|
||||||
|
SLDataIter* pIter = taosArrayGetP(pList, i);
|
||||||
|
|
||||||
|
// open stt file reader if not opened yet
|
||||||
|
// if failed to open this stt file, ignore the error and try next one
|
||||||
|
if (pIter->pReader == NULL) {
|
||||||
|
SSttFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize};
|
||||||
|
conf.file[0] = *pSttLevel->fobjArr->data[i]->f;
|
||||||
|
|
||||||
|
const char* pName = pSttLevel->fobjArr->data[i]->fname;
|
||||||
|
int32_t code = tsdbSttFileReaderOpen(pName, &conf, &pIter->pReader);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tsdbError("open stt file reader error. file:%s, code %s, %s", pName, tstrerror(code), pstr);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pIter->pBlockLoadInfo == NULL) {
|
||||||
|
pIter->pBlockLoadInfo = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols);
|
||||||
|
}
|
||||||
|
|
||||||
|
// load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file
|
||||||
|
int32_t code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pIter->pBlockLoadInfo->pSttStatisBlkArray);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), pstr);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// extract rows from each stt file one-by-one
|
||||||
|
STsdbReader* pReader = pConf->pReader;
|
||||||
|
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
|
||||||
|
uint64_t* pUidList = pReader->status.uidList.tableUidList;
|
||||||
|
numOfRows += getNumOfRowsInSttBlock(pIter->pReader, pIter->pBlockLoadInfo, pConf->suid, pUidList, numOfTables);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return numOfRows;
|
||||||
|
}
|
|
@ -262,7 +262,12 @@ bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr);
|
||||||
void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver);
|
void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver);
|
||||||
int32_t loadDataFileTombDataForAll(STsdbReader* pReader);
|
int32_t loadDataFileTombDataForAll(STsdbReader* pReader);
|
||||||
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo);
|
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo);
|
||||||
|
int32_t getNumOfRowsInSttBlock(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid,
|
||||||
|
const uint64_t* pUidList, int32_t numOfTables);
|
||||||
|
void destroyLDataIter(SLDataIter* pIter);
|
||||||
|
int32_t adjustLDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet);
|
||||||
|
int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArray, STsdb* pTsdb, SMergeTreeConf* pConf,
|
||||||
|
const char* pstr);
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SArray* pTombData;
|
SArray* pTombData;
|
||||||
} STableLoadInfo;
|
} STableLoadInfo;
|
||||||
|
|
|
@ -5488,7 +5488,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
|
||||||
pDistInfo->defMinRows = p1.defMinRows;
|
pDistInfo->defMinRows = p1.defMinRows;
|
||||||
pDistInfo->defMaxRows = p1.defMaxRows;
|
pDistInfo->defMaxRows = p1.defMaxRows;
|
||||||
pDistInfo->rowSize = p1.rowSize;
|
pDistInfo->rowSize = p1.rowSize;
|
||||||
pDistInfo->numOfSmallBlocks = p1.numOfSmallBlocks;
|
pDistInfo->numOfSttRows = p1.numOfSttRows;
|
||||||
|
|
||||||
if (pDistInfo->minRows > p1.minRows) {
|
if (pDistInfo->minRows > p1.minRows) {
|
||||||
pDistInfo->minRows = p1.minRows;
|
pDistInfo->minRows = p1.minRows;
|
||||||
|
@ -5523,7 +5523,7 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist
|
||||||
if (tEncodeI32(&encoder, pInfo->defMaxRows) < 0) return -1;
|
if (tEncodeI32(&encoder, pInfo->defMaxRows) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pInfo->defMinRows) < 0) return -1;
|
if (tEncodeI32(&encoder, pInfo->defMinRows) < 0) return -1;
|
||||||
if (tEncodeU32(&encoder, pInfo->numOfInmemRows) < 0) return -1;
|
if (tEncodeU32(&encoder, pInfo->numOfInmemRows) < 0) return -1;
|
||||||
if (tEncodeU32(&encoder, pInfo->numOfSmallBlocks) < 0) return -1;
|
if (tEncodeU32(&encoder, pInfo->numOfSttRows) < 0) return -1;
|
||||||
if (tEncodeU32(&encoder, pInfo->numOfVgroups) < 0) return -1;
|
if (tEncodeU32(&encoder, pInfo->numOfVgroups) < 0) return -1;
|
||||||
|
|
||||||
for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
|
for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
|
||||||
|
@ -5555,7 +5555,7 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo
|
||||||
if (tDecodeI32(&decoder, &pInfo->defMaxRows) < 0) return -1;
|
if (tDecodeI32(&decoder, &pInfo->defMaxRows) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pInfo->defMinRows) < 0) return -1;
|
if (tDecodeI32(&decoder, &pInfo->defMinRows) < 0) return -1;
|
||||||
if (tDecodeU32(&decoder, &pInfo->numOfInmemRows) < 0) return -1;
|
if (tDecodeU32(&decoder, &pInfo->numOfInmemRows) < 0) return -1;
|
||||||
if (tDecodeU32(&decoder, &pInfo->numOfSmallBlocks) < 0) return -1;
|
if (tDecodeU32(&decoder, &pInfo->numOfSttRows) < 0) return -1;
|
||||||
if (tDecodeU32(&decoder, &pInfo->numOfVgroups) < 0) return -1;
|
if (tDecodeU32(&decoder, &pInfo->numOfVgroups) < 0) return -1;
|
||||||
|
|
||||||
for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
|
for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
|
||||||
|
@ -5589,7 +5589,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t len = sprintf(st + VARSTR_HEADER_SIZE,
|
int32_t len = sprintf(st + VARSTR_HEADER_SIZE,
|
||||||
"Total_Blocks=[%d] Total_Size=[%.2f KB] Average_size=[%.2f KB] Compression_Ratio=[%.2f %c]",
|
"Total_Blocks=[%d] Total_Size=[%.2f KiB] Average_size=[%.2f KiB] Compression_Ratio=[%.2f %c]",
|
||||||
pData->numOfBlocks, pData->totalSize / 1024.0, averageSize / 1024.0, compRatio, '%');
|
pData->numOfBlocks, pData->totalSize / 1024.0, averageSize / 1024.0, compRatio, '%');
|
||||||
|
|
||||||
varDataSetLen(st, len);
|
varDataSetLen(st, len);
|
||||||
|
@ -5600,14 +5600,16 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
avgRows = pData->totalRows / pData->numOfBlocks;
|
avgRows = pData->totalRows / pData->numOfBlocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
len = sprintf(st + VARSTR_HEADER_SIZE,
|
len = sprintf(st + VARSTR_HEADER_SIZE, "Block_Rows=[%" PRId64 "] MinRows=[%d] MaxRows=[%d] AvgRows=[%" PRId64 "]",
|
||||||
"Total_Rows=[%" PRId64 "] Inmem_Rows=[%d] MinRows=[%d] MaxRows=[%d] Average_Rows=[%" PRId64 "]",
|
pData->totalRows, pData->minRows, pData->maxRows, avgRows);
|
||||||
pData->totalRows, pData->numOfInmemRows, pData->minRows, pData->maxRows, avgRows);
|
|
||||||
|
|
||||||
varDataSetLen(st, len);
|
varDataSetLen(st, len);
|
||||||
colDataSetVal(pColInfo, row++, st, false);
|
colDataSetVal(pColInfo, row++, st, false);
|
||||||
|
|
||||||
len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", pData->numOfTables,
|
len = sprintf(st + VARSTR_HEADER_SIZE, "Inmem_Rows=[%d] Stt_Rows=[%d] ", pData->numOfInmemRows, pData->numOfSttRows);
|
||||||
|
varDataSetLen(st, len);
|
||||||
|
colDataSetVal(pColInfo, row++, st, false);
|
||||||
|
|
||||||
|
len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Filesets=[%d] Total_Vgroups=[%d]", pData->numOfTables,
|
||||||
pData->numOfFiles, pData->numOfVgroups);
|
pData->numOfFiles, pData->numOfVgroups);
|
||||||
|
|
||||||
varDataSetLen(st, len);
|
varDataSetLen(st, len);
|
||||||
|
|
Loading…
Reference in New Issue