enh: optimize count performance
This commit is contained in:
parent
7cf169b8d0
commit
61571d9296
|
@ -1180,7 +1180,7 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
|
|||
|
||||
void blockDataEmpty(SSDataBlock* pDataBlock) {
|
||||
SDataBlockInfo* pInfo = &pDataBlock->info;
|
||||
if (pInfo->capacity == 0 || pInfo->rows > pDataBlock->info.capacity) {
|
||||
if (pInfo->capacity == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,11 @@ typedef enum {
|
|||
EXTERNAL_ROWS_NEXT = 0x3,
|
||||
} EContentData;
|
||||
|
||||
typedef enum {
|
||||
READ_MODE_COUNT_ONLY = 0x1,
|
||||
READ_MODE_ALL,
|
||||
} EReadMode;
|
||||
|
||||
typedef struct {
|
||||
STbDataIter* iter;
|
||||
int32_t index;
|
||||
|
@ -167,6 +172,8 @@ struct STsdbReader {
|
|||
uint64_t suid;
|
||||
int16_t order;
|
||||
bool freeBlock;
|
||||
EReadMode readMode;
|
||||
uint64_t rowsNum;
|
||||
STimeWindow window; // the primary query time window that applies to all queries
|
||||
SSDataBlock* pResBlock;
|
||||
int32_t capacity;
|
||||
|
@ -2998,6 +3005,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
|
||||
|
||||
if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
|
||||
if (READ_MODE_COUNT_ONLY == pReader->readMode && pReader->rowsNum > 0) {
|
||||
return code;
|
||||
}
|
||||
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -3006,12 +3016,19 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
// build composed data block
|
||||
code = buildComposedDataBlock(pReader);
|
||||
} else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
|
||||
if (READ_MODE_COUNT_ONLY == pReader->readMode && pReader->rowsNum > 0) {
|
||||
return code;
|
||||
}
|
||||
// data in memory that are earlier than current file block
|
||||
// rows in buffer should be less than the file block in asc, greater than file block in desc
|
||||
int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
|
||||
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
|
||||
} else {
|
||||
if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
|
||||
if (READ_MODE_COUNT_ONLY == pReader->readMode && pReader->rowsNum > 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
// only return the rows in last block
|
||||
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
ASSERT(tsLast >= pBlock->maxKey.ts);
|
||||
|
@ -3069,6 +3086,131 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
return code;
|
||||
}
|
||||
|
||||
|
||||
static int32_t doSumBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) {
|
||||
int64_t st = taosGetTimestampUs();
|
||||
LRUHandle* handle = NULL;
|
||||
int32_t code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle);
|
||||
if (code != TSDB_CODE_SUCCESS || handle == NULL) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
|
||||
|
||||
SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
|
||||
size_t num = taosArrayGetSize(aBlockIdx);
|
||||
if (num == 0) {
|
||||
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SBlockIdx* pBlockIdx = NULL;
|
||||
int32_t i = 0;
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);
|
||||
if (pBlockIdx->suid != pReader->suid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
STableBlockScanInfo** p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid));
|
||||
if (p == NULL || *p == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
STableBlockScanInfo *pScanInfo = *p;
|
||||
tMapDataReset(&pScanInfo->mapData);
|
||||
tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
|
||||
|
||||
SDataBlk block = {0};
|
||||
for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
|
||||
tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);
|
||||
pReader->rowsNum += block.nRow;
|
||||
}
|
||||
}
|
||||
|
||||
_end:
|
||||
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
static int32_t readRowsCountFromFile(STsdbReader* pReader) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
while (1) {
|
||||
bool hasNext = false;
|
||||
int32_t code = filesetIteratorNext(&pReader->status.fileIter, pReader, &hasNext);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (!hasNext) { // no data files on disk
|
||||
break;
|
||||
}
|
||||
|
||||
code = doSumBlockRows(pReader, pReader->pFileReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
pReader->status.loadFromFile = false;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t readRowsCountFromStt(STsdbReader* pReader) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
|
||||
SSttBlockLoadInfo* pBlockLoadInfo = NULL;
|
||||
|
||||
for (int32_t i = 0; i < pReader->pFileReader->pSet->nSttF; ++i) { // open all last file
|
||||
pBlockLoadInfo = &pLastBlockReader->pInfo[i];
|
||||
|
||||
if (!pLastBlockReader->pInfo[i].sttBlockLoaded) {
|
||||
pLastBlockReader->pInfo[i].sttBlockLoaded = true;
|
||||
|
||||
code = tsdbReadSttBlk(pReader->pFileReader, i, pBlockLoadInfo->aSttBlk);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
|
||||
|
||||
if (size >= 1) {
|
||||
SSttBlk *pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0);
|
||||
SSttBlk *pEnd = taosArrayGet(pBlockLoadInfo->aSttBlk, size - 1);
|
||||
|
||||
// all identical
|
||||
if (pStart->suid == pEnd->suid) {
|
||||
if (pStart->suid != pReader->suid) {
|
||||
// no qualified stt block existed
|
||||
taosArrayClear(pBlockLoadInfo->aSttBlk);
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
|
||||
uint64_t s = p->suid;
|
||||
if (s < pReader->suid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (s == pReader->suid) {
|
||||
pReader->rowsNum += p->nRow;
|
||||
} else if (s > pReader->suid) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
STableUidList* pUidList = &pStatus->uidList;
|
||||
|
@ -3212,6 +3354,12 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
|||
initBlockDumpInfo(pReader, pBlockIter);
|
||||
} else {
|
||||
if (pReader->status.pCurrentFileset->nSttF > 0) {
|
||||
if (READ_MODE_COUNT_ONLY == pReader->readMode && pReader->rowsNum > 0) {
|
||||
pReader->pResBlock->info.rows = pReader->rowsNum;
|
||||
pReader->rowsNum = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// data blocks in current file are exhausted, let's try the next file now
|
||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||
if (pBlockData->uid != 0) {
|
||||
|
@ -3226,7 +3374,17 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
|||
code = initForFirstBlockInFile(pReader, pBlockIter);
|
||||
|
||||
// error happens or all the data files are completely checked
|
||||
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (READ_MODE_COUNT_ONLY == pReader->readMode && pReader->rowsNum > 0) {
|
||||
pReader->pResBlock->info.rows = pReader->rowsNum;
|
||||
pReader->rowsNum = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pReader->status.loadFromFile == false) {
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -3240,6 +3398,17 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
code = doBuildDataBlock(pReader);
|
||||
if (READ_MODE_COUNT_ONLY == pReader->readMode) {
|
||||
if (false == pReader->status.composedDataBlock && pDumpInfo->allDumped) {
|
||||
pReader->rowsNum += pReader->pResBlock->info.rows;
|
||||
pReader->pResBlock->info.rows = 0;
|
||||
continue;
|
||||
} else if (pReader->pResBlock->info.rows == 0 && pReader->rowsNum > 0) {
|
||||
pReader->pResBlock->info.rows = pReader->rowsNum;
|
||||
pReader->rowsNum = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -3986,6 +4155,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (pStatus->fileIter.numOfFiles == 0) {
|
||||
pStatus->loadFromFile = false;
|
||||
} else if (READ_MODE_COUNT_ONLY == pReader->readMode) {
|
||||
// DO NOTHING
|
||||
} else {
|
||||
code = initForFirstBlockInFile(pReader, pBlockIter);
|
||||
}
|
||||
|
@ -4091,6 +4262,9 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
|
|||
|
||||
pReader->suspended = true;
|
||||
|
||||
|
||||
pReader->readMode = READ_MODE_COUNT_ONLY;
|
||||
|
||||
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
|
||||
return code;
|
||||
|
||||
|
@ -4394,6 +4568,35 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static bool tsdbReadRowsCountOnly(STsdbReader* pReader) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSDataBlock* pBlock = pReader->pResBlock;
|
||||
|
||||
while (1) {
|
||||
if (pReader->status.loadFromFile == false) {
|
||||
break;
|
||||
}
|
||||
|
||||
code = readRowsCountFromFile(pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return false;
|
||||
}
|
||||
|
||||
code = readRowsCountFromStt(pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
pBlock->info.rows = pReader->rowsNum;
|
||||
pBlock->info.id.uid = 0;
|
||||
pBlock->info.dataLoad = 0;
|
||||
|
||||
pReader->rowsNum = 0;
|
||||
|
||||
return pBlock->info.rows > 0;
|
||||
}
|
||||
|
||||
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
|
||||
// cleanup the data that belongs to the previous data block
|
||||
SSDataBlock* pBlock = pReader->pResBlock;
|
||||
|
@ -4404,6 +4607,10 @@ static bool doTsdbNextDataBlock(STsdbReader* pReader) {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (READ_MODE_COUNT_ONLY == pReader->readMode) {
|
||||
return tsdbReadRowsCountOnly(pReader);
|
||||
}
|
||||
|
||||
if (pStatus->loadFromFile) {
|
||||
int32_t code = buildBlockFromFiles(pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -654,9 +654,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
|||
continue;
|
||||
}
|
||||
|
||||
ASSERT(pBlock->info.id.uid != 0);
|
||||
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
|
||||
|
||||
if (pBlock->info.id.uid) {
|
||||
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
|
||||
}
|
||||
|
||||
uint32_t status = 0;
|
||||
int32_t code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -680,7 +681,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
|||
pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
|
||||
pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
|
||||
|
||||
ASSERT(pBlock->info.id.uid != 0);
|
||||
return pBlock;
|
||||
}
|
||||
return NULL;
|
||||
|
@ -797,7 +797,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
|
||||
SSDataBlock* result = doGroupedTableScan(pOperator);
|
||||
if (result != NULL) {
|
||||
ASSERT(result->info.id.uid != 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue