From c35e834141d722d3f4162ed5684eecea3f5c5442 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 23 Mar 2024 00:11:21 +0800 Subject: [PATCH] fix(tsdb): fix error. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 44 +++++++++++++++---------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index cda8330282..7cd624d7c7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -148,6 +148,7 @@ static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) { } SPrimaryKeyIndex indices[TD_MAX_PK_COLS]; + ASSERT(pKey->numOfPKs <= TD_MAX_PK_COLS); uint8_t *data = pRow->data; for (int32_t i = 0; i < pRow->numOfPKs; i++) { @@ -686,13 +687,14 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); // todo: here we should find the first timestamp that is greater than the lastProcKey + // the window is an open interval NOW. if (asc) { - w.skey = pScanInfo->lastProcKey.ts + step; + w.skey = pScanInfo->lastProcKey.ts; } else { - w.ekey = pScanInfo->lastProcKey.ts + step; + w.ekey = pScanInfo->lastProcKey.ts; } - if (isEmptyQueryTimeWindow(&w)) { + if (/*isEmptyQueryTimeWindow(&w)*/ w.ekey - w.skey < 2) { // NOTE: specialized for open interval k += 1; if (k >= numOfTables) { @@ -707,7 +709,11 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN continue; } - // 2. version range check + if (pkCompEx(pReader->pkComparFn, &pRecord->lastKey.key, &pScanInfo->lastProcKey) <= 0) { + continue; + } + + // 2. version range check, version range is an CLOSED interval if (pRecord->minVer > pReader->info.verRange.maxVer || pRecord->maxVer < pReader->info.verRange.minVer) { continue; } @@ -724,6 +730,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN if (pScanInfo->filesetWindow.skey > pRecord->firstKey.key.ts) { pScanInfo->filesetWindow.skey = pRecord->firstKey.key.ts; } + if (pScanInfo->filesetWindow.ekey < pRecord->lastKey.key.ts) { pScanInfo->filesetWindow.ekey = pRecord->lastKey.key.ts; } @@ -1637,9 +1644,11 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* __compar_fn_t compFn = pReader->pkComparFn; int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot; - SRowKey* pSttKey = NULL; + SRowKey* pSttKey = &(SRowKey){0}; if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { - pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); + tRowKeyAssign(pSttKey, getCurrentKeyInSttBlock(pSttBlockReader)); + } else { + pSttKey = NULL; } SRowKey k; @@ -1675,21 +1684,21 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (pReader->info.order == TSDB_ORDER_ASC) { minKey = k; // chosen the minimum value - if (pkCompEx(compFn, pfKey, &minKey) < 0) { + if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) < 0) { minKey = *pfKey; } - if (pkCompEx(compFn, pSttKey, &minKey) < 0) { + if (pSttKey != NULL && pkCompEx(compFn, pSttKey, &minKey) < 0) { minKey = *pSttKey; } } else { minKey = k; - if (pkCompEx(compFn, pfKey, &minKey) > 0) { + if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) > 0) { minKey = *pfKey; } - if (pkCompEx(compFn, pSttKey, &minKey) > 0) { + if (pSttKey != NULL && pkCompEx(compFn, pSttKey, &minKey) > 0) { minKey = *pSttKey; } } @@ -1888,7 +1897,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* minKey = ik; } - if (hasDataInFileBlock(pBlockData, pDumpInfo) && (pkCompEx(compFn, pfKey, &minKey) < 0)) { + if (pfKey != NULL && (pkCompEx(compFn, pfKey, &minKey) < 0)) { minKey = *pfKey; } @@ -1901,7 +1910,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* minKey = ik; } - if (hasDataInFileBlock(pBlockData, pDumpInfo) && (pkCompEx(compFn, pfKey, &minKey) > 0)) { + if (pfKey != NULL && (pkCompEx(compFn, pfKey, &minKey) > 0)) { minKey = *pfKey; } @@ -3659,7 +3668,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIt SRowKey nextRowKey = {0}; tRowGetKeyEx(pNextRow, &nextRowKey); - if (pkCompEx(pReader->pkComparFn, pKey, &nextRowKey) != 0) { + if (pKey->numOfPKs > 0 && pkCompEx(pReader->pkComparFn, pKey, &nextRowKey) != 0) { *pResRow = current; *freeTSRow = false; return TSDB_CODE_SUCCESS; @@ -3966,6 +3975,9 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e if (row.type == TSDBROW_ROW_FMT) { code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo); + if (code == TSDB_CODE_SUCCESS) { + tRowGetKeyDeepCopy(row.pTSRow, &pBlockScanInfo->lastProcKey); + } if (freeTSRow) { taosMemoryFree(row.pTSRow); @@ -3974,8 +3986,6 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e if (code) { return code; } - - tRowGetKeyDeepCopy(row.pTSRow, &pBlockScanInfo->lastProcKey); } else { code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow); if (code) { @@ -4667,8 +4677,8 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { tsdbReleaseReader(pReader); } - tsdbReaderSuspend2(pReader); - tsdbReaderResume2(pReader); +// tsdbReaderSuspend2(pReader); +// tsdbReaderResume2(pReader); return code; }