fix(tsdb): opt read performance by check clean stt block if only row number required.

This commit is contained in:
Haojun Liao 2023-12-07 23:12:54 +08:00
parent cb8dfae0cc
commit 7c17d6f313
13 changed files with 536 additions and 495 deletions

View File

@ -253,6 +253,7 @@ typedef struct SQueryTableDataCond {
STimeWindow twindows; STimeWindow twindows;
int64_t startVersion; int64_t startVersion;
int64_t endVersion; int64_t endVersion;
bool trimData; // response the actual data, not only the rows in the attribute of info.row of ssdatablock
} SQueryTableDataCond; } SQueryTableDataCond;
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock); int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);

View File

@ -154,8 +154,7 @@ typedef struct {
/*-------------------------------------------------new api format---------------------------------------------------*/ /*-------------------------------------------------new api format---------------------------------------------------*/
typedef struct TsdReader { typedef struct TsdReader {
int32_t (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, int32_t (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly, SSDataBlock* pResBlock, void** ppReader, const char* idstr, SHashObj** pIgnoreTables);
SHashObj** pIgnoreTables);
void (*tsdReaderClose)(); void (*tsdReaderClose)();
void (*tsdSetReaderTaskId)(void *pReader, const char *pId); void (*tsdSetReaderTaskId)(void *pReader, const char *pId);
int32_t (*tsdSetQueryTableList)(); int32_t (*tsdSetQueryTableList)();

View File

@ -154,8 +154,7 @@ typedef struct STsdbReader STsdbReader;
#define CACHESCAN_RETRIEVE_LAST 0x8 #define CACHESCAN_RETRIEVE_LAST 0x8
int32_t tsdbReaderOpen2(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, int32_t tsdbReaderOpen2(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
SSDataBlock *pResBlock, void **ppReader, const char *idstr, bool countOnly, SSDataBlock *pResBlock, void **ppReader, const char *idstr, SHashObj **pIgnoreTables);
SHashObj **pIgnoreTables);
int32_t tsdbSetTableList2(STsdbReader *pReader, const void *pTableList, int32_t num); int32_t tsdbSetTableList2(STsdbReader *pReader, const void *pTableList, int32_t num);
void tsdbReaderSetId2(STsdbReader *pReader, const char *idstr); void tsdbReaderSetId2(STsdbReader *pReader, const char *idstr);
void tsdbReaderClose2(STsdbReader *pReader); void tsdbReaderClose2(STsdbReader *pReader);

View File

@ -675,8 +675,6 @@ struct SDelFWriter {
}; };
#include "tarray2.h" #include "tarray2.h"
// #include "tsdbFS2.h"
// struct STFileSet;
typedef struct STFileSet STFileSet; typedef struct STFileSet STFileSet;
typedef TARRAY2(STFileSet *) TFileSetArray; typedef TARRAY2(STFileSet *) TFileSetArray;
@ -772,20 +770,25 @@ typedef struct SBlockDataInfo {
int32_t sttBlockIndex; int32_t sttBlockIndex;
} SBlockDataInfo; } SBlockDataInfo;
typedef struct SSttBlockLoadInfo { // todo: move away
SBlockDataInfo blockData[2]; // buffered block data typedef struct {
int32_t statisBlockIndex; // buffered statistics block index SArray *pUid;
void *statisBlock; // buffered statistics block data SArray *pFirstKey;
void *pSttStatisBlkArray; SArray *pLastKey;
SArray *aSttBlk; SArray *pCount;
int32_t currentLoadBlockIndex; } SSttTableRowsInfo;
STSchema *pSchema;
int16_t *colIds;
int32_t numOfCols;
bool checkRemainingRow; // todo: no assign value?
bool isLast;
bool sttBlockLoaded;
typedef struct SSttBlockLoadInfo {
SBlockDataInfo blockData[2]; // buffered block data
SArray *aSttBlk;
int32_t currentLoadBlockIndex;
STSchema *pSchema;
int16_t *colIds;
int32_t numOfCols;
bool checkRemainingRow; // todo: no assign value?
bool isLast;
bool sttBlockLoaded;
SSttTableRowsInfo info;
SSttBlockLoadCostInfo cost; SSttBlockLoadCostInfo cost;
} SSttBlockLoadInfo; } SSttBlockLoadInfo;
@ -874,27 +877,31 @@ typedef struct {
_load_tomb_fn loadTombFn; _load_tomb_fn loadTombFn;
void *pReader; void *pReader;
void *idstr; void *idstr;
bool rspRows; // response the rows in stt-file, if possible
} SMergeTreeConf; } SMergeTreeConf;
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf); typedef struct SSttDataInfoForTable {
SArray* pTimeWindowList;
int64_t numOfRows;
} SSttDataInfoForTable;
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable* pTableInfo);
bool tMergeTreeNext(SMergeTree *pMTree); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
void tMergeTreePinSttBlock(SMergeTree *pMTree); bool tMergeTreeNext(SMergeTree *pMTree);
void tMergeTreeUnpinSttBlock(SMergeTree *pMTree); void tMergeTreePinSttBlock(SMergeTree *pMTree);
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); void tMergeTreeUnpinSttBlock(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree); bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree);
SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols); SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols);
void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo *pLoadCost);
void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost); void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost);
// tsdbCache ============================================================================================== // tsdbCache ==============================================================================================
typedef enum { typedef enum {
READ_MODE_COUNT_ONLY = 0x1, READER_EXEC_DATA = 0x1,
READ_MODE_ALL, READER_EXEC_ROWS = 0x2,
} EReadMode; } EExecMode;
typedef struct { typedef struct {
TSKEY ts; TSKEY ts;

View File

@ -1870,7 +1870,7 @@ static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb
.idstr = pr->idstr, .idstr = pr->idstr,
}; };
code = tMergeTreeOpen2(&iter->mergeTree, &conf); code = tMergeTreeOpen2(&iter->mergeTree, &conf, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return -1; return -1;
} }

View File

@ -15,6 +15,7 @@
#include "tsdb.h" #include "tsdb.h"
#include "tsdbFSet2.h" #include "tsdbFSet2.h"
#include "tsdbUtil2.h"
#include "tsdbMerge.h" #include "tsdbMerge.h"
#include "tsdbReadUtil.h" #include "tsdbReadUtil.h"
#include "tsdbSttFileRW.h" #include "tsdbSttFileRW.h"
@ -52,15 +53,6 @@ SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList,
return pLoadInfo; return pLoadInfo;
} }
void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo* pLoadCost) {
for (int32_t i = 0; i < 1; ++i) {
pLoadCost->blockElapsedTime += pLoadInfo[i].cost.blockElapsedTime;
pLoadCost->loadBlocks += pLoadInfo[i].cost.loadBlocks;
pLoadCost->loadStatisBlocks += pLoadInfo[i].cost.loadStatisBlocks;
pLoadCost->statisElapsedTime += pLoadInfo[i].cost.statisElapsedTime;
}
}
void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
if (pLoadInfo == NULL) { if (pLoadInfo == NULL) {
return NULL; return NULL;
@ -78,9 +70,11 @@ void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
pInfo->sttBlockIndex = -1; pInfo->sttBlockIndex = -1;
pInfo->pin = false; pInfo->pin = false;
if (pLoadInfo->statisBlock != NULL) { if (pLoadInfo->info.pCount != NULL) {
tStatisBlockDestroy(pLoadInfo->statisBlock); taosArrayDestroy(pLoadInfo->info.pUid);
taosMemoryFreeClear(pLoadInfo->statisBlock); taosArrayDestroy(pLoadInfo->info.pFirstKey);
taosArrayDestroy(pLoadInfo->info.pLastKey);
taosArrayDestroy(pLoadInfo->info.pCount);
} }
taosArrayDestroy(pLoadInfo->aSttBlk); taosArrayDestroy(pLoadInfo->aSttBlk);
@ -323,95 +317,74 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t suidComparFn(const void *target, const void *p2) { static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo,
const uint64_t *targetUid = target; TStatisBlkArray *pStatisBlkArray, uint64_t suid, const char *id) {
const uint64_t *uid2 = p2; int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray);
if (*uid2 == (*targetUid)) { if (numOfBlocks <= 0) {
return 0; return 0;
} else {
return (*targetUid) < (*uid2) ? -1 : 1;
}
}
static bool existsFromSttBlkStatis(SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid, uint64_t uid,
SSttFileReader *pReader) {
const TStatisBlkArray *pStatisBlkArray = pBlockLoadInfo->pSttStatisBlkArray;
if (TARRAY2_SIZE(pStatisBlkArray) <= 0) {
return true;
} }
int32_t i = 0; int32_t startIndex = 0;
for (i = 0; i < TARRAY2_SIZE(pStatisBlkArray); ++i) { while((startIndex < numOfBlocks) && (pStatisBlkArray->data[startIndex].maxTbid.suid < suid)) {
SStatisBlk *p = &pStatisBlkArray->data[i]; ++startIndex;
if (p->minTbid.suid <= suid && p->maxTbid.suid >= suid) {
break;
}
} }
if (i >= TARRAY2_SIZE(pStatisBlkArray)) { if (startIndex >= numOfBlocks || pStatisBlkArray->data[startIndex].minTbid.suid > suid) {
return false; return 0;
} }
while (i < TARRAY2_SIZE(pStatisBlkArray)) { int32_t endIndex = startIndex;
SStatisBlk *p = &pStatisBlkArray->data[i]; while(endIndex < numOfBlocks && pStatisBlkArray->data[endIndex].minTbid.suid <= suid) {
if (p->minTbid.suid > suid) { ++endIndex;
return false; }
int32_t num = endIndex - startIndex;
pBlockLoadInfo->cost.loadStatisBlocks += num;
STbStatisBlock block;
tStatisBlockInit(&block);
int64_t st = taosGetTimestampUs();
for(int32_t k = startIndex; k < endIndex; ++k) {
tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[k], &block);
int32_t i = 0;
while(block.suid->data[i] != suid) {
++i;
} }
// if (pBlockLoadInfo->statisBlock == NULL) { int32_t rows = TARRAY2_SIZE(block.suid);
// pBlockLoadInfo->statisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock)); if (pBlockLoadInfo->info.pUid == NULL) {
// pBlockLoadInfo->info.pUid = taosArrayInit(rows, sizeof(int64_t));
// int64_t st = taosGetTimestampMs(); pBlockLoadInfo->info.pFirstKey = taosArrayInit(rows, sizeof(int64_t));
// tsdbSttFileReadStatisBlock(pReader, p, pBlockLoadInfo->statisBlock); pBlockLoadInfo->info.pLastKey = taosArrayInit(rows, sizeof(int64_t));
// pBlockLoadInfo->statisBlockIndex = i; pBlockLoadInfo->info.pCount = taosArrayInit(rows, sizeof(int64_t));
//
// double el = (taosGetTimestampMs() - st) / 1000.0;
// pBlockLoadInfo->cost.loadStatisBlocks += 1;
// pBlockLoadInfo->cost.statisElapsedTime += el;
// } else if (pBlockLoadInfo->statisBlockIndex != i) {
// tStatisBlockDestroy(pBlockLoadInfo->statisBlock);
//
// int64_t st = taosGetTimestampMs();
// tsdbSttFileReadStatisBlock(pReader, p, pBlockLoadInfo->statisBlock);
// pBlockLoadInfo->statisBlockIndex = i;
//
// double el = (taosGetTimestampMs() - st) / 1000.0;
// pBlockLoadInfo->cost.loadStatisBlocks += 1;
// pBlockLoadInfo->cost.statisElapsedTime += el;
// }
STbStatisBlock* pBlock = pBlockLoadInfo->statisBlock;
int32_t index = tarray2SearchIdx(pBlock->suid, &suid, sizeof(int64_t), suidComparFn, TD_EQ);
if (index == -1) {
return false;
} }
int32_t j = index; if (pStatisBlkArray->data[k].maxTbid.suid == suid) {
if (pBlock->uid->data[j] == uid) { taosArrayAddBatch(pBlockLoadInfo->info.pUid, &block.uid->data[i], rows - i);
return true; taosArrayAddBatch(pBlockLoadInfo->info.pFirstKey, &block.firstKey->data[i], rows - i);
} else if (pBlock->uid->data[j] > uid) { taosArrayAddBatch(pBlockLoadInfo->info.pLastKey, &block.lastKey->data[i], rows - i);
while (j >= 0 && pBlock->suid->data[j] == suid) { taosArrayAddBatch(pBlockLoadInfo->info.pCount, &block.count->data[i], rows - i);
if (pBlock->uid->data[j] == uid) {
return true;
} else {
j -= 1;
}
}
} else { } else {
j = index + 1; while(i < rows && block.suid->data[i] == suid) {
while (j < pBlock->suid->size && pBlock->suid->data[j] == suid) { taosArrayPush(pBlockLoadInfo->info.pUid, &block.uid->data[i]);
if (pBlock->uid->data[j] == uid) { taosArrayPush(pBlockLoadInfo->info.pFirstKey, &block.firstKey->data[i]);
return true; taosArrayPush(pBlockLoadInfo->info.pLastKey, &block.lastKey->data[i]);
} else { taosArrayPush(pBlockLoadInfo->info.pCount, &block.count->data[i]);
j += 1; i += 1;
}
} }
} }
i += 1;
} }
return false; tStatisBlockDestroy(&block);
double el = (taosGetTimestampUs() - st) / 1000.0;
pBlockLoadInfo->cost.statisElapsedTime += el;
tsdbDebug("%s load %d statis blocks into buf, elapsed time:%.2fms", id, num, el);
return TSDB_CODE_SUCCESS;
} }
static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *pIter, int64_t suid, static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *pIter, int64_t suid,
@ -428,19 +401,28 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *
return code; return code;
} }
// load the stt block info for each stt file block
code = extractSttBlockInfo(pIter, pSttBlkArray, pBlockLoadInfo, suid); code = extractSttBlockInfo(pIter, pSttBlkArray, pBlockLoadInfo, suid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tsdbError("load stt block info failed, code:%s, %s", tstrerror(code), idStr); tsdbError("load stt block info failed, code:%s, %s", tstrerror(code), idStr);
return code; return code;
} }
// load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file // load stt statistics block for all stt-blocks, to decide if the data of queried table exists in current stt file
code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pBlockLoadInfo->pSttStatisBlkArray); TStatisBlkArray *pStatisBlkArray = NULL;
code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pStatisBlkArray);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), idStr); tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), idStr);
return code; return code;
} }
// load statistics block for all tables in current stt file
code = loadSttStatisticsBlockData(pIter->pReader, pIter->pBlockLoadInfo, pStatisBlkArray, suid, idStr);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("failed to load stt statistics block data, code:%s, %s", tstrerror(code), idStr);
return code;
}
code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo); code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo);
double el = (taosGetTimestampUs() - st) / 1000.0; double el = (taosGetTimestampUs() - st) / 1000.0;
@ -448,19 +430,44 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *
return code; return code;
} }
static int32_t uidComparFn(const void* p1, const void* p2) {
const uint64_t *pFirst = p1;
const uint64_t *pVal = p2;
if (*pFirst == *pVal) {
return 0;
} else {
return *pFirst < *pVal? -1:1;
}
}
static void setSttInfoForCurrentTable(SSttBlockLoadInfo *pLoadInfo, uint64_t uid, STimeWindow *pTimeWindow,
int64_t *numOfRows) {
if (pTimeWindow == NULL || taosArrayGetSize(pLoadInfo->info.pUid) == 0) {
return;
}
int32_t index = taosArraySearchIdx(pLoadInfo->info.pUid, &uid, uidComparFn, TD_EQ);
if (index >= 0) {
pTimeWindow->skey = *(int64_t *)taosArrayGet(pLoadInfo->info.pFirstKey, index);
pTimeWindow->ekey = *(int64_t *)taosArrayGet(pLoadInfo->info.pLastKey, index);
*numOfRows += *(int64_t*) taosArrayGet(pLoadInfo->info.pCount, index);
}
}
int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t cid, int8_t backward, int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t cid, int8_t backward,
uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SMergeTreeConf *pConf, SSttBlockLoadInfo *pBlockLoadInfo, STimeWindow *pTimeWindow,
SSttBlockLoadInfo *pBlockLoadInfo, const char *idStr, bool strictTimeRange, int64_t *numOfRows, const char *idStr) {
_load_tomb_fn loadTombFn, void *pReader1) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pIter->uid = uid; pIter->uid = pConf->uid;
pIter->cid = cid; pIter->cid = cid;
pIter->backward = backward; pIter->backward = backward;
pIter->verRange.minVer = pRange->minVer; pIter->verRange.minVer = pConf->verRange.minVer;
pIter->verRange.maxVer = pRange->maxVer; pIter->verRange.maxVer = pConf->verRange.maxVer;
pIter->timeWindow.skey = pTimeWindow->skey; pIter->timeWindow.skey = pConf->timewindow.skey;
pIter->timeWindow.ekey = pTimeWindow->ekey; pIter->timeWindow.ekey = pConf->timewindow.ekey;
pIter->pReader = pSttFileReader; pIter->pReader = pSttFileReader;
pIter->pBlockLoadInfo = pBlockLoadInfo; pIter->pBlockLoadInfo = pBlockLoadInfo;
@ -473,34 +480,29 @@ int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32
} }
if (!pBlockLoadInfo->sttBlockLoaded) { if (!pBlockLoadInfo->sttBlockLoaded) {
code = doLoadSttFilesBlk(pBlockLoadInfo, pIter, suid, loadTombFn, pReader1, idStr); code = doLoadSttFilesBlk(pBlockLoadInfo, pIter, pConf->suid, pConf->loadTombFn, pConf->pReader, idStr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }
// bool exists = existsFromSttBlkStatis(pBlockLoadInfo, suid, uid, pIter->pReader); setSttInfoForCurrentTable(pBlockLoadInfo, pConf->uid, pTimeWindow, numOfRows);
// if (!exists) {
// pIter->iSttBlk = -1;
// pIter->pSttBlk = NULL;
// return TSDB_CODE_SUCCESS;
// }
// find the start block, actually we could load the position to avoid repeatly searching for the start position when // find the start block, actually we could load the position to avoid repeatly searching for the start position when
// the skey is updated. // the skey is updated.
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk); size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward); pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, pConf->uid, backward);
if (pIter->iSttBlk != -1) { if (pIter->iSttBlk != -1) {
pIter->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, pIter->iSttBlk); pIter->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
pIter->iRow = (pIter->backward) ? pIter->pSttBlk->nRow : -1; pIter->iRow = (pIter->backward) ? pIter->pSttBlk->nRow : -1;
if ((!backward) && ((strictTimeRange && pIter->pSttBlk->minKey >= pIter->timeWindow.ekey) || if ((!backward) && ((pConf->strictTimeRange && pIter->pSttBlk->minKey >= pIter->timeWindow.ekey) ||
(!strictTimeRange && pIter->pSttBlk->minKey > pIter->timeWindow.ekey))) { (!pConf->strictTimeRange && pIter->pSttBlk->minKey > pIter->timeWindow.ekey))) {
pIter->pSttBlk = NULL; pIter->pSttBlk = NULL;
} }
if (backward && ((strictTimeRange && pIter->pSttBlk->maxKey <= pIter->timeWindow.skey) || if (backward && ((pConf->strictTimeRange && pIter->pSttBlk->maxKey <= pIter->timeWindow.skey) ||
(!strictTimeRange && pIter->pSttBlk->maxKey < pIter->timeWindow.skey))) { (!pConf->strictTimeRange && pIter->pSttBlk->maxKey < pIter->timeWindow.skey))) {
pIter->pSttBlk = NULL; pIter->pSttBlk = NULL;
pIter->ignoreEarlierTs = true; pIter->ignoreEarlierTs = true;
} }
@ -708,8 +710,6 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL); return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
} }
SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; }
// SMergeTree ================================================= // SMergeTree =================================================
static FORCE_INLINE int32_t tLDataIterCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) { static FORCE_INLINE int32_t tLDataIterCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
SLDataIter *pIter1 = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node)); SLDataIter *pIter1 = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node));
@ -737,7 +737,7 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR
return -1 * tLDataIterCmprFn(p1, p2); return -1 * tLDataIterCmprFn(p1, p2);
} }
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) { int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable* pSttDataInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pMTree->pIter = NULL; pMTree->pIter = NULL;
@ -758,17 +758,16 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
goto _end; goto _end;
} }
// add the list/iter placeholder adjustSttDataIters(pConf->pSttFileBlockIterArray, pConf->pCurrentFileset);
adjustLDataIters(pConf->pSttFileBlockIterArray, pConf->pCurrentFileset);
for (int32_t j = 0; j < numOfLevels; ++j) { for (int32_t j = 0; j < numOfLevels; ++j) {
SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j]; SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j];
SArray *pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j); SArray * pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j);
for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) { // open all last file for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) { // open all last file
SLDataIter *pIter = taosArrayGetP(pList, i); SLDataIter *pIter = taosArrayGetP(pList, i);
SSttFileReader *pSttFileReader = pIter->pReader; SSttFileReader * pSttFileReader = pIter->pReader;
SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo; SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo;
// open stt file reader if not opened yet // open stt file reader if not opened yet
@ -790,10 +789,11 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
memset(pIter, 0, sizeof(SLDataIter)); memset(pIter, 0, sizeof(SLDataIter));
STimeWindow w = {0};
int64_t numOfRows = 0;
int64_t cid = pSttLevel->fobjArr->data[i]->f->cid; int64_t cid = pSttLevel->fobjArr->data[i]->f->cid;
code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf->suid, pConf->uid, &pConf->timewindow, code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf, pLoadInfo, &w, &numOfRows, pMTree->idStr);
&pConf->verRange, pLoadInfo, pMTree->idStr, pConf->strictTimeRange, pConf->loadTombFn,
pConf->pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _end; goto _end;
} }
@ -801,6 +801,12 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr); bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
if (hasVal) { if (hasVal) {
tMergeTreeAddIter(pMTree, pIter); tMergeTreeAddIter(pMTree, pIter);
// let's record the time window for current table of uid in the stt files
if (pSttDataInfo != NULL) {
taosArrayPush(pSttDataInfo->pTimeWindowList, &w);
pSttDataInfo->numOfRows += numOfRows;
}
} else { } else {
if (!pMTree->ignoreEarlierTs) { if (!pMTree->ignoreEarlierTs) {
pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs; pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;

View File

@ -25,6 +25,16 @@
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define getCurrentKeyInSttBlock(_r) ((_r)->currentKey) #define getCurrentKeyInSttBlock(_r) ((_r)->currentKey)
typedef struct {
bool overlapWithNeighborBlock;
bool hasDupTs;
bool overlapWithDelInfo;
bool overlapWithSttBlock;
bool overlapWithKeyInBuf;
bool partiallyRequired;
bool moreThanCapcity;
} SDataBlockToLoadInfo;
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
STsdbReader* pReader); STsdbReader* pReader);
@ -1221,78 +1231,6 @@ static bool keyOverlapFileBlock(TSDBKEY key, SFileDataBlockInfo* pBlock, SVersio
(pBlock->record.maxVer >= pVerRange->minVer) && (pBlock->record.minVer <= pVerRange->maxVer); (pBlock->record.maxVer >= pVerRange->minVer) && (pBlock->record.minVer <= pVerRange->maxVer);
} }
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord,
int32_t startIndex) {
size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);
for (int32_t i = startIndex; i < num; i += 1) {
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
if (p->ts >= pRecord->firstKey && p->ts <= pRecord->lastKey) {
if (p->version >= pRecord->minVer) {
return true;
}
} else if (p->ts < pRecord->firstKey) { // p->ts < pBlock->minKey.ts
if (p->version >= pRecord->minVer) {
if (i < num - 1) {
TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
if (pnext->ts >= pRecord->firstKey) {
return true;
}
} else { // it must be the last point
ASSERT(p->version == 0);
}
}
} else { // (p->ts > pBlock->maxKey.ts) {
return false;
}
}
return false;
}
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order) {
if (pBlockScanInfo->delSkyline == NULL || (taosArrayGetSize(pBlockScanInfo->delSkyline) == 0)) {
return false;
}
// ts is not overlap
TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
if (pRecord->firstKey > pLast->ts || pRecord->lastKey < pFirst->ts) {
return false;
}
// version is not overlap
if (ASCENDING_TRAVERSE(order)) {
return doCheckforDatablockOverlap(pBlockScanInfo, pRecord, pBlockScanInfo->fileDelIndex);
} else {
int32_t index = pBlockScanInfo->fileDelIndex;
while (1) {
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
if (p->ts > pRecord->firstKey && index > 0) {
index -= 1;
} else { // find the first point that is smaller than the minKey.ts of dataBlock.
if (p->ts == pRecord->firstKey && p->version < pRecord->maxVer && index > 0) {
index -= 1;
}
break;
}
}
return doCheckforDatablockOverlap(pBlockScanInfo, pRecord, index);
}
}
typedef struct {
bool overlapWithNeighborBlock;
bool hasDupTs;
bool overlapWithDelInfo;
bool overlapWithLastBlock;
bool overlapWithKeyInBuf;
bool partiallyRequired;
bool moreThanCapcity;
} SDataBlockToLoadInfo;
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo,
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, STsdbReader* pReader) { STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, STsdbReader* pReader) {
SBrinRecord rec = {0}; SBrinRecord rec = {0};
@ -1313,7 +1251,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT); ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) { if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
int64_t nextProcKeyInStt = pScanInfo->sttKeyInfo.nextProcKey; int64_t nextProcKeyInStt = pScanInfo->sttKeyInfo.nextProcKey;
pInfo->overlapWithLastBlock = pInfo->overlapWithSttBlock =
!(pBlockInfo->record.lastKey < nextProcKeyInStt || pBlockInfo->record.firstKey > nextProcKeyInStt); !(pBlockInfo->record.lastKey < nextProcKeyInStt || pBlockInfo->record.firstKey > nextProcKeyInStt);
} }
@ -1335,15 +1273,15 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlock
bool loadDataBlock = bool loadDataBlock =
(info.overlapWithNeighborBlock || info.hasDupTs || info.partiallyRequired || info.overlapWithKeyInBuf || (info.overlapWithNeighborBlock || info.hasDupTs || info.partiallyRequired || info.overlapWithKeyInBuf ||
info.moreThanCapcity || info.overlapWithDelInfo || info.overlapWithLastBlock); info.moreThanCapcity || info.overlapWithDelInfo || info.overlapWithSttBlock);
// log the reason why load the datablock for profile // log the reason why load the datablock for profile
if (loadDataBlock) { if (loadDataBlock) {
tsdbDebug("%p uid:%" PRIu64 tsdbDebug("%p uid:%" PRIu64
" need to load the datablock, overlapneighbor:%d, hasDup:%d, partiallyRequired:%d, " " need to load the datablock, overlapneighbor:%d, hasDup:%d, partiallyRequired:%d, "
"overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s", "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithSttBlock:%d, %s",
pReader, pBlockInfo->uid, info.overlapWithNeighborBlock, info.hasDupTs, info.partiallyRequired, pReader, pBlockInfo->uid, info.overlapWithNeighborBlock, info.hasDupTs, info.partiallyRequired,
info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithLastBlock, info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithSttBlock,
pReader->idStr); pReader->idStr);
} }
@ -1355,7 +1293,7 @@ static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBloc
SDataBlockToLoadInfo info = {0}; SDataBlockToLoadInfo info = {0};
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pReader); getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pReader);
bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf || bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf ||
info.overlapWithDelInfo || info.overlapWithLastBlock); info.overlapWithDelInfo || info.overlapWithSttBlock);
return isCleanFileBlock; return isCleanFileBlock;
} }
@ -2110,27 +2048,34 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
return true; return true;
} }
static bool initSttBlockReader(SSttBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
// the last block reader has been initialized for this table. bool hasData = true;
if (pLBlockReader->uid == pScanInfo->uid) {
return hasDataInSttBlock(pLBlockReader); // the stt block reader has been initialized for this table.
if (pSttBlockReader->uid == pScanInfo->uid) {
return hasDataInSttBlock(pSttBlockReader);
} }
if (pLBlockReader->uid != 0) { if (pSttBlockReader->uid != 0) {
tMergeTreeClose(&pLBlockReader->mergeTree); tMergeTreeClose(&pSttBlockReader->mergeTree);
} }
pLBlockReader->uid = pScanInfo->uid; pSttBlockReader->uid = pScanInfo->uid;
STimeWindow w = pLBlockReader->window; // second time init stt block reader
if (ASCENDING_TRAVERSE(pLBlockReader->order)) { if (pScanInfo->cleanSttBlocks && pReader->info.execMode == READER_EXEC_ROWS) {
return true;
}
STimeWindow w = pSttBlockReader->window;
if (ASCENDING_TRAVERSE(pSttBlockReader->order)) {
w.skey = pScanInfo->sttKeyInfo.nextProcKey; w.skey = pScanInfo->sttKeyInfo.nextProcKey;
} else { } else {
w.ekey = pScanInfo->sttKeyInfo.nextProcKey; w.ekey = pScanInfo->sttKeyInfo.nextProcKey;
} }
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey, tsdbDebug("init stt block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey,
pScanInfo->uid, pReader->idStr); pScanInfo->uid, pReader->idStr);
SMergeTreeConf conf = { SMergeTreeConf conf = {
@ -2138,20 +2083,22 @@ static bool initSttBlockReader(SSttBlockReader* pLBlockReader, STableBlockScanIn
.suid = pReader->info.suid, .suid = pReader->info.suid,
.pTsdb = pReader->pTsdb, .pTsdb = pReader->pTsdb,
.timewindow = w, .timewindow = w,
.verRange = pLBlockReader->verRange, .verRange = pSttBlockReader->verRange,
.strictTimeRange = false, .strictTimeRange = false,
.pSchema = pReader->info.pSchema, .pSchema = pReader->info.pSchema,
.pCurrentFileset = pReader->status.pCurrentFileset, .pCurrentFileset = pReader->status.pCurrentFileset,
.backward = (pLBlockReader->order == TSDB_ORDER_DESC), .backward = (pSttBlockReader->order == TSDB_ORDER_DESC),
.pSttFileBlockIterArray = pReader->status.pLDataIterArray, .pSttFileBlockIterArray = pReader->status.pLDataIterArray,
.pCols = pReader->suppInfo.colId, .pCols = pReader->suppInfo.colId,
.numOfCols = pReader->suppInfo.numOfCols, .numOfCols = pReader->suppInfo.numOfCols,
.loadTombFn = loadSttTombDataForAll, .loadTombFn = loadSttTombDataForAll,
.pReader = pReader, .pReader = pReader,
.idstr = pReader->idStr, .idstr = pReader->idStr,
.rspRows = (pReader->info.execMode == READER_EXEC_ROWS),
}; };
int32_t code = tMergeTreeOpen2(&pLBlockReader->mergeTree, &conf); SSttDataInfoForTable info = {.pTimeWindowList = taosArrayInit(4, sizeof(STimeWindow))};
int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return false; return false;
} }
@ -2159,13 +2106,44 @@ static bool initSttBlockReader(SSttBlockReader* pLBlockReader, STableBlockScanIn
initMemDataIterator(pScanInfo, pReader); initMemDataIterator(pScanInfo, pReader);
initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost); initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
code = nextRowFromSttBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange); if (conf.rspRows) {
pScanInfo->cleanSttBlocks =
isCleanSttBlock(info.pTimeWindowList, &pReader->info.window, pScanInfo, pReader->info.order);
if (pScanInfo->cleanSttBlocks) {
pScanInfo->numOfRowsInStt = info.numOfRows;
pScanInfo->sttWindow.skey = INT64_MAX;
pScanInfo->sttWindow.ekey = INT64_MIN;
// calculate the time window for data in stt files
for(int32_t i = 0; i < taosArrayGetSize(info.pTimeWindowList); ++i) {
STimeWindow* pWindow = taosArrayGet(info.pTimeWindowList, i);
if (pScanInfo->sttWindow.skey > pWindow->skey) {
pScanInfo->sttWindow.skey = pWindow->skey;
}
if (pScanInfo->sttWindow.ekey < pWindow->ekey) {
pScanInfo->sttWindow.ekey = pWindow->ekey;
}
}
pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList)? STT_FILE_HAS_DATA:STT_FILE_NO_DATA;
pScanInfo->sttKeyInfo.nextProcKey = ASCENDING_TRAVERSE(pReader->info.order)? pScanInfo->sttWindow.skey:pScanInfo->sttWindow.ekey;
hasData = true;
} else {
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
}
} else {
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
}
taosArrayDestroy(info.pTimeWindowList);
int64_t el = taosGetTimestampUs() - st; int64_t el = taosGetTimestampUs() - st;
pReader->cost.initSttBlockReader += (el / 1000.0); pReader->cost.initSttBlockReader += (el / 1000.0);
tsdbDebug("init last block reader completed, elapsed time:%" PRId64 "us %s", el, pReader->idStr); tsdbDebug("init stt block reader completed, elapsed time:%" PRId64 "us %s", el, pReader->idStr);
return code; return hasData;
} }
static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader) { return pSttBlockReader->mergeTree.pIter != NULL; } static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader) { return pSttBlockReader->mergeTree.pIter != NULL; }
@ -2356,18 +2334,15 @@ void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInf
static int32_t buildComposedDataBlock(STsdbReader* pReader) { static int32_t buildComposedDataBlock(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int64_t st = taosGetTimestampUs();
int32_t step = asc ? 1 : -1;
double el = 0;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader; SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader;
SBrinRecord* pRecord = &pBlockInfo->record;
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int64_t st = taosGetTimestampUs();
int32_t step = asc ? 1 : -1;
double el = 0;
SBrinRecord* pRecord = &pBlockInfo->record;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
STableBlockScanInfo* pBlockScanInfo = NULL; STableBlockScanInfo* pBlockScanInfo = NULL;
@ -2629,10 +2604,10 @@ static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pSt
} }
static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader; SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader;
STableUidList* pUidList = &pStatus->uidList; STableUidList* pUidList = &pStatus->uidList;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (tSimpleHashGetSize(pStatus->pTableMap) == 0) { if (tSimpleHashGetSize(pStatus->pTableMap) == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -2668,8 +2643,8 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
continue; continue;
} }
bool hasDataInLastFile = initSttBlockReader(pSttBlockReader, pScanInfo, pReader); bool hasDataInSttFile = initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
if (!hasDataInLastFile) { if (!hasDataInSttFile) {
bool hasNexTable = moveToNextTable(pUidList, pStatus); bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) { if (!hasNexTable) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -2678,12 +2653,31 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
continue; continue;
} }
// if only require the total rows, no need to load data from stt file if it is clean stt blocks
if (pReader->info.execMode == READER_EXEC_ROWS && pScanInfo->cleanSttBlocks) {
SDataBlockInfo* pInfo = &pResBlock->info;
pInfo->rows = pScanInfo->numOfRowsInStt;
pInfo->id.uid = pScanInfo->uid;
pInfo->dataLoad = 1;
pInfo->window = pScanInfo->sttWindow;
setComposedBlockFlag(pReader, true);
pScanInfo->sttKeyInfo.nextProcKey =
ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1;
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
pScanInfo->lastProcKey =
ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey;
pSttBlockReader->mergeTree.pIter = NULL;
tsdbDebug("%p uid:%" PRId64 " return clean stt block as one, brange:%" PRId64 "-%" PRId64 " rows:%" PRId64 " %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, pReader->idStr);
return TSDB_CODE_SUCCESS;
}
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
while (1) { while (1) {
bool hasBlockLData = hasDataInSttBlock(pSttBlockReader); // no data in stt block and block, no need to proceed.
if (!hasDataInSttBlock(pSttBlockReader)) {
// no data in last block and block, no need to proceed.
if (hasBlockLData == false) {
break; break;
} }
@ -2728,14 +2722,13 @@ static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, STableBlockSc
} }
static int32_t doBuildDataBlock(STsdbReader* pReader) { static int32_t doBuildDataBlock(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter; SDataBlockIter* pBlockIter = &pStatus->blockIter;
STableBlockScanInfo* pScanInfo = NULL; STableBlockScanInfo* pScanInfo = NULL;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader; SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader;
bool asc = ASCENDING_TRAVERSE(pReader->info.order); bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int32_t code = TSDB_CODE_SUCCESS;
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) { if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) {
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order); setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
@ -2793,13 +2786,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
tsdbDebug("load data in last block firstly %s", pReader->idStr); tsdbDebug("load data in stt block firstly %s", pReader->idStr);
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
// let's load data from stt files // let's load data from stt files
initSttBlockReader(pSttBlockReader, pScanInfo, pReader); initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
// no data in last block, no need to proceed. // no data in stt block, no need to proceed.
while (hasDataInSttBlock(pSttBlockReader)) { while (hasDataInSttBlock(pSttBlockReader)) {
ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA); ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
@ -2837,147 +2830,6 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code; return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code;
} }
static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) {
int64_t st = taosGetTimestampUs();
LRUHandle* handle = NULL;
int32_t code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle);
if (code != TSDB_CODE_SUCCESS || handle == NULL) {
goto _end;
}
#if 0
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
size_t num = taosArrayGetSize(aBlockIdx);
if (num == 0) {
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
return TSDB_CODE_SUCCESS;
}
SBlockIdx* pBlockIdx = NULL;
for (int32_t i = 0; i < num; ++i) {
pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);
if (pBlockIdx->suid != pReader->info.suid) {
continue;
}
STableBlockScanInfo** p = tSimpleHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid));
if (p == NULL) {
continue;
}
STableBlockScanInfo* pScanInfo = *p;
SDataBlk block = {0};
// for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
// tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);
// pReader->rowsNum += block.nRow;
// }
}
#endif
_end:
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
return code;
}
static int32_t doSumSttBlockRows(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader;
SSttBlockLoadInfo* pBlockLoadInfo = NULL;
#if 0
for (int32_t i = 0; i < pReader->pFileReader->pSet->nSttF; ++i) { // open all last file
pBlockLoadInfo = &pSttBlockReader->pInfo[i];
code = tsdbReadSttBlk(pReader->pFileReader, i, pBlockLoadInfo->aSttBlk);
if (code) {
return code;
}
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
if (size >= 1) {
SSttBlk* pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0);
SSttBlk* pEnd = taosArrayGet(pBlockLoadInfo->aSttBlk, size - 1);
// all identical
if (pStart->suid == pEnd->suid) {
if (pStart->suid != pReader->info.suid) {
// no qualified stt block existed
taosArrayClear(pBlockLoadInfo->aSttBlk);
continue;
}
for (int32_t j = 0; j < size; ++j) {
SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, j);
pReader->rowsNum += p->nRow;
}
} else {
for (int32_t j = 0; j < size; ++j) {
SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, j);
uint64_t s = p->suid;
if (s < pReader->info.suid) {
continue;
}
if (s == pReader->info.suid) {
pReader->rowsNum += p->nRow;
} else if (s > pReader->info.suid) {
break;
}
}
}
}
}
#endif
return code;
}
static int32_t readRowsCountFromFiles(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
while (1) {
bool hasNext = false;
code = filesetIteratorNext(&pReader->status.fileIter, pReader, &hasNext);
if (code) {
return code;
}
if (!hasNext) { // no data files on disk
break;
}
// code = doSumFileBlockRows(pReader, pReader->pFileReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doSumSttBlockRows(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
pReader->status.loadFromFile = false;
return code;
}
static int32_t readRowsCountFromMem(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t memNum = 0, imemNum = 0;
if (pReader->pReadSnap->pMem != NULL) {
tsdbMemTableCountRows(pReader->pReadSnap->pMem, pReader->status.pTableMap, &memNum);
}
if (pReader->pReadSnap->pIMem != NULL) {
tsdbMemTableCountRows(pReader->pReadSnap->pIMem, pReader->status.pTableMap, &imemNum);
}
pReader->rowsNum += memNum + imemNum;
return code;
}
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
STableUidList* pUidList = &pStatus->uidList; STableUidList* pUidList = &pStatus->uidList;
@ -3107,7 +2959,7 @@ static ERetrieveType doReadDataFromSttFiles(STsdbReader* pReader) {
return TSDB_READ_RETURN; return TSDB_READ_RETURN;
} }
// all data blocks are checked in this last block file, now let's try the next file // all data blocks are checked in this stt file, now let's try the next file set
ASSERT(pReader->status.pTableIter == NULL); ASSERT(pReader->status.pTableIter == NULL);
code = initForFirstBlockInFile(pReader, pBlockIter); code = initForFirstBlockInFile(pReader, pBlockIter);
@ -3946,7 +3798,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pStatus->fileIter.numOfFiles == 0) { if (pStatus->fileIter.numOfFiles == 0) {
pStatus->loadFromFile = false; pStatus->loadFromFile = false;
} else if (READ_MODE_COUNT_ONLY == pReader->info.readMode) { // } else if (READER_EXEC_DATA == pReader->info.readMode) {
// DO NOTHING // DO NOTHING
} else { } else {
code = initForFirstBlockInFile(pReader, pBlockIter); code = initForFirstBlockInFile(pReader, pBlockIter);
@ -3987,8 +3839,7 @@ static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) {
// ====================================== EXPOSED APIs ====================================== // ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly, SSDataBlock* pResBlock, void** ppReader, const char* idstr, SHashObj** pIgnoreTables) {
SHashObj** pIgnoreTables) {
STimeWindow window = pCond->twindows; STimeWindow window = pCond->twindows;
SVnodeCfg* pConf = &(((SVnode*)pVnode)->config); SVnodeCfg* pConf = &(((SVnode*)pVnode)->config);
@ -4094,13 +3945,10 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
} }
pReader->flag = READER_STATUS_SUSPEND; pReader->flag = READER_STATUS_SUSPEND;
// pReader->info.execMode = pCond->trimData ? READER_EXEC_ROWS : READER_EXEC_DATA;
if (countOnly) { pReader->info.execMode = READER_EXEC_ROWS;
pReader->info.readMode = READ_MODE_COUNT_ONLY;
}
pReader->pIgnoreTables = pIgnoreTables; pReader->pIgnoreTables = pIgnoreTables;
tsdbDebug("%p total numOfTable:%d, window:%" PRId64 " - %" PRId64 ", verRange:%" PRId64 " - %" PRId64 tsdbDebug("%p total numOfTable:%d, window:%" PRId64 " - %" PRId64 ", verRange:%" PRId64 " - %" PRId64
" in this query %s", " in this query %s",
pReader, numOfTables, pReader->info.window.skey, pReader->info.window.ekey, pReader->info.verRange.minVer, pReader, numOfTables, pReader->info.window.skey, pReader->info.window.ekey, pReader->info.verRange.minVer,
@ -4332,32 +4180,6 @@ _err:
return code; return code;
} }
static bool tsdbReadRowsCountOnly(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
if (pReader->status.loadFromFile == false) {
return false;
}
code = readRowsCountFromFiles(pReader);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
code = readRowsCountFromMem(pReader);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
pBlock->info.rows = pReader->rowsNum;
pBlock->info.id.uid = 0;
pBlock->info.dataLoad = 0;
pReader->rowsNum = 0;
return pBlock->info.rows > 0;
}
static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -4372,10 +4194,6 @@ static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
return code; return code;
} }
if (READ_MODE_COUNT_ONLY == pReader->info.readMode) {
return tsdbReadRowsCountOnly(pReader);
}
if (pStatus->loadFromFile) { if (pStatus->loadFromFile) {
code = buildBlockFromFiles(pReader); code = buildBlockFromFiles(pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -4685,7 +4503,7 @@ SSDataBlock* tsdbRetrieveDataBlock2(STsdbReader* pReader, SArray* pIdList) {
} }
SReaderStatus* pStatus = &pTReader->status; SReaderStatus* pStatus = &pTReader->status;
if (pStatus->composedDataBlock) { if (pStatus->composedDataBlock || pReader->info.execMode == READER_EXEC_ROWS) {
return pTReader->resBlockInfo.pResBlock; return pTReader->resBlockInfo.pResBlock;
} }
@ -4722,9 +4540,9 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
pReader->info.order = pCond->order; pReader->info.order = pCond->order;
pReader->type = TIMEWINDOW_RANGE_CONTAINED; pReader->type = TIMEWINDOW_RANGE_CONTAINED;
pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
pStatus->loadFromFile = true; pStatus->loadFromFile = true;
pStatus->pTableIter = NULL; pStatus->pTableIter = NULL;
pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg)); memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));

