From fd8f065c6757bf5292f59fc95ec3c966d7a54f5b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 21 Mar 2024 15:08:23 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/inc/tsdb.h | 4 +- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 4 +- source/dnode/vnode/src/tsdb/tsdbRead2.c | 204 ++++++++++---------- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 13 +- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 5 +- source/dnode/vnode/src/tsdb/tsdbUtil.c | 42 ++-- 6 files changed, 127 insertions(+), 145 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 7295e9849e..2b806e95a5 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -125,8 +125,8 @@ int32_t tsdbRowCompare(const void *p1, const void *p2); int32_t tsdbRowCompareWithoutVersion(const void *p1, const void *p2); int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2); void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key); -void tsdbColRowGetKey(SBlockData *pBlock, int32_t irow, STsdbRowKey *key); -int32_t tsdbRowKeyAssign(STsdbRowKey *pDst, STsdbRowKey *pSrc); +void tColRowGetKey(SBlockData *pBlock, int32_t irow, SRowKey *key); +int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc); // STSDBRowIter int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index ee6f0a8c84..095e427462 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -419,7 +419,6 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter * return code; } -#if 0 // load stt statistics block for all stt-blocks, to decide if the data of queried table exists in current stt file TStatisBlkArray *pStatisBlkArray = NULL; code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pStatisBlkArray); @@ -434,7 +433,6 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter * tsdbError("failed to load stt statistics block data, code:%s, %s", tstrerror(code), idStr); return code; } -#endif code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo); @@ -817,7 +815,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF tMergeTreeAddIter(pMTree, pIter); // let's record the time window for current table of uid in the stt files - if (pSttDataInfo != NULL) { + if (pSttDataInfo != NULL && numOfRows > 0) { taosArrayPush(pSttDataInfo->pTimeWindowList, &w); pSttDataInfo->numOfRows += numOfRows; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 654e765ef7..6c410df0ce 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -25,6 +25,15 @@ #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define getCurrentKeyInSttBlock(_r) (&((_r)->currentKey)) +#define tRowGetKeyEx(_pRow, _pKey) \ + do { \ + if ((_pRow)->type == TSDBROW_ROW_FMT) { \ + tRowGetKey((_pRow)->pTSRow, (_pKey)); \ + } else { \ + tColRowGetKey((_pRow)->pBlockData, (_pRow)->iRow, (_pKey)); \ + } \ + } while (0) + typedef struct { bool overlapWithNeighborBlock; bool hasDupTs; @@ -41,8 +50,8 @@ static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, - STsdbRowKey* pRowKey, SRowMerger* pMerger, SVersionRange* pVerRange, const char* id); -static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, STsdbRowKey* pCurKey, SArray* pDelList, + SRowKey* pRowKey, SRowMerger* pMerger, SVersionRange* pVerRange, const char* id); +static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey* pCurKey, SArray* pDelList, STsdbReader* pReader); static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo); @@ -93,19 +102,19 @@ static int32_t pkComp(STsdbReader* pReader, TSDBROW* p1, TSDBROW* p2) { return pReader->pkComparFn(&k1.key.pks[0].val, &k2.key.pks[0].val); } -static int32_t pkComp1(STsdbReader* pReader, STsdbRowKey* p1, TSDBROW* p2) { +static int32_t pkComp1(STsdbReader* pReader, SRowKey* p1, TSDBROW* p2) { if (pReader->pkComparFn == NULL) { - ASSERT(p1->key.ts != TSDBROW_TS(p2)); + ASSERT(p1->ts != TSDBROW_TS(p2)); return 0; } - STsdbRowKey k2 = {0}; - tsdbRowGetKey(p2, &k2); - return pReader->pkComparFn(&p1->key.pks[0].val, &k2.key.pks[0].val); + SRowKey k2 = {0}; + tRowGetKeyEx(p2, &k2); + return pReader->pkComparFn(&p1->pks[0].val, &k2.pks[0].val); } -static int32_t pkComp2(STsdbReader* pReader, STsdbRowKey* p1, STsdbRowKey* p2) { - return pReader->pkComparFn(&p1->key.pks[0].val, &p2->key.pks[0].val); +static int32_t pkComp2(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) { + return comparFn(&p1->pks[0].val, &p2->pks[0].val); } static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, @@ -136,6 +145,7 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC if (pCols[i].pk) { pSupInfo->pk = pCols[i]; + pSupInfo->pk.slotId = pSlotIdList[i]; pSupInfo->numOfPks += 1; } } @@ -207,6 +217,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA pLReader->window = pReader->info.window; pLReader->verRange = pReader->info.verRange; pLReader->numOfPks = pReader->suppInfo.numOfPks; + pLReader->pkComparFn = pReader->pkComparFn; pLReader->uid = 0; tMergeTreeClose(&pLReader->mergeTree); @@ -431,13 +442,15 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void initReaderStatus(&pReader->status); pReader->pTsdb = getTsdbByRetentions(pVnode, pCond, pVnode->config.tsdbCfg.retentions, idstr, &level); + pReader->info.suid = pCond->suid; pReader->info.order = pCond->order; + pReader->info.verRange = getQueryVerRange(pVnode, pCond, level); + pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL; - pReader->info.verRange = getQueryVerRange(pVnode, pCond, level); pReader->type = pCond->type; - pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); + pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond); @@ -621,9 +634,9 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN // todo: here we should find the first timestamp that is greater than the lastProcKey if (asc) { - w.skey = pScanInfo->lastProcKey.key.ts + step; + w.skey = pScanInfo->lastProcKey.ts + step; } else { - w.ekey = pScanInfo->lastProcKey.key.ts + step; + w.ekey = pScanInfo->lastProcKey.ts + step; } if (isEmptyQueryTimeWindow(&w)) { @@ -954,7 +967,7 @@ static void blockInfoToRecord(SBrinRecord* record, SFileDataBlockInfo* pBlockInf record->count = pBlockInfo->count; } -static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STsdbRowKey* pLastProcKey) { +static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastProcKey) { SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; @@ -1103,7 +1116,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STsdbRowKey* pLas pResBlock->info.rows = dumpedRows; pDumpInfo->rowIndex += step * dumpedRows; - tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, pLastProcKey); + tColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, pLastProcKey); // check if current block are all handled if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pRecord->numRow) { @@ -1426,11 +1439,11 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && asc) || (pDumpInfo->rowIndex > 0 && (!asc))) { int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1; - STsdbRowKey rowKey, nextRowKey; - tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex, &rowKey); - tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex + step, &nextRowKey); + SRowKey rowKey, nextRowKey; + tColRowGetKey(pBlockData, pDumpInfo->rowIndex, &rowKey); + tColRowGetKey(pBlockData, pDumpInfo->rowIndex + step, &nextRowKey); - if (rowKey.key.ts != nextRowKey.key.ts || (pkComp2(pReader, &rowKey, &nextRowKey) != 0)) { // merge is not needed + if (rowKey.ts != nextRowKey.ts || (pkComp2(pReader->pkComparFn, &rowKey, &nextRowKey) != 0)) { // merge is not needed code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); if (code) { return code; @@ -1462,26 +1475,13 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc int64_t key = pRow->pBlockData->aTSKEY[pRow->iRow]; int64_t ver = pRow->pBlockData->aVersion[pRow->iRow]; - if (pSttBlockReader->numOfPks < 0) {// todo handle the deep copy problem - tsdbColRowGetKey(pRow->pBlockData, pRow->iRow, &pSttBlockReader->currentKey); - pSttBlockReader->numOfPks = pSttBlockReader->currentKey.key.numOfPKs; - if (pSttBlockReader->numOfPks > 0) { - pSttBlockReader->pkComparFn = getComparFunc(pSttBlockReader->currentKey.key.pks[0].type, 0); - } - + if (pSttBlockReader->numOfPks == 0) { + pSttBlockReader->currentKey.ts = key; + // todo handle error + pScanInfo->sttKeyInfo.nextProcKey = key; + } else { // todo handle the deep copy problem + tColRowGetKey(pRow->pBlockData, pRow->iRow, &pSttBlockReader->currentKey); pScanInfo->sttKeyInfo.nextProcKey = key; - } else { - if (pSttBlockReader->numOfPks == 0) { - pSttBlockReader->currentKey.key.ts = key; - pSttBlockReader->currentKey.version = ver; - - // todo handle error - pScanInfo->sttKeyInfo.nextProcKey = key; - } else { - // todo handle the deep copy problem - tsdbColRowGetKey(pRow->pBlockData, pRow->iRow, &pSttBlockReader->currentKey); - pScanInfo->sttKeyInfo.nextProcKey = key; - } } if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) { @@ -1500,22 +1500,22 @@ static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBl static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree); } -static int32_t pkCompEx(__compar_fn_t comparFn, STsdbRowKey* p1, STsdbRowKey* p2) { - if (p1->key.ts < p2->key.ts) { +static int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) { + if (p1->ts < p2->ts) { return -1; - } else if (p1->key.ts > p2->key.ts) { + } else if (p1->ts > p2->ts) { return 1; } - if (p1->key.numOfPKs == 0) { + if (p1->numOfPKs == 0) { return 0; } else { - return comparFn(&p1->key.pks[0].val, &p2->key.pks[0].val); + return comparFn(&p1->pks[0].val, &p2->pks[0].val); } } static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttBlockReader, - STableBlockScanInfo* pScanInfo, STsdbRowKey* pSttKey, STsdbReader* pReader, + STableBlockScanInfo* pScanInfo, SRowKey* pSttKey, STsdbReader* pReader, bool* copied) { int32_t code = TSDB_CODE_SUCCESS; *copied = false; @@ -1525,10 +1525,10 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB bool hasVal = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange); doUnpinSttBlock(pSttBlockReader); if (hasVal) { - STsdbRowKey nextKey; + SRowKey nextKey; TSDBROW* pNextRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); - tsdbRowGetKey(pNextRow, &nextKey); + tRowGetKeyEx(pNextRow, &nextKey); if (pkCompEx(pReader->pkComparFn, pSttKey, &nextKey) != 0) { code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow); @@ -1586,20 +1586,20 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* } static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, - SIterInfo* pIter, int64_t key, SSttBlockReader* pSttBlockReader) { + SIterInfo* pIter, SSttBlockReader* pSttBlockReader) { SRowMerger* pMerger = &pReader->status.merger; SRow* pTSRow = NULL; SBlockData* pBlockData = &pReader->status.fileBlockData; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; __compar_fn_t compFn = pReader->pkComparFn; - STsdbRowKey* pSttKey = NULL; + SRowKey* pSttKey = NULL; if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); } - STsdbRowKey k; - tsdbRowGetKey(pRow, &k); + SRowKey k; + tRowGetKeyEx(pRow, &k); STSchema* pSchema = NULL; if (pRow->type == TSDBROW_ROW_FMT) { @@ -1609,9 +1609,9 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } - STsdbRowKey* pfKey = &(STsdbRowKey){0}; + SRowKey* pfKey = &(SRowKey){0}; if (hasDataInFileBlock(pBlockData, pDumpInfo)) { - tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex, pfKey); + tColRowGetKey(pBlockData, pDumpInfo->rowIndex, pfKey); } else { pfKey = NULL; } @@ -1627,8 +1627,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } -// int64_t minKey = 0; - STsdbRowKey minKey; + SRowKey minKey; if (pReader->info.order == TSDB_ORDER_ASC) { minKey = k; // chosen the minimum value @@ -1757,12 +1756,12 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* } else { // row in both stt file blocks and data file blocks TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - STsdbRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); + SRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); if (ASCENDING_TRAVERSE(pReader->info.order)) { - if (key < pSttKey->key.ts) { // asc + if (key < pSttKey->ts) { // asc return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); - } else if (key > pSttKey->key.ts) { + } else if (key > pSttKey->ts) { return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader); } @@ -1776,9 +1775,9 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* } } } else { // desc - if (key > pSttKey->key.ts) { + if (key > pSttKey->ts) { return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); - } else if (key < pSttKey->key.ts) { + } else if (key < pSttKey->ts) { return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader); } @@ -1854,21 +1853,21 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader); TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader); - STsdbRowKey* pSttKey = NULL; + SRowKey* pSttKey = NULL; if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); } - STsdbRowKey* pfKey = &(STsdbRowKey){0}; + SRowKey* pfKey = &(SRowKey){0}; if (hasDataInFileBlock(pBlockData, pDumpInfo)) { - tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex, pfKey); + tColRowGetKey(pBlockData, pDumpInfo->rowIndex, pfKey); } else { pfKey = NULL; } - STsdbRowKey k, ik; - tsdbRowGetKey(pRow, &k); - tsdbRowGetKey(piRow, &ik); + SRowKey k, ik; + tRowGetKeyEx(pRow, &k); + tRowGetKeyEx(piRow, &ik); STSchema* pSchema = NULL; if (pRow->type == TSDBROW_ROW_FMT) { @@ -1895,7 +1894,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } - STsdbRowKey minKey; + SRowKey minKey; if (ASCENDING_TRAVERSE(pReader->info.order)) { minKey = k; // let's find the minimum @@ -2070,13 +2069,13 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea if (ASCENDING_TRAVERSE(pReader->info.order)) { startKey = (STsdbRowKey){.version = pReader->info.verRange.minVer, .key = { - .ts = pBlockScanInfo->lastProcKey.key.ts + 1, + .ts = pBlockScanInfo->lastProcKey.ts + 1, .numOfPKs = 0, // TODO: change here if multi-key is supported }}; } else { startKey = (STsdbRowKey){.version = pReader->info.verRange.maxVer, .key = { - .ts = pBlockScanInfo->lastProcKey.key.ts - 1, + .ts = pBlockScanInfo->lastProcKey.ts - 1, .numOfPKs = 0, // TODO: change here if multi-key is supported }}; } @@ -2121,14 +2120,14 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable return false; } - if ((asc && (ts < pBlockScanInfo->lastProcKey.key.ts)) || ((!asc) && (ts > pBlockScanInfo->lastProcKey.key.ts))) { + if ((asc && (ts < pBlockScanInfo->lastProcKey.ts)) || ((!asc) && (ts > pBlockScanInfo->lastProcKey.ts))) { return false; } - if (ts == pBlockScanInfo->lastProcKey.key.ts) { // todo opt perf - STsdbRowKey nextRowKey; - tsdbColRowGetKey(pBlockData, rowIndex, &nextRowKey); - if (pkComp2(pReader, &pBlockScanInfo->lastProcKey, &nextRowKey) == 0) { + if (ts == pBlockScanInfo->lastProcKey.ts) { // todo opt perf + SRowKey nextRowKey; + tColRowGetKey(pBlockData, rowIndex, &nextRowKey); + if (pkComp2(pReader->pkComparFn, &pBlockScanInfo->lastProcKey, &nextRowKey) == 0) { return false; } } @@ -2202,7 +2201,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan initMemDataIterator(pScanInfo, pReader); initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost); - if (0 /*conf.rspRows*/) { + if (conf.rspRows) { pScanInfo->cleanSttBlocks = isCleanSttBlock(info.pTimeWindowList, &pReader->info.window, pScanInfo, pReader->info.order); @@ -2226,7 +2225,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList) ? STT_FILE_HAS_DATA : STT_FILE_NO_DATA; pScanInfo->sttKeyInfo.nextProcKey = ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey; - hasData = true; + hasData = (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA); } else { // not clean stt blocks INIT_TIMEWINDOW(&pScanInfo->sttWindow); // reset the time window pScanInfo->sttBlockReturned = false; @@ -2279,10 +2278,10 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc if (copied) { if (pReader->suppInfo.numOfPks == 0) { - pBlockScanInfo->lastProcKey.key.ts = key; + pBlockScanInfo->lastProcKey.ts = key; } else { // todo use deep copy instead of shallow copy int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1; - tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, &pBlockScanInfo->lastProcKey); + tColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, &pBlockScanInfo->lastProcKey); } return TSDB_CODE_SUCCESS; } else { @@ -2312,11 +2311,9 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn STsdbReader* pReader) { bool copied = false; SRow* pTSRow = NULL; - STsdbRowKey* pSttKey = NULL;//getCurrentKeyInSttBlock(pSttBlockReader); + SRowKey sttKey = {0}; - STsdbRowKey newSttKey; - tsdbRowKeyAssign(&newSttKey, getCurrentKeyInSttBlock(pSttBlockReader)); - pSttKey = &newSttKey; + tRowKeyAssign(&sttKey, getCurrentKeyInSttBlock(pSttBlockReader)); SRowMerger* pMerger = &pReader->status.merger; TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); @@ -2325,13 +2322,13 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pSttBlockReader->uid, fRow.pBlockData->aTSKEY[fRow.iRow], pReader->idStr); - int32_t code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, pSttKey, pReader, &copied); + int32_t code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, &sttKey, pReader, &copied); if (code) { return code; } if (copied) { - tsdbRowKeyAssign(&pBlockScanInfo->lastProcKey, pSttKey); + tRowKeyAssign(&pBlockScanInfo->lastProcKey, &sttKey); return TSDB_CODE_SUCCESS; } else { code = tsdbRowMergerAdd(pMerger, &fRow, NULL); @@ -2341,7 +2338,7 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); tsdbRowMergerAdd(pMerger, pRow1, NULL); - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, &sttKey, pMerger, &pReader->info.verRange, pReader->idStr); code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { @@ -2379,12 +2376,12 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI // imem + file + stt block if (pBlockScanInfo->iiter.hasVal) { - return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pSttBlockReader); + return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, pSttBlockReader); } // mem + file + stt block if (pBlockScanInfo->iter.hasVal) { - return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pSttBlockReader); + return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, pSttBlockReader); } // files data blocks + stt block @@ -2789,7 +2786,7 @@ static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInf pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; ASSERT(0); - pScanInfo->lastProcKey.key.ts = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey; + pScanInfo->lastProcKey.ts = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey; pScanInfo->sttBlockReturned = true; pSttBlockReader->mergeTree.pIter = NULL; @@ -2829,7 +2826,7 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order); // update the last key for the corresponding table - SRowKey* pKey = &pScanInfo->lastProcKey.key; + SRowKey* pKey = &pScanInfo->lastProcKey; pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey; pKey->numOfPKs = pReader->suppInfo.numOfPks; @@ -3041,10 +3038,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { } // data in stt now overlaps with current active file data block, need to composed with file data block. - STsdbRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); - if ((pSttKey->key.ts >= pBlockInfo->firstKey && asc) || (pSttKey->key.ts <= pBlockInfo->lastKey && (!asc))) { + SRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); + if ((pSttKey->ts >= pBlockInfo->firstKey && asc) || (pSttKey->ts <= pBlockInfo->lastKey && (!asc))) { tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader, - pSttKey->key.ts, pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr); + pSttKey->ts, pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr); break; } } @@ -3539,7 +3536,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p } } -int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, STsdbRowKey *pCurKey, SArray* pDelList, STsdbReader* pReader) { +int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey *pCurKey, SArray* pDelList, STsdbReader* pReader) { SRowMerger* pMerger = &pReader->status.merger; while (1) { @@ -3555,7 +3552,7 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, STsdbRowKey *pCurKey, S } // ts is not identical, quit - if (TSDBROW_TS(pRow) != pCurKey->key.ts) { + if (TSDBROW_TS(pRow) != pCurKey->ts) { break; } @@ -3659,10 +3656,10 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc -int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbRowKey* pRowKey, +int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowKey* pRowKey, SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) { while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pVerRange)) { - STsdbRowKey* next1 = getCurrentKeyInSttBlock(pSttBlockReader); + SRowKey* next1 = getCurrentKeyInSttBlock(pSttBlockReader); int32_t ret = pkCompEx(pSttBlockReader->pkComparFn, pRowKey, next1); if (ret == 0) { @@ -3685,8 +3682,8 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, TSDBROW* pNextRow = NULL; TSDBROW current = *pRow; - STsdbRowKey curKey = {0}; - tsdbRowGetKey(¤t, &curKey); + SRowKey curKey = {0}; + tRowGetKeyEx(¤t, &curKey); { // if the timestamp of the next valid row has a different ts, return current row directly pIter->hasVal = tsdbTbDataIterNext(pIter->iter); @@ -3762,9 +3759,9 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p SRow** pTSRow) { SRowMerger* pMerger = &pReader->status.merger; - STsdbRowKey k, ik; - tsdbRowGetKey(pRow, &k); - tsdbRowGetKey(piRow, &ik); + SRowKey k, ik; + tRowGetKeyEx(pRow, &k); + tRowGetKeyEx(piRow, &ik); STSchema* pSchema = NULL; if (pRow->type == TSDBROW_ROW_FMT) { @@ -3934,7 +3931,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT // todo no version TSDBROW row = {.pTSRow = pTSRow, .type = TSDBROW_ROW_FMT}; - tsdbRowGetKey(&row, &pScanInfo->lastProcKey); + tRowGetKeyEx(&row, &pScanInfo->lastProcKey); // pScanInfo->lastProcKey = pTSRow->ts; return TSDB_CODE_SUCCESS; } @@ -4001,7 +3998,6 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e } if (row.type == TSDBROW_ROW_FMT) { - int64_t ts = row.pTSRow->ts; code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo); if (freeTSRow) { @@ -4012,13 +4008,13 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e return code; } - tsdbRowGetKey(&row, &pBlockScanInfo->lastProcKey); + tRowGetKey(row.pTSRow, &pBlockScanInfo->lastProcKey); } else { code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow); if (code) { return code; } - tsdbColRowGetKey(row.pBlockData, row.iRow, &pBlockScanInfo->lastProcKey); + tColRowGetKey(row.pBlockData, row.iRow, &pBlockScanInfo->lastProcKey); } // no data in buffer, return immediately diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 4afacea145..fb43e5b713 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -165,18 +165,23 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) { int64_t skey = pTsdbReader->info.window.skey; - pScanInfo->lastProcKey.key.ts = (skey > INT64_MIN) ? (skey - 1) : skey; + pScanInfo->lastProcKey.ts = (skey > INT64_MIN) ? (skey - 1) : skey; pScanInfo->sttKeyInfo.nextProcKey = skey; } else { int64_t ekey = pTsdbReader->info.window.ekey; - pScanInfo->lastProcKey.key.ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey; + pScanInfo->lastProcKey.ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey; pScanInfo->sttKeyInfo.nextProcKey = ekey; } + pScanInfo->lastProcKey.numOfPKs = pTsdbReader->suppInfo.numOfPks; + if (pTsdbReader->suppInfo.numOfPks > 0 && IS_VAR_DATA_TYPE(pTsdbReader->suppInfo.pk.type)) { + pScanInfo->lastProcKey.pks[0].pData = taosMemoryCalloc(1, pTsdbReader->suppInfo.pk.bytes); + } + pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES); tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, - pScanInfo->lastProcKey.key.ts, pTsdbReader->idStr); + pScanInfo->lastProcKey.ts, pTsdbReader->idStr); } taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc); @@ -209,7 +214,7 @@ void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) { } pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); - pInfo->lastProcKey.key.ts = ts; + pInfo->lastProcKey.ts = ts; ASSERT(0); pInfo->sttKeyInfo.nextProcKey = ts + step; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 738b508206..70fe50fa96 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -87,8 +87,7 @@ typedef struct SSttKeyInfo { // 4. not overlap with data file blocks typedef struct STableBlockScanInfo { uint64_t uid; -// TSKEY lastProcKey; // todo: refactor: add primary key - STsdbRowKey lastProcKey; + SRowKey lastProcKey; SSttKeyInfo sttKeyInfo; SArray* pBlockList; // block data index list, SArray SArray* pBlockIdxList; // SArray @@ -171,7 +170,7 @@ typedef struct SSttBlockReader { int32_t order; uint64_t uid; SMergeTree mergeTree; - STsdbRowKey currentKey; + SRowKey currentKey; int32_t numOfPks; __compar_fn_t pkComparFn; } SSttBlockReader; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index d701387263..02a14d03d7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -606,27 +606,13 @@ void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key) { tRowGetKey(row->pTSRow, &key->key); } else { key->version = row->pBlockData->aVersion[row->iRow]; - key->key.ts = row->pBlockData->aTSKEY[row->iRow]; - key->key.numOfPKs = 0; - for (int32_t i = 0; i < row->pBlockData->nColData; i++) { - SColData *pColData = &row->pBlockData->aColData[i]; - if (pColData->cflag & COL_IS_KEY) { - SColVal cv; - tColDataGetValue(pColData, row->iRow, &cv); - ASSERT(COL_VAL_IS_VALUE(&cv)); - key->key.pks[key->key.numOfPKs] = cv.value; - key->key.numOfPKs++; - } else { - break; - } - } + tColRowGetKey(row->pBlockData, row->iRow, &key->key); } } -void tsdbColRowGetKey(SBlockData* pBlock, int32_t irow, STsdbRowKey* key) { - key->version = pBlock->aVersion[irow]; - key->key.ts = pBlock->aTSKEY[irow]; - key->key.numOfPKs = 0; +void tColRowGetKey(SBlockData* pBlock, int32_t irow, SRowKey* key) { + key->ts = pBlock->aTSKEY[irow]; + key->numOfPKs = 0; for (int32_t i = 0; i < pBlock->nColData; i++) { SColData *pColData = &pBlock->aColData[i]; @@ -634,25 +620,23 @@ void tsdbColRowGetKey(SBlockData* pBlock, int32_t irow, STsdbRowKey* key) { SColVal cv; tColDataGetValue(pColData, irow, &cv); ASSERT(COL_VAL_IS_VALUE(&cv)); - key->key.pks[key->key.numOfPKs] = cv.value; - key->key.numOfPKs++; + key->pks[key->numOfPKs] = cv.value; + key->numOfPKs++; } else { break; } } } -int32_t tsdbRowKeyAssign(STsdbRowKey *pDst, STsdbRowKey* pSrc) { - pDst->version = pSrc->version; - - if (pSrc->key.numOfPKs == 0) { - pDst->key.ts = pSrc->key.ts; - pDst->key.numOfPKs = 0; +int32_t tRowKeyAssign(SRowKey *pDst, SRowKey* pSrc) { + if (pSrc->numOfPKs == 0) { + pDst->ts = pSrc->ts; + pDst->numOfPKs = 0; } else { - pDst->key = pSrc->key; + *pDst = *pSrc; - for (int32_t i = 0; i < pDst->key.numOfPKs; ++i) { - SValue *pVal = &pDst->key.pks[i]; + for (int32_t i = 0; i < pDst->numOfPKs; ++i) { + SValue *pVal = &pDst->pks[i]; if (IS_NUMERIC_TYPE(pVal->type)) { continue; }