fix(tsdb): fix error.

This commit is contained in:
Haojun Liao 2024-03-23 00:11:21 +08:00
parent 17a65430cd
commit c35e834141
1 changed files with 27 additions and 17 deletions

View File

@ -148,6 +148,7 @@ static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) {
} }
SPrimaryKeyIndex indices[TD_MAX_PK_COLS]; SPrimaryKeyIndex indices[TD_MAX_PK_COLS];
ASSERT(pKey->numOfPKs <= TD_MAX_PK_COLS);
uint8_t *data = pRow->data; uint8_t *data = pRow->data;
for (int32_t i = 0; i < pRow->numOfPKs; i++) { for (int32_t i = 0; i < pRow->numOfPKs; i++) {
@ -686,13 +687,14 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
// todo: here we should find the first timestamp that is greater than the lastProcKey // todo: here we should find the first timestamp that is greater than the lastProcKey
// the window is an open interval NOW.
if (asc) { if (asc) {
w.skey = pScanInfo->lastProcKey.ts + step; w.skey = pScanInfo->lastProcKey.ts;
} else { } else {
w.ekey = pScanInfo->lastProcKey.ts + step; w.ekey = pScanInfo->lastProcKey.ts;
} }
if (isEmptyQueryTimeWindow(&w)) { if (/*isEmptyQueryTimeWindow(&w)*/ w.ekey - w.skey < 2) { // NOTE: specialized for open interval
k += 1; k += 1;
if (k >= numOfTables) { if (k >= numOfTables) {
@ -707,7 +709,11 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
continue; continue;
} }
// 2. version range check if (pkCompEx(pReader->pkComparFn, &pRecord->lastKey.key, &pScanInfo->lastProcKey) <= 0) {
continue;
}
// 2. version range check, version range is an CLOSED interval
if (pRecord->minVer > pReader->info.verRange.maxVer || pRecord->maxVer < pReader->info.verRange.minVer) { if (pRecord->minVer > pReader->info.verRange.maxVer || pRecord->maxVer < pReader->info.verRange.minVer) {
continue; continue;
} }
@ -724,6 +730,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
if (pScanInfo->filesetWindow.skey > pRecord->firstKey.key.ts) { if (pScanInfo->filesetWindow.skey > pRecord->firstKey.key.ts) {
pScanInfo->filesetWindow.skey = pRecord->firstKey.key.ts; pScanInfo->filesetWindow.skey = pRecord->firstKey.key.ts;
} }
if (pScanInfo->filesetWindow.ekey < pRecord->lastKey.key.ts) { if (pScanInfo->filesetWindow.ekey < pRecord->lastKey.key.ts) {
pScanInfo->filesetWindow.ekey = pRecord->lastKey.key.ts; pScanInfo->filesetWindow.ekey = pRecord->lastKey.key.ts;
} }
@ -1637,9 +1644,11 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
__compar_fn_t compFn = pReader->pkComparFn; __compar_fn_t compFn = pReader->pkComparFn;
int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot; int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot;
SRowKey* pSttKey = NULL; SRowKey* pSttKey = &(SRowKey){0};
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); tRowKeyAssign(pSttKey, getCurrentKeyInSttBlock(pSttBlockReader));
} else {
pSttKey = NULL;
} }
SRowKey k; SRowKey k;
@ -1675,21 +1684,21 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (pReader->info.order == TSDB_ORDER_ASC) { if (pReader->info.order == TSDB_ORDER_ASC) {
minKey = k; // chosen the minimum value minKey = k; // chosen the minimum value
if (pkCompEx(compFn, pfKey, &minKey) < 0) { if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) < 0) {
minKey = *pfKey; minKey = *pfKey;
} }
if (pkCompEx(compFn, pSttKey, &minKey) < 0) { if (pSttKey != NULL && pkCompEx(compFn, pSttKey, &minKey) < 0) {
minKey = *pSttKey; minKey = *pSttKey;
} }
} else { } else {
minKey = k; minKey = k;
if (pkCompEx(compFn, pfKey, &minKey) > 0) { if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) > 0) {
minKey = *pfKey; minKey = *pfKey;
} }
if (pkCompEx(compFn, pSttKey, &minKey) > 0) { if (pSttKey != NULL && pkCompEx(compFn, pSttKey, &minKey) > 0) {
minKey = *pSttKey; minKey = *pSttKey;
} }
} }
@ -1888,7 +1897,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
minKey = ik; minKey = ik;
} }
if (hasDataInFileBlock(pBlockData, pDumpInfo) && (pkCompEx(compFn, pfKey, &minKey) < 0)) { if (pfKey != NULL && (pkCompEx(compFn, pfKey, &minKey) < 0)) {
minKey = *pfKey; minKey = *pfKey;
} }
@ -1901,7 +1910,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
minKey = ik; minKey = ik;
} }
if (hasDataInFileBlock(pBlockData, pDumpInfo) && (pkCompEx(compFn, pfKey, &minKey) > 0)) { if (pfKey != NULL && (pkCompEx(compFn, pfKey, &minKey) > 0)) {
minKey = *pfKey; minKey = *pfKey;
} }
@ -3659,7 +3668,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIt
SRowKey nextRowKey = {0}; SRowKey nextRowKey = {0};
tRowGetKeyEx(pNextRow, &nextRowKey); tRowGetKeyEx(pNextRow, &nextRowKey);
if (pkCompEx(pReader->pkComparFn, pKey, &nextRowKey) != 0) { if (pKey->numOfPKs > 0 && pkCompEx(pReader->pkComparFn, pKey, &nextRowKey) != 0) {
*pResRow = current; *pResRow = current;
*freeTSRow = false; *freeTSRow = false;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -3966,6 +3975,9 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
if (row.type == TSDBROW_ROW_FMT) { if (row.type == TSDBROW_ROW_FMT) {
code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo); code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo);
if (code == TSDB_CODE_SUCCESS) {
tRowGetKeyDeepCopy(row.pTSRow, &pBlockScanInfo->lastProcKey);
}
if (freeTSRow) { if (freeTSRow) {
taosMemoryFree(row.pTSRow); taosMemoryFree(row.pTSRow);
@ -3974,8 +3986,6 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
if (code) { if (code) {
return code; return code;
} }
tRowGetKeyDeepCopy(row.pTSRow, &pBlockScanInfo->lastProcKey);
} else { } else {
code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow); code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
if (code) { if (code) {
@ -4667,8 +4677,8 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
tsdbReleaseReader(pReader); tsdbReleaseReader(pReader);
} }
tsdbReaderSuspend2(pReader); // tsdbReaderSuspend2(pReader);
tsdbReaderResume2(pReader); // tsdbReaderResume2(pReader);
return code; return code;
} }