From 5c729cc836f2d9eac23bceb83c9b26c43dd541a4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 25 Mar 2024 17:57:59 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/inc/tsdb.h | 6 +- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 37 ++++++++++-- source/dnode/vnode/src/tsdb/tsdbRead2.c | 62 ++++++++++++++------- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 47 +++++++++++----- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 5 +- 5 files changed, 117 insertions(+), 40 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 2b806e95a5..761e74b9cf 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -836,6 +836,8 @@ struct SLDataIter { STimeWindow timeWindow; SVersionRange verRange; SSttBlockLoadInfo *pBlockLoadInfo; + SRowKey startRowKey; // current row key + __compar_fn_t comparFn; bool ignoreEarlierTs; struct SSttFileReader *pReader; }; @@ -846,7 +848,7 @@ struct SSttFileReader; typedef int32_t (*_load_tomb_fn)(STsdbReader *pReader, struct SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pLoadInfo); -typedef struct { +typedef struct SMergeTreeConf { int8_t backward; STsdb *pTsdb; uint64_t suid; @@ -859,7 +861,9 @@ typedef struct { STSchema *pSchema; int16_t *pCols; int32_t numOfCols; + SRowKey *pCurRowKey; _load_tomb_fn loadTombFn; + __compar_fn_t comparFn; void *pReader; void *idstr; bool rspRows; // response the rows in stt-file, if possible diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 61c610538e..97194acb00 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -479,6 +479,9 @@ int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32 pIter->verRange.maxVer = pConf->verRange.maxVer; pIter->timeWindow.skey = pConf->timewindow.skey; pIter->timeWindow.ekey = pConf->timewindow.ekey; + pIter->comparFn = pConf->comparFn; + + tRowKeyAssign(&pIter->startRowKey, pConf->pCurRowKey); pIter->pReader = pSttFileReader; pIter->pBlockLoadInfo = pBlockLoadInfo; @@ -618,17 +621,39 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) { } int64_t ts = pData->aTSKEY[i]; - if (!pIter->backward) { // asc + if (!pIter->backward) { // asc if (ts > pIter->timeWindow.ekey) { // no more data break; - } else if (ts < pIter->timeWindow.skey) { - continue; + } else { + if (ts < pIter->timeWindow.skey) { + continue; + } + + if (ts == pIter->timeWindow.skey && pIter->startRowKey.numOfPKs > 0) { + SRowKey key; + tColRowGetKey(pData, i, &key); + int32_t ret = pkCompEx(pIter->comparFn, &key, &pIter->startRowKey); + if (ret < 0) { + continue; + } + } } } else { if (ts < pIter->timeWindow.skey) { break; - } else if (ts > pIter->timeWindow.ekey) { - continue; + } else { + if (ts > pIter->timeWindow.ekey) { + continue; + } + + if (ts == pIter->timeWindow.ekey && pIter->startRowKey.numOfPKs > 0) { + SRowKey key; + tColRowGetKey(pData, i, &key); + int32_t ret = pkCompEx(pIter->comparFn, &key, &pIter->startRowKey); + if (ret > 0) { + continue; + } + } } } @@ -802,8 +827,8 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF STimeWindow w = {0}; int64_t numOfRows = 0; + int64_t cid = pSttLevel->fobjArr->data[i]->f->cid; - int64_t cid = pSttLevel->fobjArr->data[i]->f->cid; code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf, pLoadInfo, &w, &numOfRows, pMTree->idStr); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 891fd823e7..80c4514579 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -89,7 +89,7 @@ static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWi static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus); -static int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) { +int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) { if (p2 == NULL) { return 1; } @@ -118,12 +118,13 @@ static void tColRowGetKeyDeepCopy(SBlockData* pBlock, int32_t irow, int32_t slot return; } - pKey->numOfPKs = 1; - SColData* pColData = &pBlock->aColData[slotId]; SColVal cv; tColDataGetValue(pColData, irow, &cv); + pKey->numOfPKs = 1; + pKey->pks[0].type = cv.value.type; + if (IS_NUMERIC_TYPE(cv.value.type)) { pKey->pks[0].val = cv.value.val; } else { @@ -1394,7 +1395,7 @@ static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, STableBlockScanI int64_t key = 0; if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) { - int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey; + int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey.ts; key = ascScan ? TMIN(pBlock->firstKey, keyInStt) : TMAX(pBlock->lastKey, keyInStt); } else { key = ascScan ? pBlock->firstKey : pBlock->lastKey; @@ -1437,9 +1438,10 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pInfo->hasDupTs = (pBlockInfo->numRow > pBlockInfo->count) || (pBlockInfo->count <= 0); pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pRecord, pReader->info.order); + // todo handle the primary key overlap case ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT); if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) { - int64_t nextProcKeyInStt = pScanInfo->sttKeyInfo.nextProcKey; + int64_t nextProcKeyInStt = pScanInfo->sttKeyInfo.nextProcKey.ts; pInfo->overlapWithSttBlock = !(pBlockInfo->lastKey < nextProcKeyInStt || pBlockInfo->firstKey > nextProcKeyInStt); } @@ -1536,8 +1538,9 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int32_t pkSrcSlot, SVersionRange* pVerRange) { - int32_t order = pSttBlockReader->order; - int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1; + int32_t order = pSttBlockReader->order; + int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1; + SRowKey* pNextProc = &pScanInfo->sttKeyInfo.nextProcKey; while (1) { bool hasVal = tMergeTreeNext(&pSttBlockReader->mergeTree); @@ -1545,7 +1548,14 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; // next file, the timestamps in the next file must be greater than those in current - pScanInfo->sttKeyInfo.nextProcKey += step; + pNextProc->ts += step; + if (pSttBlockReader->numOfPks > 0) { + if (IS_NUMERIC_TYPE(pNextProc->pks[0].type)) { + pNextProc->pks[0].val = INT64_MIN; + } else { + memset(pNextProc->pks[0].pData, 0, pNextProc->pks[0].nData); + } + } return false; } @@ -1559,7 +1569,8 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc tColRowGetKeyDeepCopy(pRow->pBlockData, pRow->iRow, pkSrcSlot, &pSttBlockReader->currentKey); } - pScanInfo->sttKeyInfo.nextProcKey = key; + tColRowGetKeyDeepCopy(pRow->pBlockData, pRow->iRow, pkSrcSlot, pNextProc); + if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) { if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange)) { pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA; @@ -2042,7 +2053,9 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea STbData* d = NULL; STbData* di = NULL; bool asc = ASCENDING_TRAVERSE(pReader->info.order); + bool forward = true; STsdbReadSnap* pSnap = pReader->pReadSnap; + STimeWindow* pWindow = &pReader->info.window; if (pBlockScanInfo->iterInit) { return TSDB_CODE_SUCCESS; @@ -2051,6 +2064,10 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea STsdbRowKey startKey = {0}; tRowKeyAssign(&startKey.key, &pBlockScanInfo->lastProcKey); startKey.version = asc ? pReader->info.verRange.minVer : pReader->info.verRange.maxVer; + if ((asc && (startKey.key.ts < pWindow->skey)) || ((!asc) && startKey.key.ts > pWindow->ekey)) { + startKey.key.ts = asc? pWindow->skey:pWindow->ekey; + forward = false; + } int32_t code = doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pSnap->pMem, &pBlockScanInfo->iter, "mem"); if (code != TSDB_CODE_SUCCESS) { @@ -2063,7 +2080,10 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea } loadMemTombData(&pBlockScanInfo->pMemDelData, d, di, pReader->info.verRange.maxVer); - forwardDataIter(&startKey.key, pBlockScanInfo, pReader); + + if (forward) { + forwardDataIter(&startKey.key, pBlockScanInfo, pReader); + } pBlockScanInfo->iterInit = true; return TSDB_CODE_SUCCESS; @@ -2115,6 +2135,7 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { bool hasData = true; + bool asc = ASCENDING_TRAVERSE(pReader->info.order); // the stt block reader has been initialized for this table. if (pSttBlockReader->uid == pScanInfo->uid) { @@ -2133,10 +2154,10 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan } STimeWindow w = pSttBlockReader->window; - if (ASCENDING_TRAVERSE(pSttBlockReader->order)) { - w.skey = pScanInfo->sttKeyInfo.nextProcKey; + if (asc) { + w.skey = pScanInfo->sttKeyInfo.nextProcKey.ts; } else { - w.ekey = pScanInfo->sttKeyInfo.nextProcKey; + w.ekey = pScanInfo->sttKeyInfo.nextProcKey.ts; } int64_t st = taosGetTimestampUs(); @@ -2157,6 +2178,8 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan .pCols = pReader->suppInfo.colId, .numOfCols = pReader->suppInfo.numOfCols, .loadTombFn = loadSttTombDataForAll, + .pCurRowKey = &pScanInfo->sttKeyInfo.nextProcKey, + .comparFn = pReader->pkComparFn, .pReader = pReader, .idstr = pReader->idStr, .rspRows = (pReader->info.execMode == READER_EXEC_ROWS), @@ -2193,8 +2216,9 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan } pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList) ? STT_FILE_HAS_DATA : STT_FILE_NO_DATA; - pScanInfo->sttKeyInfo.nextProcKey = - ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey; + + // todo set the primary key value + pScanInfo->sttKeyInfo.nextProcKey.ts = asc ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey; hasData = (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA); } else { // not clean stt blocks INIT_TIMEWINDOW(&pScanInfo->sttWindow); //reset the time window @@ -2755,7 +2779,7 @@ static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInf setComposedBlockFlag(pReader, true); - pScanInfo->sttKeyInfo.nextProcKey = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1; + pScanInfo->sttKeyInfo.nextProcKey.ts = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1; pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; pScanInfo->lastProcKey.ts = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey; @@ -2915,7 +2939,7 @@ static bool notOverlapWithFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanI if ((!hasDataInSttBlock(pScanInfo)) || (pScanInfo->cleanSttBlocks == true)) { return true; } else { - int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey; + int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey.ts; return (asc && pBlockInfo->lastKey < keyInStt) || (!asc && pBlockInfo->firstKey > keyInStt); } } @@ -2963,7 +2987,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); } else { if (notOverlapWithFiles(pBlockInfo, pScanInfo, asc)) { - int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey; + int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey.ts; if ((!hasDataInSttBlock(pScanInfo)) || (asc && pBlockInfo->lastKey < keyInStt) || (!asc && pBlockInfo->firstKey > keyInStt)) { @@ -3649,7 +3673,7 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI } else { tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid, pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), - pScanInfo->sttKeyInfo.nextProcKey, idStr); + pScanInfo->sttKeyInfo.nextProcKey.ts, idStr); break; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 93bc5ca573..99f0e42261 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -130,25 +130,46 @@ STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, c return *p; } +static int32_t initSRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len) { + pKey->numOfPKs = numOfPks; + pKey->ts = ts; + + if (numOfPks > 0) { + pKey->pks[0].type = type; + if (IS_NUMERIC_TYPE(pKey->pks[0].type)) { + pKey->pks[0].val = INT64_MIN; + } else { + pKey->pks[0].pData = taosMemoryCalloc(1, len); + pKey->pks[0].nData = 0; + + if (pKey->pks[0].pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + } + } + + return TSDB_CODE_SUCCESS; +} + static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) { + int32_t numOfPks = pReader->suppInfo.numOfPks; + SRowKey* pRowKey = &pScanInfo->lastProcKey; if (ASCENDING_TRAVERSE(pReader->info.order)) { int64_t skey = pReader->info.window.skey; - pRowKey->ts = (skey > INT64_MIN) ? (skey - 1) : skey; - pScanInfo->sttKeyInfo.nextProcKey = skey; + int64_t ts = (skey > INT64_MIN) ? (skey - 1) : skey; + + initSRowKey(pRowKey, ts, numOfPks, pReader->suppInfo.pk.type, pReader->suppInfo.pk.bytes); + initSRowKey(&pScanInfo->sttKeyInfo.nextProcKey, skey, numOfPks, pReader->suppInfo.pk.type, + pReader->suppInfo.pk.bytes); } else { int64_t ekey = pReader->info.window.ekey; - pRowKey->ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey; - pScanInfo->sttKeyInfo.nextProcKey = ekey; - } + int64_t ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey; - // only handle the first primary key. - pRowKey->numOfPKs = pReader->suppInfo.numOfPks; - if (pReader->suppInfo.numOfPks > 0) { - if (IS_VAR_DATA_TYPE(pReader->suppInfo.pk.type)) { - pRowKey->pks[0].pData = taosMemoryCalloc(1, pReader->suppInfo.pk.bytes); - } - pRowKey->pks[0].type = pReader->suppInfo.pk.type; + initSRowKey(pRowKey, ts, numOfPks, pReader->suppInfo.pk.type, pReader->suppInfo.pk.bytes); + initSRowKey(&pScanInfo->sttKeyInfo.nextProcKey, ekey, numOfPks, pReader->suppInfo.pk.type, + pReader->suppInfo.pk.bytes); } } @@ -230,7 +251,7 @@ void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) { pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); pInfo->lastProcKey.ts = ts; // todo check the nextProcKey info - pInfo->sttKeyInfo.nextProcKey = ts + step; + pInfo->sttKeyInfo.nextProcKey.ts = ts + step; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 4131bb1b86..9cdb36e648 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -77,7 +77,9 @@ typedef enum ESttKeyStatus { typedef struct SSttKeyInfo { ESttKeyStatus status; // this value should be updated when switch to the next fileset - int64_t nextProcKey; // todo remove this attribute, since it is impossible to set correct nextProcKey value + SRowKey nextProcKey; + // int64_t nextProcKey; // todo remove this attribute, since it is impossible to set correct nextProcKey + // value } SSttKeyInfo; // clean stt file blocks: @@ -333,6 +335,7 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra const char* pstr); bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, int32_t order); bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order); +int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2); typedef struct { SArray* pTombData;