Merge pull request #20228 from taosdata/feature/3_liaohj

fix(query): record the correct last accessed key in the stt file.
This commit is contained in:
Haojun Liao 2023-03-02 17:13:31 +08:00 committed by GitHub
commit 37281ae17f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 63 additions and 28 deletions

View File

@ -404,7 +404,7 @@ void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
tsdbDebug("try next last file block:%d from %d, trigger by uid:%" PRIu64 ", file index:%d, %s", pIter->iSttBlk,
oldIndex, pIter->uid, pIter->iStt, idStr);
} else {
tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index::%d, %s", pIter->uid, oldIndex, idStr);
tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index:%d, %s", pIter->uid, oldIndex, idStr);
}
}

View File

@ -44,6 +44,7 @@ typedef struct SBlockIndex {
typedef struct STableBlockScanInfo {
uint64_t uid;
TSKEY lastKey;
TSKEY lastKeyInStt; // last accessed key in stt
SMapData mapData; // block info (compressed)
SArray* pBlockList; // block data index list, SArray<SBlockIndex>
SIterInfo iter; // mem buffer skip list iterator
@ -192,7 +193,7 @@ static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbRe
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
SRowMerger* pMerger);
static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
SRowMerger* pMerger, SVersionRange* pVerRange);
SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
STsdbReader* pReader);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
@ -402,9 +403,11 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
int64_t skey = pTsdbReader->window.skey;
pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
pScanInfo->lastKeyInStt = skey;
} else {
int64_t ekey = pTsdbReader->window.ekey;
pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pScanInfo->lastKeyInStt = ekey;
}
taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
@ -1795,21 +1798,18 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
while (1) {
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
if (!hasVal) {
if (!hasVal) { // the next value will be the accessed key in stt
pScanInfo->lastKeyInStt += step;
return false;
}
TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
TSDBKEY k = TSDBROW_KEY(&row);
if (hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, pVerRange)) {
pScanInfo->lastKey = k.ts;
} else {
pScanInfo->lastKeyInStt = k.ts;
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, pVerRange)) {
// the qualifed ts may equal to k.ts, only a greater version one.
// here we need to fallback one step.
if (pScanInfo->lastKey == k.ts) {
pScanInfo->lastKey -= step;
}
return true;
}
}
@ -1949,7 +1949,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
}
if (minKey == k.ts) {
@ -1997,7 +1997,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
}
if (minKey == key) {
@ -2050,7 +2050,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMerge(&merge, &fRow1);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr);
code = tsdbRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
@ -2068,7 +2068,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
return code;
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr);
// merge with block data if ts == key
if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
@ -2121,7 +2121,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMerge(&merge, &fRow1);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange, pReader->idStr);
code = tsdbRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
@ -2230,7 +2230,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
}
if (minKey == ik.ts) {
@ -2321,7 +2321,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
}
if (minKey == key) {
@ -2472,9 +2472,9 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order) ? 1 : -1;
STimeWindow w = pLBlockReader->window;
if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
w.skey = pScanInfo->lastKey + step;
w.skey = pScanInfo->lastKeyInStt;
} else {
w.ekey = pScanInfo->lastKey + step;
w.ekey = pScanInfo->lastKeyInStt;
}
tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey,
@ -3568,13 +3568,16 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
}
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
SRowMerger* pMerger, SVersionRange* pVerRange) {
SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) {
while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 == ts) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMerge(pMerger, &fRow1);
} else {
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
pScanInfo->lastBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt,
idStr);
break;
}
}
@ -3937,6 +3940,17 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n
pInfo->uid = pList[i].uid;
pUidList->tableUidList[i] = pList[i].uid;
// todo extract method
if (ASCENDING_TRAVERSE(pReader->order)) {
int64_t skey = pReader->window.skey;
pInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
pInfo->lastKeyInStt = skey;
} else {
int64_t ekey = pReader->window.ekey;
pInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pInfo->lastKeyInStt = ekey;
}
taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
}
@ -4721,8 +4735,12 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
return code;
}
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
return (numOfRows - startRow) / bucketRange;
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows, int32_t numOfBucket) {
int32_t bucketIndex = (numOfRows - startRow) / bucketRange;
if (bucketIndex == numOfBucket) {
bucketIndex -= 1;
}
return bucketIndex;
}
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
@ -4731,8 +4749,9 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
pTableBlockInfo->totalRows = 0;
pTableBlockInfo->numOfVgroups = 1;
// find the start data block in file
const int32_t numOfBucket = 20.0;
// find the start data block in file
tsdbAcquireReader(pReader);
if (pReader->suspended) {
tsdbReaderResume(pReader);
@ -4743,7 +4762,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
pTableBlockInfo->defMinRows = pc->minRows;
pTableBlockInfo->defMaxRows = pc->maxRows;
int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0);
int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / numOfBucket);
pTableBlockInfo->numOfFiles += 1;
@ -4781,7 +4800,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
pTableBlockInfo->totalSize += pBlock->aSubBlock[0].szBlock;
int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows, numOfBucket);
pTableBlockInfo->blockRowsHisto[bucketIndex]++;
hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);

View File

@ -45,6 +45,8 @@ class TDSimClient:
"supportVnodes": "1024",
"enableQueryHb": "1",
"telemetryReporting": "0",
"tqDebugflag": "135",
"wDebugflag":"135",
}
def getLogDir(self):

View File

@ -49,9 +49,23 @@ while $i < $tbNum
$nchar = $nchar . $c
$nchar = $nchar . '
sql insert into $tb values ($tstart , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar )
$tstart = $tstart + 30
$x = $x + 1
$next = $tstart + 30
$f = $x + 1
$c1 = $f / 100
$c1 = $c1 * 100
$c1 = $f - $c1
$binary1 = ' . binary
$binary1 = $binary1 . $c1
$binary1 = $binary1 . '
$nchar1 = ' . nchar
$nchar1 = $nchar1 . $c1
$nchar1 = $nchar1 . '
sql insert into $tb values ($tstart , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar ) ($next , $c1 , $c1 , $c1 , $c1 , $c1 , $c1 , $c1 , $binary1 , $nchar1 )
$tstart = $tstart + 60
$x = $x + 2
endw
$i = $i + 1