fix(stream): add check when merging rows from files and buffer.

This commit is contained in:
Haojun Liao 2024-01-10 11:02:44 +08:00
parent 273e47a1fb
commit eac0d14868
3 changed files with 21 additions and 98 deletions

View File

@ -1490,7 +1490,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t tsLast = INT64_MIN; int64_t tsLast = INT64_MIN;
if (hasDataInSttBlock(pBlockScanInfo)) { if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
tsLast = getCurrentKeyInSttBlock(pSttBlockReader); tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
} }
@ -1509,7 +1509,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
int64_t minKey = 0; int64_t minKey = 0;
if (pReader->info.order == TSDB_ORDER_ASC) { if (pReader->info.order == TSDB_ORDER_ASC) {
minKey = INT64_MAX; // chosen the minimum value minKey = INT64_MAX; // chosen the minimum value
if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo)) { if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
minKey = tsLast; minKey = tsLast;
} }
@ -1522,7 +1522,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
} }
} else { } else {
minKey = INT64_MIN; minKey = INT64_MIN;
if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo)) { if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
minKey = tsLast; minKey = tsLast;
} }
@ -1626,91 +1626,6 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
static int32_t doMergeFileBlockAndLastBlock(SSttBlockReader* pSttBlockReader, STsdbReader* pReader,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
bool mergeBlockData) {
SRowMerger* pMerger = &pReader->status.merger;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t tsLastBlock = getCurrentKeyInSttBlock(pSttBlockReader);
bool copied = false;
int32_t code = TSDB_CODE_SUCCESS;
SRow* pTSRow = NULL;
TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
// create local variable to hold the row value
TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData};
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pSttBlockReader->uid,
fRow.pBlockData->aTSKEY[fRow.iRow], pReader->idStr);
// only stt block exists
if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, tsLastBlock, pReader, &copied);
if (code) {
return code;
}
if (copied) {
pBlockScanInfo->lastProcKey = tsLastBlock;
return TSDB_CODE_SUCCESS;
} else {
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL);
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange,
pReader->idStr);
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(pMerger);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
} else { // not merge block data
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange,
pReader->idStr);
// merge with block data if ts == key
if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(pMerger);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* pSttBlockReader, int64_t key, static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* pSttBlockReader, int64_t key,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
@ -1816,7 +1731,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader); TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
int64_t tsLast = INT64_MIN; int64_t tsLast = INT64_MIN;
if (hasDataInSttBlock(pBlockScanInfo)) { if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
tsLast = getCurrentKeyInSttBlock(pSttBlockReader); tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
} }
@ -1865,7 +1780,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
minKey = key; minKey = key;
} }
if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo)) { if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
minKey = tsLast; minKey = tsLast;
} }
} else { } else {
@ -1882,7 +1797,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
minKey = key; minKey = key;
} }
if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo)) { if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
minKey = tsLast; minKey = tsLast;
} }
} }
@ -2169,10 +2084,15 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
pScanInfo->sttKeyInfo.nextProcKey = pScanInfo->sttKeyInfo.nextProcKey =
ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey; ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey;
hasData = true; hasData = true;
} else { } else { // not clean stt blocks
INIT_TIMEWINDOW(pScanInfo->sttWindow); //reset the time window
pScanInfo->sttBlockReturned = false;
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange); hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
} }
} else { } else {
pScanInfo->cleanSttBlocks = false;
INIT_TIMEWINDOW(pScanInfo->sttWindow); //reset the time window
pScanInfo->sttBlockReturned = false;
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange); hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
} }

View File

@ -22,12 +22,6 @@
#include "tsdbUtil2.h" #include "tsdbUtil2.h"
#include "tsimplehash.h" #include "tsimplehash.h"
#define INIT_TIMEWINDOW(_w) \
do { \
(_w)->skey = INT64_MAX; \
(_w)->ekey = INT64_MIN; \
} while (0);
static bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, static bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord,
int32_t order); int32_t order);
@ -165,6 +159,9 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf
INIT_TIMEWINDOW(&pScanInfo->sttWindow); INIT_TIMEWINDOW(&pScanInfo->sttWindow);
INIT_TIMEWINDOW(&pScanInfo->filesetWindow); INIT_TIMEWINDOW(&pScanInfo->filesetWindow);
pScanInfo->cleanSttBlocks = false;
pScanInfo->sttBlockReturned = false;
pUidList->tableUidList[j] = idList[j].uid; pUidList->tableUidList[j] = idList[j].uid;
if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) { if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) {

View File

@ -26,6 +26,12 @@ extern "C" {
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define INIT_TIMEWINDOW(_w) \
do { \
(_w)->skey = INT64_MAX; \
(_w)->ekey = INT64_MIN; \
} while (0);
typedef enum { typedef enum {
READER_STATUS_SUSPEND = 0x1, READER_STATUS_SUSPEND = 0x1,
READER_STATUS_NORMAL = 0x2, READER_STATUS_NORMAL = 0x2,