diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 9df0ba50ae..6d6af90ef7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -61,10 +61,10 @@ static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order, SVersionRange* pVerRange); -static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, +static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow); -static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, - STsdbReader* pReader, SRow** pTSRow); +static int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRowKey* piRowKey, + STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow); static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SRowKey* pKey, STsdbReader* pReader); static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo, @@ -89,32 +89,26 @@ static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWi static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus); -static int32_t pkComp(STsdbReader* pReader, TSDBROW* p1, TSDBROW* p2) { - STsdbRowKey k1 = {0}, k2 = {0}; - - if (pReader->pkComparFn == NULL) { - ASSERT(TSDBROW_TS(p1) != TSDBROW_TS(p2)); - return 0; +static int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) { + if (p2 == NULL) { + return 1; } - tsdbRowGetKey(p1, &k1); - tsdbRowGetKey(p2, &k2); - return pReader->pkComparFn(&k1.key.pks[0].val, &k2.key.pks[0].val); -} - -static int32_t pkComp1(STsdbReader* pReader, SRowKey* p1, TSDBROW* p2) { - if (pReader->pkComparFn == NULL) { - ASSERT(p1->ts != TSDBROW_TS(p2)); - return 0; + if (p1 == NULL) { + return -1; } - SRowKey k2 = {0}; - tRowGetKeyEx(p2, &k2); - return pReader->pkComparFn(&p1->pks[0].val, &k2.pks[0].val); -} + if (p1->ts < p2->ts) { + return -1; + } else if (p1->ts > p2->ts) { + return 1; + } -static int32_t pkComp2(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) { - return comparFn(&p1->pks[0].val, &p2->pks[0].val); + if (p1->numOfPKs == 0) { + return 0; + } else { + return comparFn(&p1->pks[0].val, &p2->pks[0].val); + } } static void tColRowGetKeyDeepCopy(SBlockData* pBlock, int32_t irow, int32_t slotId, SRowKey* pKey) { @@ -1519,7 +1513,7 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB SRowKey nextRowKey; tColRowGetKey(pBlockData, pDumpInfo->rowIndex + step, &nextRowKey); - if (pKey->ts != nextRowKey.ts || (pkComp2(pReader->pkComparFn, pKey, &nextRowKey) != 0)) { // merge is not needed + if (pkCompEx(pReader->pkComparFn, pKey, &nextRowKey) != 0) { // merge is not needed code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); if (code) { return code; @@ -1576,28 +1570,6 @@ static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBl static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree); } -static int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) { - if (p2 == NULL) { - return 1; - } - - if (p1 == NULL) { - return -1; - } - - if (p1->ts < p2->ts) { - return -1; - } else if (p1->ts > p2->ts) { - return 1; - } - - if (p1->numOfPKs == 0) { - return 0; - } else { - return comparFn(&p1->pks[0].val, &p2->pks[0].val); - } -} - static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowKey* pSttKey, STsdbReader* pReader, bool* copied) { @@ -2191,9 +2163,9 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable } if (ts == pBlockScanInfo->lastProcKey.ts) { // todo opt perf - SRowKey nextRowKey; + SRowKey nextRowKey; // lazy eval tColRowGetKey(pBlockData, rowIndex, &nextRowKey); - if (pkComp2(pReader->pkComparFn, &pBlockScanInfo->lastProcKey, &nextRowKey) == 0) { + if (pkCompEx(pReader->pkComparFn, &pBlockScanInfo->lastProcKey, &nextRowKey) == 0) { return false; } } @@ -3615,7 +3587,9 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey *pCurKey, SArra break; } - if (pkComp1(pReader, pCurKey, pRow) != 0) { + SRowKey nextKey = {0}; + tRowGetKeyEx(pRow, &nextKey); + if (pkCompEx(pReader->pkComparFn, pCurKey, &nextKey) != 0) { break; } @@ -3738,14 +3712,11 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI return TSDB_CODE_SUCCESS; } -int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow, - STsdbReader* pReader, bool* freeTSRow) { +int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIterInfo* pIter, SArray* pDelList, + TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow) { TSDBROW* pNextRow = NULL; TSDBROW current = *pRow; - SRowKey curKey = {0}; - tRowGetKeyEx(¤t, &curKey); - { // if the timestamp of the next valid row has a different ts, return current row directly pIter->hasVal = tsdbTbDataIterNext(pIter->iter); @@ -3761,7 +3732,15 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, return TSDB_CODE_SUCCESS; } - if (TSDBROW_TS(¤t) != TSDBROW_TS(pNextRow) || (pkComp1(pReader, &curKey, pNextRow) != 0)) { + if (TSDBROW_TS(¤t) != TSDBROW_TS(pNextRow)) { + *pResRow = current; + *freeTSRow = false; + return TSDB_CODE_SUCCESS; + } + + SRowKey nextRowKey = {0}; + tRowGetKeyEx(pNextRow, &nextRowKey); + if (pkCompEx(pReader->pkComparFn, pKey, &nextRowKey) != 0) { *pResRow = current; *freeTSRow = false; return TSDB_CODE_SUCCESS; @@ -3799,7 +3778,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, return code; } - code = doMergeRowsInBuf(pIter, uid, &curKey, pDelList, pReader); + code = doMergeRowsInBuf(pIter, uid, pKey, pDelList, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3816,14 +3795,10 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, return TSDB_CODE_SUCCESS; } -int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, - SRow** pTSRow) { +int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRowKey* piRowKey, + STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow) { SRowMerger* pMerger = &pReader->status.merger; - SRowKey k, ik; - tRowGetKeyEx(pRow, &k); - tRowGetKeyEx(piRow, &ik); - STSchema* pSchema = NULL; if (pRow->type == TSDBROW_ROW_FMT) { pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); @@ -3846,13 +3821,13 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p return code; } - code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, &ik, pBlockScanInfo->delSkyline, pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, piRowKey, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema); - code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, pRowKey, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3863,13 +3838,13 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p return code; } - code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, pRowKey, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema); - code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, &ik, pBlockScanInfo->delSkyline, pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, piRowKey, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3882,42 +3857,47 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, TSDBROW* pResRow, int64_t endKey, bool* freeTSRow) { - TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); - TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); - SArray* pDelList = pBlockScanInfo->delSkyline; - uint64_t uid = pBlockScanInfo->uid; + TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); + TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); + + SArray* pDelList = pBlockScanInfo->delSkyline; + uint64_t uid = pBlockScanInfo->uid; + SIterInfo* piter = &pBlockScanInfo->iter; + SIterInfo* piiter = &pBlockScanInfo->iiter; + SRowKey rowKey = {0}, irowKey = {0}; // todo refactor bool asc = ASCENDING_TRAVERSE(pReader->info.order); - if (pBlockScanInfo->iter.hasVal) { + if (piter->hasVal) { TSDBKEY k = TSDBROW_KEY(pRow); if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) { pRow = NULL; } } - if (pBlockScanInfo->iiter.hasVal) { + if (piiter->hasVal) { TSDBKEY k = TSDBROW_KEY(piRow); if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) { piRow = NULL; } } - if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal && pRow != NULL && piRow != NULL) { - TSDBKEY k = TSDBROW_KEY(pRow); - TSDBKEY ik = TSDBROW_KEY(piRow); + if (piter->hasVal && piiter->hasVal && pRow != NULL && piRow != NULL) { + tRowGetKeyEx(pRow, &rowKey); + tRowGetKeyEx(piRow, &irowKey); int32_t code = TSDB_CODE_SUCCESS; - if (ik.ts != k.ts || (pkComp(pReader, pRow, piRow) != 0)) { - if (((ik.ts < k.ts) && asc) || ((ik.ts > k.ts) && (!asc))) { // ik.ts < k.ts - code = doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pResRow, pReader, freeTSRow); - } else if (((k.ts < ik.ts) && asc) || ((k.ts > ik.ts) && (!asc))) { - code = doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader, freeTSRow); + int32_t ret = pkCompEx(pReader->pkComparFn, &rowKey, &irowKey); + if (ret != 0) { + if ((ret > 0 && asc) || (ret < 0 && (!asc))) { // ik.ts < k.ts + code = doMergeMemTableMultiRows(piRow, &irowKey, uid, piiter, pDelList, pResRow, pReader, freeTSRow); + } else if ((ret < 0 && asc) || (ret > 0 && (!asc))) { + code = doMergeMemTableMultiRows(pRow, &rowKey, uid, piter, pDelList, pResRow, pReader, freeTSRow); } } else { // ik.ts == k.ts *freeTSRow = true; pResRow->type = TSDBROW_ROW_FMT; - code = doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pResRow->pTSRow); + code = doMergeMemIMemRows(pRow, &rowKey, piRow, &irowKey, pBlockScanInfo, pReader, &pResRow->pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3926,12 +3906,14 @@ static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbRea return code; } - if (pBlockScanInfo->iter.hasVal && pRow != NULL) { - return doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader, freeTSRow); + if (piter->hasVal && pRow != NULL) { + tRowGetKeyEx(pRow, &rowKey); + return doMergeMemTableMultiRows(pRow, &rowKey, uid, piter, pDelList, pResRow, pReader, freeTSRow); } - if (pBlockScanInfo->iiter.hasVal && piRow != NULL) { - return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pResRow, pReader, freeTSRow); + if (piiter->hasVal && piRow != NULL) { + tRowGetKeyEx(piRow, &irowKey); + return doMergeMemTableMultiRows(piRow, &irowKey, uid, piiter, pDelList, pResRow, pReader, freeTSRow); } return TSDB_CODE_SUCCESS;