diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index bed8ff61c7..9368208377 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -49,8 +49,8 @@ static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i STsdbReader* pReader); static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey, STsdbReader* pReader); -static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, - SRowKey* pRowKey, SRowMerger* pMerger, SVersionRange* pVerRange, const char* id); +static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowKey* pRowKey, + SRowMerger* pMerger, int32_t pkSrcSlot, SVersionRange* pVerRange, const char* id); static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey* pCurKey, SArray* pDelList, STsdbReader* pReader); static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, @@ -1526,7 +1526,7 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB return code; } -static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, +static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int32_t pkSrcSlot, SVersionRange* pVerRange) { int32_t order = pSttBlockReader->order; int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1; @@ -1547,13 +1547,11 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc if (pSttBlockReader->numOfPks == 0) { pSttBlockReader->currentKey.ts = key; - // todo handle error - pScanInfo->sttKeyInfo.nextProcKey = key; - } else { // todo handle the deep copy problem - tColRowGetKey(pRow->pBlockData, pRow->iRow, &pSttBlockReader->currentKey); - pScanInfo->sttKeyInfo.nextProcKey = key; + } else { + tColRowGetKeyDeepCopy(pRow->pBlockData, pRow->iRow, pkSrcSlot, &pSttBlockReader->currentKey); } + pScanInfo->sttKeyInfo.nextProcKey = key; if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) { if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange)) { pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA; @@ -1578,30 +1576,18 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB // avoid the fetch next row replace the referenced stt block in buffer doPinSttBlock(pSttBlockReader); - bool hasVal = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange); + bool hasVal = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange); doUnpinSttBlock(pSttBlockReader); if (hasVal) { - SRowKey nextKey; - - TSDBROW* pNextRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); - tRowGetKeyEx(pNextRow, &nextKey); - - if (pkCompEx(pReader->pkComparFn, pSttKey, &nextKey) != 0) { + SRowKey* pNext = getCurrentKeyInSttBlock(pSttBlockReader); + if (pkCompEx(pReader->pkComparFn, pSttKey, pNext) != 0) { code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow); - if (code) { - return code; - } - - *copied = true; + *copied = (code == TSDB_CODE_SUCCESS); return code; } } else { code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow); - if (code) { - return code; - } - - *copied = true; + *copied = (code == TSDB_CODE_SUCCESS); return code; } @@ -1648,6 +1634,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* SBlockData* pBlockData = &pReader->status.fileBlockData; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; __compar_fn_t compFn = pReader->pkComparFn; + int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot; SRowKey* pSttKey = NULL; if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { @@ -1687,17 +1674,17 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (pReader->info.order == TSDB_ORDER_ASC) { minKey = k; // chosen the minimum value - if (hasDataInFileBlock(pBlockData, pDumpInfo) && (pkCompEx(compFn, pfKey, &minKey) < 0)) { + if (pkCompEx(compFn, pfKey, &minKey) < 0) { minKey = *pfKey; } - if (pSttKey != NULL && (pkCompEx(compFn, pSttKey, &minKey) < 0)) { + if (pkCompEx(compFn, pSttKey, &minKey) < 0) { minKey = *pSttKey; } } else { minKey = k; - if (hasDataInFileBlock(pBlockData, pDumpInfo) && (pkCompEx(compFn, pfKey, &minKey) > 0)) { + if (pkCompEx(compFn, pfKey, &minKey) > 0) { minKey = *pfKey; } @@ -1724,7 +1711,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); } if (pkCompEx(compFn, &minKey, &k) == 0) { @@ -1757,7 +1744,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); } if (pkCompEx(compFn, &minKey, pfKey) == 0) { @@ -1787,6 +1774,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SRowMerger* pMerger = &pReader->status.merger; int32_t code = TSDB_CODE_SUCCESS; + int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot; // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized if (pMerger->pArray == NULL) { @@ -1848,7 +1836,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); } else { TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); code = tsdbRowMergerAdd(pMerger, pRow1, NULL); @@ -1856,7 +1844,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); code = tsdbRowMergerAdd(pMerger, &fRow, NULL); if (code != TSDB_CODE_SUCCESS) { @@ -1887,6 +1875,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SArray* pDelList = pBlockScanInfo->delSkyline; __compar_fn_t compFn = pReader->pkComparFn; + int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot; TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader); TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader); @@ -1982,7 +1971,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); } if (pkCompEx(compFn, &minKey, &ik) == 0) { @@ -2040,7 +2029,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); } if (pkCompEx(compFn, &minKey, pfKey) == 0) { @@ -2108,13 +2097,13 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea startKey = (STsdbRowKey){.version = pReader->info.verRange.minVer, .key = { .ts = pBlockScanInfo->lastProcKey.ts + 1, - .numOfPKs = 0, // TODO: change here if multi-key is supported + .numOfPKs = pReader->suppInfo.numOfPks, }}; } else { startKey = (STsdbRowKey){.version = pReader->info.verRange.maxVer, .key = { .ts = pBlockScanInfo->lastProcKey.ts - 1, - .numOfPKs = 0, // TODO: change here if multi-key is supported + .numOfPKs = pReader->suppInfo.numOfPks, }}; } @@ -2267,13 +2256,13 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan } else { // not clean stt blocks INIT_TIMEWINDOW(&pScanInfo->sttWindow); // reset the time window pScanInfo->sttBlockReturned = false; - hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange); + hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange); } } else { pScanInfo->cleanSttBlocks = false; INIT_TIMEWINDOW(&pScanInfo->sttWindow); // reset the time window pScanInfo->sttBlockReturned = false; - hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange); + hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange); } taosArrayDestroy(info.pTimeWindowList); @@ -2336,15 +2325,18 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc taosMemoryFree(pTSRow); tsdbRowMergerClear(pMerger); + + tRowKeyAssign(&pBlockScanInfo->lastProcKey, pKey); return code; } } int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { - bool copied = false; - SRow* pTSRow = NULL; - SRowKey sttKey = {0}; + bool copied = false; + SRow* pTSRow = NULL; + SRowKey sttKey = {0}; + int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot; tRowKeyAssign(&sttKey, getCurrentKeyInSttBlock(pSttBlockReader)); @@ -2371,7 +2363,7 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); tsdbRowMergerAdd(pMerger, pRow1, NULL); - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, &sttKey, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, &sttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { @@ -2382,6 +2374,8 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn taosMemoryFree(pTSRow); tsdbRowMergerClear(pMerger); + + tRowKeyAssign(&pBlockScanInfo->lastProcKey, &sttKey); return code; } } @@ -3702,11 +3696,11 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc } int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowKey* pRowKey, - SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) { - while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pVerRange)) { - SRowKey* next1 = getCurrentKeyInSttBlock(pSttBlockReader); + SRowMerger* pMerger, int32_t pkSrcSlot, SVersionRange* pVerRange, const char* idStr) { + while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pkSrcSlot, pVerRange)) { + SRowKey* pNextKey = getCurrentKeyInSttBlock(pSttBlockReader); - int32_t ret = pkCompEx(pSttBlockReader->pkComparFn, pRowKey, next1); + int32_t ret = pkCompEx(pSttBlockReader->pkComparFn, pRowKey, pNextKey); if (ret == 0) { TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); tsdbRowMergerAdd(pMerger, pRow1, NULL); @@ -3724,8 +3718,9 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow) { - TSDBROW* pNextRow = NULL; - TSDBROW current = *pRow; + SRowMerger* pMerger = &pReader->status.merger; + TSDBROW* pNextRow = NULL; + TSDBROW current = *pRow; { // if the timestamp of the next valid row has a different ts, return current row directly pIter->hasVal = tsdbTbDataIterNext(pIter->iter); @@ -3770,7 +3765,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIt } } - code = tsdbRowMergerAdd(&pReader->status.merger, ¤t, pTSchema); + code = tsdbRowMergerAdd(pMerger, ¤t, pTSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3783,7 +3778,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIt } } - code = tsdbRowMergerAdd(&pReader->status.merger, pNextRow, pTSchema1); + code = tsdbRowMergerAdd(pMerger, pNextRow, pTSchema1); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3793,13 +3788,13 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIt return code; } - code = tsdbRowMergerGetRow(&pReader->status.merger, &pResRow->pTSRow); + code = tsdbRowMergerGetRow(pMerger, &pResRow->pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } pResRow->type = TSDBROW_ROW_FMT; - tsdbRowMergerClear(&pReader->status.merger); + tsdbRowMergerClear(pMerger); *freeTSRow = true; return TSDB_CODE_SUCCESS; @@ -3807,7 +3802,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIt int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRowKey* piRowKey, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow) { - SRowMerger* pMerger = &pReader->status.merger; + SRowMerger* pMerger = pMerger; STSchema* pSchema = NULL; if (pRow->type == TSDBROW_ROW_FMT) { @@ -3826,7 +3821,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRow } if (ASCENDING_TRAVERSE(pReader->info.order)) { // ascending order imem --> mem - int32_t code = tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema); + int32_t code = tsdbRowMergerAdd(pMerger, piRow, piSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3836,14 +3831,14 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRow return code; } - tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema); + tsdbRowMergerAdd(pMerger, pRow, pSchema); code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, pRowKey, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } } else { - int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema); + int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS || pMerger->pTSchema == NULL) { return code; } @@ -3853,7 +3848,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRow return code; } - tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema); + tsdbRowMergerAdd(pMerger, piRow, piSchema); code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, piRowKey, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code;