From 0315e5ac6300988f8cd76b7e5a4274846b9934b5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 12 Mar 2024 16:22:56 +0800 Subject: [PATCH] feat(tsdb): read complex primary key from buf. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 129 ++++++++++++++---------- 1 file changed, 76 insertions(+), 53 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 9eefaf6a98..d7f0476d30 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -42,7 +42,8 @@ static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbRe static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, SRowMerger* pMerger, SVersionRange* pVerRange, const char* id); -static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader); +static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, STsdbRowKey* pKey, SArray* pDelList, + STsdbReader* pReader); static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo); static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, @@ -79,6 +80,32 @@ static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWi static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus); +static int32_t pkComp(STsdbReader* pReader, TSDBROW* p1, TSDBROW* p2) { + STsdbRowKey k1 = {0}, k2 = {0}; + + if (pReader->pkComparFn == NULL) { + ASSERT(TSDBROW_TS(p1) != TSDBROW_TS(p2)); + ASSERT(pReader->pkChecked); + return 0; + } + + tsdbRowGetKey(p1, &k1); + tsdbRowGetKey(p2, &k2); + return pReader->pkComparFn(&k1.key.pks[0].val, &k2.key.pks[0].val); +} + +static int32_t pkComp1(STsdbReader* pReader, STsdbRowKey* p1, TSDBROW* p2) { + if (pReader->pkComparFn == NULL) { + ASSERT(p1->key.ts != TSDBROW_TS(p2)); + ASSERT(pReader->pkChecked); + return 0; + } + + STsdbRowKey k2 = {0}; + tsdbRowGetKey(p2, &k2); + return pReader->pkComparFn(&p1->key.pks[0].val, &k2.key.pks[0].val); +} + static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) { pSupInfo->smaValid = true; @@ -1503,7 +1530,9 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* tsLast = getCurrentKeyInSttBlock(pSttBlockReader); } - TSDBKEY k = TSDBROW_KEY(pRow); + STsdbRowKey k; + tsdbRowGetKey(pRow, &k); + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized @@ -1522,8 +1551,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* minKey = tsLast; } - if (minKey > k.ts) { - minKey = k.ts; + if (minKey > k.key.ts) { + minKey = k.key.ts; } if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) { @@ -1535,8 +1564,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* minKey = tsLast; } - if (minKey < k.ts) { - minKey = k.ts; + if (minKey < k.key.ts) { + minKey = k.key.ts; } if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) { @@ -1564,7 +1593,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); } - if (minKey == k.ts) { + if (minKey == k.key.ts) { STSchema* pTSchema = NULL; if (pRow->type == TSDBROW_ROW_FMT) { pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); @@ -1578,13 +1607,13 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); + code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } } } else { - if (minKey == k.ts) { + if (minKey == k.key.ts) { STSchema* pTSchema = NULL; if (pRow->type == TSDBROW_ROW_FMT) { pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); @@ -1598,7 +1627,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); + code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS || pMerger->pTSchema == NULL) { return code; } @@ -1746,8 +1775,9 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; - TSDBKEY k = TSDBROW_KEY(pRow); - TSDBKEY ik = TSDBROW_KEY(piRow); + STsdbRowKey k, ik; + tsdbRowGetKey(pRow, &k); + tsdbRowGetKey(piRow, &ik); STSchema* pSchema = NULL; if (pRow->type == TSDBROW_ROW_FMT) { @@ -1777,12 +1807,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* int64_t minKey = 0; if (ASCENDING_TRAVERSE(pReader->info.order)) { minKey = INT64_MAX; // let's find the minimum - if (minKey > k.ts) { - minKey = k.ts; + if (minKey > k.key.ts) { + minKey = k.key.ts; } - if (minKey > ik.ts) { - minKey = ik.ts; + if (minKey > ik.key.ts) { + minKey = ik.key.ts; } if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) { @@ -1794,12 +1824,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } else { minKey = INT64_MIN; // let find the maximum ts value - if (minKey < k.ts) { - minKey = k.ts; + if (minKey < k.key.ts) { + minKey = k.key.ts; } - if (minKey < ik.ts) { - minKey = ik.ts; + if (minKey < ik.key.ts) { + minKey = ik.key.ts; } if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) { @@ -1834,49 +1864,49 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); } - if (minKey == ik.ts) { + if (minKey == ik.key.ts) { code = tsdbRowMergerAdd(pMerger, piRow, piSchema); if (code != TSDB_CODE_SUCCESS) { return code; } - code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, &ik, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } } - if (minKey == k.ts) { + if (minKey == k.key.ts) { code = tsdbRowMergerAdd(pMerger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } - code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } } } else { - if (minKey == k.ts) { + if (minKey == k.key.ts) { code = tsdbRowMergerAdd(pMerger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } - code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } } - if (minKey == ik.ts) { + if (minKey == ik.key.ts) { code = tsdbRowMergerAdd(pMerger, piRow, piSchema); if (code != TSDB_CODE_SUCCESS) { return code; } - code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, &ik, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3395,7 +3425,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p } } -int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader) { +int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, STsdbRowKey *pCurKey, SArray* pDelList, STsdbReader* pReader) { SRowMerger* pMerger = &pReader->status.merger; while (1) { @@ -3411,8 +3441,11 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe } // ts is not identical, quit - TSDBKEY k = TSDBROW_KEY(pRow); - if (k.ts != ts) { + if (TSDBROW_TS(pRow) != pCurKey->key.ts) { + break; + } + + if (pkComp1(pReader, pCurKey, pRow) != 0) { break; } @@ -3528,25 +3561,14 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI return TSDB_CODE_SUCCESS; } -static int32_t pkComp(STsdbReader* pReader, TSDBROW* p1, TSDBROW* p2) { - STsdbRowKey k1 = {0}, k2 = {0}; - - if (pReader->pkComparFn == NULL) { - ASSERT(TSDBROW_TS(p1) != TSDBROW_TS(p2)); - ASSERT(pReader->pkChecked); - return 0; - } - - tsdbRowGetKey(p1, &k1); - tsdbRowGetKey(p2, &k2); - return pReader->pkComparFn(&k1.key.pks[0].val, &k2.key.pks[0].val); -} - int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow) { TSDBROW* pNextRow = NULL; TSDBROW current = *pRow; + STsdbRowKey curKey = {0}; + tsdbRowGetKey(¤t, &curKey); + { // if the timestamp of the next valid row has a different ts, return current row directly pIter->hasVal = tsdbTbDataIterNext(pIter->iter); @@ -3562,7 +3584,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, return TSDB_CODE_SUCCESS; } - if (TSDBROW_TS(¤t) != TSDBROW_TS(pNextRow) || (pkComp(pReader, ¤t, pNextRow) != 0)) { + if (TSDBROW_TS(¤t) != TSDBROW_TS(pNextRow) || (pkComp1(pReader, &curKey, pNextRow) != 0)) { *pResRow = current; *freeTSRow = false; return TSDB_CODE_SUCCESS; @@ -3600,7 +3622,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, return code; } - code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(¤t), pDelList, pReader); + code = doMergeRowsInBuf(pIter, uid, &curKey, pDelList, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3621,8 +3643,9 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p SRow** pTSRow) { SRowMerger* pMerger = &pReader->status.merger; - TSDBKEY k = TSDBROW_KEY(pRow); - TSDBKEY ik = TSDBROW_KEY(piRow); + STsdbRowKey k, ik; + tsdbRowGetKey(pRow, &k); + tsdbRowGetKey(piRow, &ik); STSchema* pSchema = NULL; if (pRow->type == TSDBROW_ROW_FMT) { @@ -3646,13 +3669,13 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p return code; } - code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, &ik, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema); - code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3663,13 +3686,13 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p return code; } - code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema); - code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, &ik, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; }