View File

@ -22,15 +22,7 @@
#include "tsdbUtil2.h" #include "tsdbUtil2.h"
#include "tsimplehash.h" #include "tsimplehash.h"
int32_t uidComparFunc(const void* p1, const void* p2) { static bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order);
uint64_t pu1 = *(uint64_t*)p1;
uint64_t pu2 = *(uint64_t*)p2;
if (pu1 == pu2) {
return 0;
} else {
return (pu1 < pu2) ? -1 : 1;
}
}
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
int32_t num = numOfTables / pBuf->numPerBucket; int32_t num = numOfTables / pBuf->numPerBucket;
@ -61,6 +53,16 @@ static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t uidComparFunc(const void* p1, const void* p2) {
uint64_t pu1 = *(uint64_t*)p1;
uint64_t pu2 = *(uint64_t*)p2;
if (pu1 == pu2) {
return 0;
} else {
return (pu1 < pu2) ? -1 : 1;
}
}
int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
if (numOfTables <= pBuf->numOfTables) { if (numOfTables <= pBuf->numOfTables) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -243,6 +245,10 @@ static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) {
taosArrayClear(pScanInfo->pBlockList); taosArrayClear(pScanInfo->pBlockList);
taosArrayClear(pScanInfo->pBlockIdxList); taosArrayClear(pScanInfo->pBlockIdxList);
taosArrayClear(pScanInfo->pFileDelData); // del data from each file set taosArrayClear(pScanInfo->pFileDelData); // del data from each file set
pScanInfo->cleanSttBlocks = false;
pScanInfo->numOfRowsInStt = 0;
pScanInfo->sttWindow.skey = INT64_MAX;
pScanInfo->sttWindow.ekey = INT64_MIN;
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
} }
@ -488,7 +494,7 @@ typedef enum {
BLK_CHECK_QUIT = 0x2, BLK_CHECK_QUIT = 0x2,
} ETombBlkCheckEnum; } ETombBlkCheckEnum;
static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, const SSttBlockLoadInfo* pBlockLoadInfo, static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock,
const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j); const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j);
static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_t numOfTables, int32_t* j, static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_t numOfTables, int32_t* j,
ETombBlkCheckEnum* pRet) { ETombBlkCheckEnum* pRet) {
@ -662,17 +668,17 @@ void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemT
} }
} }
int32_t getNumOfRowsInSttBlock(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid, int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo,
const uint64_t* pUidList, int32_t numOfTables) { TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList,
int32_t numOfTables) {
int32_t num = 0; int32_t num = 0;
const TStatisBlkArray *pStatisBlkArray = pBlockLoadInfo->pSttStatisBlkArray;
if (TARRAY2_SIZE(pStatisBlkArray) <= 0) { if (TARRAY2_SIZE(pStatisBlkArray) <= 0) {
return 0; return 0;
} }
int32_t i = 0; int32_t i = 0;
while((i < TARRAY2_SIZE(pStatisBlkArray)) && (pStatisBlkArray->data[i].minTbid.suid < suid)) { while((i < TARRAY2_SIZE(pStatisBlkArray)) && (pStatisBlkArray->data[i].maxTbid.suid < suid)) {
++i; ++i;
} }
@ -681,64 +687,65 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader *pSttFileReader, SSttBlockLoadInfo
} }
SStatisBlk *p = &pStatisBlkArray->data[i]; SStatisBlk *p = &pStatisBlkArray->data[i];
if (pBlockLoadInfo->statisBlock == NULL) { STbStatisBlock* pStatisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock));
pBlockLoadInfo->statisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock)); tStatisBlockInit(pStatisBlock);
tStatisBlockInit(pBlockLoadInfo->statisBlock);
}
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
tsdbSttFileReadStatisBlock(pSttFileReader, p, pBlockLoadInfo->statisBlock); tsdbSttFileReadStatisBlock(pSttFileReader, p, pStatisBlock);
pBlockLoadInfo->statisBlockIndex = i;
double el = (taosGetTimestampMs() - st) / 1000.0; double el = (taosGetTimestampMs() - st) / 1000.0;
pBlockLoadInfo->cost.loadStatisBlocks += 1; pBlockLoadInfo->cost.loadStatisBlocks += 1;
pBlockLoadInfo->cost.statisElapsedTime += el; pBlockLoadInfo->cost.statisElapsedTime += el;
STbStatisBlock *pBlock = pBlockLoadInfo->statisBlock;
int32_t index = 0; int32_t index = 0;
while (index < TARRAY2_SIZE(pBlock->suid) && pBlock->suid->data[index] < suid) { while (index < TARRAY2_SIZE(pStatisBlock->suid) && pStatisBlock->suid->data[index] < suid) {
++index; ++index;
} }
if (index >= TARRAY2_SIZE(pBlock->suid)) { if (index >= TARRAY2_SIZE(pStatisBlock->suid)) {
tStatisBlockDestroy(pStatisBlock);
taosMemoryFreeClear(pStatisBlock);
return num; return num;
} }
int32_t j = index; int32_t j = index;
int32_t uidIndex = 0; int32_t uidIndex = 0;
while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex <= numOfTables) { while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex < numOfTables) {
p = &pStatisBlkArray->data[i]; p = &pStatisBlkArray->data[i];
if (p->minTbid.suid > suid) { if (p->minTbid.suid > suid) {
tStatisBlockDestroy(pStatisBlock);
taosMemoryFreeClear(pStatisBlock);
return num; return num;
} }
uint64_t uid = pUidList[uidIndex]; uint64_t uid = pUidList[uidIndex];
if (pBlock->uid->data[j] == uid) { if (pStatisBlock->uid->data[j] == uid) {
num += pBlock->count->data[j]; num += pStatisBlock->count->data[j];
uidIndex += 1; uidIndex += 1;
j += 1; j += 1;
loadNextStatisticsBlock(pSttFileReader, pBlockLoadInfo, pStatisBlkArray, pBlock->suid->size, &i, &j); loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->suid->size, &i, &j);
} else if (pBlock->uid->data[j] < uid) { } else if (pStatisBlock->uid->data[j] < uid) {
j += 1; j += 1;
loadNextStatisticsBlock(pSttFileReader, pBlockLoadInfo, pStatisBlkArray, pBlock->suid->size, &i, &j); loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->suid->size, &i, &j);
} else { } else {
uidIndex += 1; uidIndex += 1;
} }
} }
tStatisBlockDestroy(pStatisBlock);
taosMemoryFreeClear(pStatisBlock);
return num; return num;
} }
// load next stt statistics block // load next stt statistics block
static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, const SSttBlockLoadInfo* pBlockLoadInfo, static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock,
const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j) { const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j) {
if ((*j) >= numOfRows) { if ((*j) >= numOfRows) {
(*i) += 1; (*i) += 1;
(*j) = 0; (*j) = 0;
if ((*i) < TARRAY2_SIZE(pStatisBlkArray)) { if ((*i) < TARRAY2_SIZE(pStatisBlkArray)) {
tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pBlockLoadInfo->statisBlock); tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pStatisBlock);
} }
} }
} }
@ -762,7 +769,7 @@ void doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) {
} }
} }
int32_t adjustLDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet) { int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet) {
int32_t numOfLevels = pFileSet->lvlArr->size; int32_t numOfLevels = pFileSet->lvlArr->size;
// add the list/iter placeholder // add the list/iter placeholder
@ -791,7 +798,7 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra
} }
// add the list/iter placeholder // add the list/iter placeholder
adjustLDataIters(pSttFileBlockIterArray, pFileSet); adjustSttDataIters(pSttFileBlockIterArray, pFileSet);
for (int32_t j = 0; j < numOfLevels; ++j) { for (int32_t j = 0; j < numOfLevels; ++j) {
SSttLvl* pSttLevel = pFileSet->lvlArr->data[j]; SSttLvl* pSttLevel = pFileSet->lvlArr->data[j];
@ -819,7 +826,8 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra
} }
// load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file // load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file
int32_t code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pIter->pBlockLoadInfo->pSttStatisBlkArray); TStatisBlkArray *pStatisBlkArray = NULL;
int32_t code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pStatisBlkArray);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), pstr); tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), pstr);
continue; continue;
@ -829,9 +837,199 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra
STsdbReader* pReader = pConf->pReader; STsdbReader* pReader = pConf->pReader;
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
uint64_t* pUidList = pReader->status.uidList.tableUidList; uint64_t* pUidList = pReader->status.uidList.tableUidList;
numOfRows += getNumOfRowsInSttBlock(pIter->pReader, pIter->pBlockLoadInfo, pConf->suid, pUidList, numOfTables); numOfRows += getNumOfRowsInSttBlock(pIter->pReader, pIter->pBlockLoadInfo, pStatisBlkArray, pConf->suid, pUidList,
numOfTables);
} }
} }
return numOfRows; return numOfRows;
}
// overlap with deletion skyline
static bool overlapWithTimeWindow(STimeWindow* p1, STimeWindow* pQueryWindow, STableBlockScanInfo* pBlockScanInfo,
int32_t order) {
// overlap with query window
if (!(p1->skey >= pQueryWindow->skey && p1->ekey <= pQueryWindow->ekey)) {
return true;
}
// overlap with mem data
SIterInfo* pMemIter = &pBlockScanInfo->iter;
SIterInfo* pIMemIter = &pBlockScanInfo->iiter;
if ((pMemIter->hasVal) && p1->ekey >= pMemIter->iter->pTbData->minKey && p1->skey <= pMemIter->iter->pTbData->maxKey) {
return true;
}
// overlap with imem data
if ((pIMemIter->hasVal) && p1->ekey >= pIMemIter->iter->pTbData->minKey && p1->skey <= pIMemIter->iter->pTbData->maxKey) {
return true;
}
// overlap with deletion skyline
SBrinRecord record = {.firstKey = p1->skey, .lastKey = p1->ekey};
if (overlapWithDelSkylineWithoutVer(pBlockScanInfo, &record, order)) {
return true;
}
return false;
}
static int32_t sortUidComparFn(const void* p1, const void* p2) {
const STimeWindow* px1 = p1;
const STimeWindow* px2 = p2;
if (px1->skey == px2->skey) {
return 0;
} else {
return px1->skey < px2->skey? -1:1;
}
}
bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STableBlockScanInfo *pScanInfo, int32_t order) {
// check if it overlap with del skyline
taosArraySort(pTimewindowList, sortUidComparFn);
int32_t num = taosArrayGetSize(pTimewindowList);
if (num == 0) {
return false;
}
STimeWindow* p = taosArrayGet(pTimewindowList, 0);
if (overlapWithTimeWindow(p, pQueryWindow, pScanInfo, order)) {
return false;
}
for (int32_t i = 0; i < num - 1; ++i) {
STimeWindow* p1 = taosArrayGet(pTimewindowList, i);
STimeWindow* p2 = taosArrayGet(pTimewindowList, i + 1);
if (p1->ekey >= p2->skey) {
return false;
}
bool overlap = overlapWithTimeWindow(p2, pQueryWindow, pScanInfo, order);
if (overlap) {
return false;
}
}
return true;
}
static bool doCheckDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord,
int32_t startIndex) {
size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);
for (int32_t i = startIndex; i < num; i += 1) {
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
if (p->ts >= pRecord->firstKey && p->ts <= pRecord->lastKey) {
if (p->version >= pRecord->minVer) {
return true;
}
} else if (p->ts < pRecord->firstKey) { // p->ts < pBlock->minKey.ts
if (p->version >= pRecord->minVer) {
if (i < num - 1) {
TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
if (pnext->ts >= pRecord->firstKey) {
return true;
}
} else { // it must be the last point
ASSERT(p->version == 0);
}
}
} else { // (p->ts > pBlock->maxKey.ts) {
return false;
}
}
return false;
}
static bool doCheckDatablockOverlapWithoutVersion(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord,
int32_t startIndex) {
size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);
for (int32_t i = startIndex; i < num; i += 1) {
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
if (p->ts >= pRecord->firstKey && p->ts <= pRecord->lastKey) {
return true;
} else if (p->ts < pRecord->firstKey) { // p->ts < pBlock->minKey.ts
if (i < num - 1) {
TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
if (pnext->ts >= pRecord->firstKey) {
return true;
}
}
} else { // (p->ts > pBlock->maxKey.ts) {
return false;
}
}
return false;
}
bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order) {
if (pBlockScanInfo->delSkyline == NULL || (taosArrayGetSize(pBlockScanInfo->delSkyline) == 0)) {
return false;
}
// ts is not overlap
TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
if (pRecord->firstKey > pLast->ts || pRecord->lastKey < pFirst->ts) {
return false;
}
// version is not overlap
if (ASCENDING_TRAVERSE(order)) {
return doCheckDatablockOverlap(pBlockScanInfo, pRecord, pBlockScanInfo->fileDelIndex);
} else {
int32_t index = pBlockScanInfo->fileDelIndex;
while (1) {
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
if (p->ts > pRecord->firstKey && index > 0) {
index -= 1;
} else { // find the first point that is smaller than the minKey.ts of dataBlock.
if (p->ts == pRecord->firstKey && p->version < pRecord->maxVer && index > 0) {
index -= 1;
}
break;
}
}
return doCheckDatablockOverlap(pBlockScanInfo, pRecord, index);
}
}
bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order) {
if (pBlockScanInfo->delSkyline == NULL || (taosArrayGetSize(pBlockScanInfo->delSkyline) == 0)) {
return false;
}
// ts is not overlap
TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
if (pRecord->firstKey > pLast->ts || pRecord->lastKey < pFirst->ts) {
return false;
}
// version is not overlap
if (ASCENDING_TRAVERSE(order)) {
return doCheckDatablockOverlapWithoutVersion(pBlockScanInfo, pRecord, pBlockScanInfo->fileDelIndex);
} else {
int32_t index = pBlockScanInfo->fileDelIndex;
while (1) {
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
if (p->ts > pRecord->firstKey && index > 0) {
index -= 1;
} else { // find the first point that is smaller than the minKey.ts of dataBlock.
if (p->ts == pRecord->firstKey && index > 0) {
index -= 1;
}
break;
}
}
return doCheckDatablockOverlapWithoutVersion(pBlockScanInfo, pRecord, index);
}
} }

View File

@ -39,8 +39,7 @@ typedef enum {
typedef struct STsdbReaderInfo { typedef struct STsdbReaderInfo {
uint64_t suid; uint64_t suid;
STSchema* pSchema; STSchema* pSchema;
EReadMode readMode; EExecMode execMode;
uint64_t rowsNum;
STimeWindow window; STimeWindow window;
SVersionRange verRange; SVersionRange verRange;
int16_t order; int16_t order;
@ -74,6 +73,11 @@ typedef struct SSttKeyInfo {
int64_t nextProcKey; int64_t nextProcKey;
} SSttKeyInfo; } SSttKeyInfo;
// clean stt file blocks:
// 1. not overlap with stt blocks in other stt files of the same fileset
// 2. not overlap with delete skyline
// 3. not overlap with in-memory data (mem/imem)
// 4. not overlap with data file blocks
typedef struct STableBlockScanInfo { typedef struct STableBlockScanInfo {
uint64_t uid; uint64_t uid;
TSKEY lastProcKey; TSKEY lastProcKey;
@ -88,6 +92,9 @@ typedef struct STableBlockScanInfo {
int32_t fileDelIndex; // file block delete index int32_t fileDelIndex; // file block delete index
int32_t sttBlockDelIndex; // delete index for last block int32_t sttBlockDelIndex; // delete index for last block
bool iterInit; // whether to initialize the in-memory skip list iterator or not bool iterInit; // whether to initialize the in-memory skip list iterator or not
bool cleanSttBlocks; // stt block is clean in current fileset
int64_t numOfRowsInStt;
STimeWindow sttWindow;
} STableBlockScanInfo; } STableBlockScanInfo;
typedef struct SResultBlockInfo { typedef struct SResultBlockInfo {
@ -145,6 +152,7 @@ typedef struct SBlockLoadSuppInfo {
bool smaValid; // the sma on all queried columns are activated bool smaValid; // the sma on all queried columns are activated
} SBlockLoadSuppInfo; } SBlockLoadSuppInfo;
// each blocks in stt file not overlaps with in-memory/data-file/tomb-files, and not overlap with any other blocks in stt-file
typedef struct SSttBlockReader { typedef struct SSttBlockReader {
STimeWindow window; STimeWindow window;
SVersionRange verRange; SVersionRange verRange;
@ -262,12 +270,17 @@ bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr);
void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver); void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver);
int32_t loadDataFileTombDataForAll(STsdbReader* pReader); int32_t loadDataFileTombDataForAll(STsdbReader* pReader);
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo); int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo);
int32_t getNumOfRowsInSttBlock(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid, int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo,
const uint64_t* pUidList, int32_t numOfTables); TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList,
int32_t numOfTables);
void destroyLDataIter(SLDataIter* pIter); void destroyLDataIter(SLDataIter* pIter);
int32_t adjustLDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet); int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet);
int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArray, STsdb* pTsdb, SMergeTreeConf* pConf, int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArray, STsdb* pTsdb, SMergeTreeConf* pConf,
const char* pstr); const char* pstr);
bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, int32_t order);
bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order);
typedef struct { typedef struct {
SArray* pTombData; SArray* pTombData;
} STableLoadInfo; } STableLoadInfo;

View File

@ -42,7 +42,7 @@ void initStorageAPI(SStorageAPI* pAPI) {
void initTsdbReaderAPI(TsdReader* pReader) { void initTsdbReaderAPI(TsdReader* pReader) {
pReader->tsdReaderOpen = (int32_t(*)(void*, SQueryTableDataCond*, void*, int32_t, SSDataBlock*, void**, const char*, pReader->tsdReaderOpen = (int32_t(*)(void*, SQueryTableDataCond*, void*, int32_t, SSDataBlock*, void**, const char*,
bool, SHashObj**))tsdbReaderOpen2; SHashObj**))tsdbReaderOpen2;
pReader->tsdReaderClose = tsdbReaderClose2; pReader->tsdReaderClose = tsdbReaderClose2;
pReader->tsdNextDataBlock = tsdbNextDataBlock2; pReader->tsdNextDataBlock = tsdbNextDataBlock2;

View File

@ -1232,7 +1232,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
if (pScanBaseInfo->dataReader == NULL) { if (pScanBaseInfo->dataReader == NULL) {
int32_t code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen( int32_t code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(
pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1, pScanInfo->pResBlock, pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1, pScanInfo->pResBlock,
(void**)&pScanBaseInfo->dataReader, id, false, NULL); (void**)&pScanBaseInfo->dataReader, id, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id); qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
terrno = code; terrno = code;
@ -1291,7 +1291,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
int32_t size = tableListGetSize(pTableListInfo); int32_t size = tableListGetSize(pTableListInfo);
pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL,
(void**)&pInfo->dataReader, NULL, false, NULL); (void**)&pInfo->dataReader, NULL, NULL);
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);

View File

@ -889,7 +889,7 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
ASSERT(pInfo->base.dataReader == NULL); ASSERT(pInfo->base.dataReader == NULL);
int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
(void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly, &pInfo->pIgnoreTables); (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), &pInfo->pIgnoreTables);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
@ -1179,7 +1179,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
SSDataBlock* pBlock = pTableScanInfo->pResBlock; SSDataBlock* pBlock = pTableScanInfo->pResBlock;
STsdbReader* pReader = NULL; STsdbReader* pReader = NULL;
int32_t code = pAPI->tsdReader.tsdReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, int32_t code = pAPI->tsdReader.tsdReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
(void**)&pReader, GET_TASKID(pTaskInfo), false, NULL); (void**)&pReader, GET_TASKID(pTaskInfo), NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
@ -3373,7 +3373,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
param->pOperator = pOperator; param->pOperator = pOperator;
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx); STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock,
(void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables); (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), &pInfo->mSkipTables);
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = param; ps->param = param;

View File

@ -2304,7 +2304,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
void* pList = tableListGetInfo(pTableListInfo, 0); void* pList = tableListGetInfo(pTableListInfo, 0);
code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock,
(void**)&pInfo->pHandle, pTaskInfo->id.str, false, NULL); (void**)&pInfo->pHandle, pTaskInfo->id.str, NULL);
cleanupQueryTableDataCond(&cond); cleanupQueryTableDataCond(&cond);
if (code != 0) { if (code != 0) {
goto _error; goto _error;