From dc954e112e5f89da002ff9b25f4428ad7ed8b846 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 5 Apr 2024 19:47:59 +0800 Subject: [PATCH] fix(tsdb): add key for lastProcessKey, and remove invalid assign for key --- source/common/src/tdataformat.c | 2 +- source/dnode/vnode/src/inc/tsdb.h | 8 +- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 86 ++++++++---- source/dnode/vnode/src/tsdb/tsdbRead2.c | 142 +++++++++----------- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 23 ++-- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 51 ++++--- 6 files changed, 180 insertions(+), 132 deletions(-) diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index ff25954fc1..179c1449fe 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -1306,8 +1306,8 @@ int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc) { if (IS_NUMERIC_TYPE(pVal->type)) { pVal->val = pSrc->pks[i].val; } else { - memcpy(pVal->pData, pSrc->pks[i].pData, pVal->nData); pVal->nData = pSrc->pks[i].nData; + memcpy(pVal->pData, pSrc->pks[i].pData, pVal->nData); } } } diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 761e74b9cf..a574583561 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -758,9 +758,11 @@ typedef struct SBlockDataInfo { // todo: move away typedef struct { SArray *pUid; + SArray *pFirstTs; + SArray *pLastTs; + SArray *pCount; SArray *pFirstKey; SArray *pLastKey; - SArray *pCount; } SSttTableRowsInfo; typedef struct SSttBlockLoadInfo { @@ -836,7 +838,7 @@ struct SLDataIter { STimeWindow timeWindow; SVersionRange verRange; SSttBlockLoadInfo *pBlockLoadInfo; - SRowKey startRowKey; // current row key + SRowKey* pStartRowKey; // current row key __compar_fn_t comparFn; bool ignoreEarlierTs; struct SSttFileReader *pReader; @@ -870,7 +872,7 @@ typedef struct SMergeTreeConf { } SMergeTreeConf; typedef struct SSttDataInfoForTable { - SArray *pTimeWindowList; + SArray *pKeyRangeList; int64_t numOfRows; } SSttDataInfoForTable; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index b3dd1dec0e..3af0d40382 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -359,18 +359,51 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl if (i < rows) { if (pBlockLoadInfo->info.pUid == NULL) { pBlockLoadInfo->info.pUid = taosArrayInit(rows, sizeof(int64_t)); - pBlockLoadInfo->info.pFirstKey = taosArrayInit(rows, sizeof(int64_t)); - pBlockLoadInfo->info.pLastKey = taosArrayInit(rows, sizeof(int64_t)); + pBlockLoadInfo->info.pFirstTs = taosArrayInit(rows, sizeof(int64_t)); + pBlockLoadInfo->info.pLastTs = taosArrayInit(rows, sizeof(int64_t)); pBlockLoadInfo->info.pCount = taosArrayInit(rows, sizeof(int64_t)); + + pBlockLoadInfo->info.pFirstKey = taosArrayInit(rows, sizeof(SValue)); + pBlockLoadInfo->info.pLastKey = taosArrayInit(rows, sizeof(SValue)); } if (pStatisBlkArray->data[k].maxTbid.suid == suid) { taosArrayAddBatch(pBlockLoadInfo->info.pUid, tBufferGetDataAt(&block.uids, i * sizeof(int64_t)), rows - i); - taosArrayAddBatch(pBlockLoadInfo->info.pFirstKey, + taosArrayAddBatch(pBlockLoadInfo->info.pFirstTs, tBufferGetDataAt(&block.firstKeyTimestamps, i * sizeof(int64_t)), rows - i); - taosArrayAddBatch(pBlockLoadInfo->info.pLastKey, - tBufferGetDataAt(&block.lastKeyTimestamps, i * sizeof(int64_t)), rows - i); + taosArrayAddBatch(pBlockLoadInfo->info.pLastTs, tBufferGetDataAt(&block.lastKeyTimestamps, i * sizeof(int64_t)), + rows - i); taosArrayAddBatch(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, i * sizeof(int64_t)), rows - i); + + SValue vFirst = {0}, vLast = {0}; + for (int32_t f = i; f < rows; ++f) { + int32_t code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst); + if (code) { + break; + } + + if (IS_VAR_DATA_TYPE(vFirst.type)) { + char *p = (char *)vFirst.pData; + char *pBuf = taosMemoryMalloc(vFirst.nData); + memcpy(pBuf, p, vFirst.nData); + vFirst.pData = (uint8_t *)pBuf; + } + taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst); + + code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast); + if (code) { + break; + } + + if (IS_VAR_DATA_TYPE(vLast.type)) { + char *p = (char *)vLast.pData; + char *pBuf = taosMemoryMalloc(vLast.nData); + memcpy(pBuf, p, vLast.nData); + vLast.pData = (uint8_t *)pBuf; + } + taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast); + } + } else { STbStatisRecord record; while (i < rows) { @@ -380,9 +413,13 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl } taosArrayPush(pBlockLoadInfo->info.pUid, &record.uid); - taosArrayPush(pBlockLoadInfo->info.pFirstKey, &record.firstKey.ts); - taosArrayPush(pBlockLoadInfo->info.pLastKey, &record.lastKey.ts); taosArrayPush(pBlockLoadInfo->info.pCount, &record.count); + + taosArrayPush(pBlockLoadInfo->info.pFirstTs, &record.firstKey.ts); + taosArrayPush(pBlockLoadInfo->info.pLastTs, &record.lastKey.ts); + + taosArrayPush(pBlockLoadInfo->info.pFirstKey, &record.firstKey.pks[0]); + taosArrayPush(pBlockLoadInfo->info.pLastKey, &record.lastKey.pks[0]); i += 1; } } @@ -452,23 +489,26 @@ static int32_t uidComparFn(const void *p1, const void *p2) { } } -static void setSttInfoForCurrentTable(SSttBlockLoadInfo *pLoadInfo, uint64_t uid, STimeWindow *pTimeWindow, +static void setSttInfoForCurrentTable(SSttBlockLoadInfo *pLoadInfo, uint64_t uid, SSttKeyRange *pRange, int64_t *numOfRows) { - if (pTimeWindow == NULL || taosArrayGetSize(pLoadInfo->info.pUid) == 0) { + if (pRange == NULL || taosArrayGetSize(pLoadInfo->info.pUid) == 0) { return; } int32_t index = taosArraySearchIdx(pLoadInfo->info.pUid, &uid, uidComparFn, TD_EQ); if (index >= 0) { - pTimeWindow->skey = *(int64_t *)taosArrayGet(pLoadInfo->info.pFirstKey, index); - pTimeWindow->ekey = *(int64_t *)taosArrayGet(pLoadInfo->info.pLastKey, index); + pRange->skey.ts = *(int64_t *)taosArrayGet(pLoadInfo->info.pFirstTs, index); + pRange->ekey.ts = *(int64_t *)taosArrayGet(pLoadInfo->info.pLastTs, index); *numOfRows += *(int64_t *)taosArrayGet(pLoadInfo->info.pCount, index); + + memcpy(&pRange->skey.pks[0], taosArrayGet(pLoadInfo->info.pFirstKey, index), sizeof(SValue)); + memcpy(&pRange->ekey.pks[0], taosArrayGet(pLoadInfo->info.pLastKey, index), sizeof(SValue)); } } int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t cid, int8_t backward, - SMergeTreeConf *pConf, SSttBlockLoadInfo *pBlockLoadInfo, STimeWindow *pTimeWindow, + SMergeTreeConf *pConf, SSttBlockLoadInfo *pBlockLoadInfo, SSttKeyRange *pKeyRange, int64_t *numOfRows, const char *idStr) { int32_t code = TSDB_CODE_SUCCESS; @@ -481,7 +521,7 @@ int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32 pIter->timeWindow.ekey = pConf->timewindow.ekey; pIter->comparFn = pConf->comparFn; - tRowKeyAssign(&pIter->startRowKey, pConf->pCurRowKey); + pIter->pStartRowKey = pConf->pCurRowKey; pIter->pReader = pSttFileReader; pIter->pBlockLoadInfo = pBlockLoadInfo; @@ -500,7 +540,7 @@ int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32 } } - setSttInfoForCurrentTable(pBlockLoadInfo, pConf->uid, pTimeWindow, numOfRows); + setSttInfoForCurrentTable(pBlockLoadInfo, pConf->uid, pKeyRange, numOfRows); // find the start block, actually we could load the position to avoid repeatly searching for the start position when // the skey is updated. @@ -629,10 +669,10 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) { continue; } - if (ts == pIter->timeWindow.skey && pIter->startRowKey.numOfPKs > 0) { + if (ts == pIter->timeWindow.skey && pIter->pStartRowKey->numOfPKs > 0) { SRowKey key; tColRowGetKey(pData, i, &key); - int32_t ret = pkCompEx(pIter->comparFn, &key, &pIter->startRowKey); + int32_t ret = pkCompEx(pIter->comparFn, &key, pIter->pStartRowKey); if (ret < 0) { continue; } @@ -646,10 +686,10 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) { continue; } - if (ts == pIter->timeWindow.ekey && pIter->startRowKey.numOfPKs > 0) { + if (ts == pIter->timeWindow.ekey && pIter->pStartRowKey->numOfPKs > 0) { SRowKey key; tColRowGetKey(pData, i, &key); - int32_t ret = pkCompEx(pIter->comparFn, &key, &pIter->startRowKey); + int32_t ret = pkCompEx(pIter->comparFn, &key, pIter->pStartRowKey); if (ret > 0) { continue; } @@ -825,11 +865,11 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF memset(pIter, 0, sizeof(SLDataIter)); - STimeWindow w = {0}; - int64_t numOfRows = 0; - int64_t cid = pSttLevel->fobjArr->data[i]->f->cid; + SSttKeyRange range = {.skey.numOfPKs = pConf->pCurRowKey->numOfPKs, .ekey.numOfPKs = pConf->pCurRowKey->numOfPKs}; + int64_t numOfRows = 0; + int64_t cid = pSttLevel->fobjArr->data[i]->f->cid; - code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf, pLoadInfo, &w, &numOfRows, + code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf, pLoadInfo, &range, &numOfRows, pMTree->idStr); if (code != TSDB_CODE_SUCCESS) { goto _end; @@ -841,7 +881,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF // let's record the time window for current table of uid in the stt files if (pSttDataInfo != NULL && numOfRows > 0) { - taosArrayPush(pSttDataInfo->pTimeWindowList, &w); + taosArrayPush(pSttDataInfo->pKeyRangeList, &range); pSttDataInfo->numOfRows += numOfRows; } } else { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 2e6a0a43e9..a12575f1c6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -49,8 +49,8 @@ static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i STsdbReader* pReader); static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey, STsdbReader* pReader); -static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowKey* pRowKey, - SRowMerger* pMerger, int32_t pkSrcSlot, SVersionRange* pVerRange, const char* id); +static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, + SRowMerger* pMerger, int32_t pkSrcSlot, SVersionRange* pVerRange, const char* id); static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey* pCurKey, SArray* pDelList, STsdbReader* pReader); static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, @@ -265,9 +265,11 @@ static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) { // init file iterator static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetArray, STsdbReader* pReader) { - size_t numOfFileset = TARRAY2_SIZE(pFileSetArray); + SBlockLoadSuppInfo* pInfo = &pReader->suppInfo; + size_t numOfFileset = TARRAY2_SIZE(pFileSetArray); + bool asc = ASCENDING_TRAVERSE(pReader->info.order); - pIter->index = ASCENDING_TRAVERSE(pReader->info.order) ? -1 : numOfFileset; + pIter->index = asc ? -1 : numOfFileset; pIter->order = pReader->info.order; pIter->pFilesetList = pFileSetArray; pIter->numOfFiles = numOfFileset; @@ -281,15 +283,17 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA } } - SSttBlockReader* pLReader = pIter->pSttBlockReader; - pLReader->order = pReader->info.order; - pLReader->window = pReader->info.window; - pLReader->verRange = pReader->info.verRange; - pLReader->numOfPks = pReader->suppInfo.numOfPks; - pLReader->pkComparFn = pReader->pkComparFn; + SSttBlockReader* pSttReader = pIter->pSttBlockReader; + pSttReader->order = pReader->info.order; + pSttReader->window = pReader->info.window; + pSttReader->verRange = pReader->info.verRange; + pSttReader->numOfPks = pReader->suppInfo.numOfPks; + pSttReader->pkComparFn = pReader->pkComparFn; + pSttReader->uid = 0; + + tMergeTreeClose(&pSttReader->mergeTree); + initRowKey(&pSttReader->currentKey, INT64_MIN, pInfo->numOfPks, pInfo->pk.type, pInfo->pk.bytes, asc); - pLReader->uid = 0; - tMergeTreeClose(&pLReader->mergeTree); tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr); return TSDB_CODE_SUCCESS; } @@ -1676,7 +1680,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* SRowKey* pSttKey = &(SRowKey){0}; if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { - tRowKeyAssign(pSttKey, getCurrentKeyInSttBlock(pSttBlockReader)); + pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); } else { pSttKey = NULL; } @@ -1753,8 +1757,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, - pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); } if (pkCompEx(compFn, &minKey, &k) == 0) { @@ -1851,8 +1854,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* } // pSttKey will be changed when sttBlockReader iterates to the next row, so use pKey instead. - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pKey, pMerger, pkSrcSlot, &pReader->info.verRange, - pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { @@ -1966,15 +1968,14 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader); } - if (pkCompEx(compFn, &minKey, pSttKey) == 0) { + if (pkCompEx(compFn, &minKey, &pBlockScanInfo->lastProcKey) == 0) { TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); code = tsdbRowMergerAdd(pMerger, pRow1, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, - pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); } if (pkCompEx(compFn, &minKey, &ik) == 0) { @@ -2155,8 +2156,9 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable } static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { - bool hasData = true; - bool asc = ASCENDING_TRAVERSE(pReader->info.order); + bool hasData = true; + int32_t order = pReader->info.order; + bool asc = ASCENDING_TRAVERSE(order); // the stt block reader has been initialized for this table. if (pSttBlockReader->uid == pScanInfo->uid) { @@ -2206,7 +2208,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan .rspRows = (pReader->info.execMode == READER_EXEC_ROWS), }; - SSttDataInfoForTable info = {.pTimeWindowList = taosArrayInit(4, sizeof(STimeWindow))}; + SSttDataInfoForTable info = {.pKeyRangeList = taosArrayInit(4, sizeof(SSttKeyRange))}; int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info); if (code != TSDB_CODE_SUCCESS) { return false; @@ -2216,44 +2218,41 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost); if (conf.rspRows) { - pScanInfo->cleanSttBlocks = - isCleanSttBlock(info.pTimeWindowList, &pReader->info.window, pScanInfo, pReader->info.order); - + pScanInfo->cleanSttBlocks = isCleanSttBlock(info.pKeyRangeList, &pReader->info.window, pScanInfo, order); if (pScanInfo->cleanSttBlocks) { pScanInfo->numOfRowsInStt = info.numOfRows; - pScanInfo->sttWindow.skey = INT64_MAX; - pScanInfo->sttWindow.ekey = INT64_MIN; // calculate the time window for data in stt files - for (int32_t i = 0; i < taosArrayGetSize(info.pTimeWindowList); ++i) { - STimeWindow* pWindow = taosArrayGet(info.pTimeWindowList, i); - if (pScanInfo->sttWindow.skey > pWindow->skey) { - pScanInfo->sttWindow.skey = pWindow->skey; + for (int32_t i = 0; i < taosArrayGetSize(info.pKeyRangeList); ++i) { + SSttKeyRange* pKeyRange = taosArrayGet(info.pKeyRangeList, i); + if (pkCompEx(pReader->pkComparFn, &pScanInfo->sttRange.skey, &pKeyRange->skey) > 0) { + tRowKeyAssign(&pScanInfo->sttRange.skey, &pKeyRange->skey); } - if (pScanInfo->sttWindow.ekey < pWindow->ekey) { - pScanInfo->sttWindow.ekey = pWindow->ekey; + if (pkCompEx(pReader->pkComparFn, &pScanInfo->sttRange.ekey, &pKeyRange->ekey) < 0) { + tRowKeyAssign(&pScanInfo->sttRange.ekey, &pKeyRange->ekey); } } - pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList) ? STT_FILE_HAS_DATA : STT_FILE_NO_DATA; + pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pKeyRangeList) ? STT_FILE_HAS_DATA : STT_FILE_NO_DATA; + + SRowKey* p = asc? &pScanInfo->sttRange.skey:&pScanInfo->sttRange.ekey; + tRowKeyAssign(&pScanInfo->sttKeyInfo.nextProcKey, p); - // todo set the primary key value - pScanInfo->sttKeyInfo.nextProcKey.ts = asc ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey; hasData = (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA); } else { // not clean stt blocks - INIT_TIMEWINDOW(&pScanInfo->sttWindow); //reset the time window + INIT_KEYRANGE(&pScanInfo->sttRange); //reset the time window pScanInfo->sttBlockReturned = false; hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange); } } else { pScanInfo->cleanSttBlocks = false; - INIT_TIMEWINDOW(&pScanInfo->sttWindow); // reset the time window + INIT_KEYRANGE(&pScanInfo->sttRange); // reset the time window pScanInfo->sttBlockReturned = false; hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange); } - taosArrayDestroy(info.pTimeWindowList); + taosArrayDestroy(info.pKeyRangeList); int64_t el = taosGetTimestampUs() - st; pReader->cost.initSttBlockReader += (el / 1000.0); @@ -2318,29 +2317,26 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc } } -int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo, - STsdbReader* pReader) { - bool copied = false; - SRow* pTSRow = NULL; - SRowKey sttKey = {0}; - int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot; - - tRowKeyAssign(&sttKey, getCurrentKeyInSttBlock(pSttBlockReader)); - +int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { + bool copied = false; + SRow* pTSRow = NULL; + int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot; SRowMerger* pMerger = &pReader->status.merger; - TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); - TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData}; + // let's record the last processed key + tRowKeyAssign(&pScanInfo->lastProcKey, getCurrentKeyInSttBlock(pSttBlockReader)); + + TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); + TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData}; tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pSttBlockReader->uid, fRow.pBlockData->aTSKEY[fRow.iRow], pReader->idStr); - int32_t code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, &sttKey, pReader, &copied); + int32_t code = + tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pScanInfo, &pScanInfo->lastProcKey, pReader, &copied); if (code) { return code; } - tRowKeyAssign(&pBlockScanInfo->lastProcKey, &sttKey); - if (copied) { return TSDB_CODE_SUCCESS; } else { @@ -2351,14 +2347,13 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); tsdbRowMergerAdd(pMerger, pRow1, NULL); - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, &sttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); - + doMergeRowsInSttBlock(pSttBlockReader, pScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } - code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); + code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pScanInfo); taosMemoryFree(pTSRow); tsdbRowMergerClear(pMerger); @@ -2796,23 +2791,18 @@ static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInf pInfo->rows = pScanInfo->numOfRowsInStt; pInfo->id.uid = pScanInfo->uid; pInfo->dataLoad = 1; - pInfo->window = pScanInfo->sttWindow; + pInfo->window.skey = pScanInfo->sttRange.skey.ts; + pInfo->window.ekey = pScanInfo->sttRange.ekey.ts; setComposedBlockFlag(pReader, true); - pScanInfo->sttKeyInfo.nextProcKey.ts = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1; + pScanInfo->sttKeyInfo.nextProcKey.ts = asc ? pScanInfo->sttRange.ekey.ts + 1 : pScanInfo->sttRange.skey.ts - 1; pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; - pScanInfo->lastProcKey.ts = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey; - if (pScanInfo->lastProcKey.numOfPKs > 0) { - ASSERT(0); -// if (IS_NUMERIC_TYPE(pKey->pks[0].type)) { -// pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val; -// } else { -// uint8_t* p = asc ? pBlockInfo->lastPk.pData : pBlockInfo->firstPk.pData; -// pKey->pks[0].nData = asc ? pBlockInfo->lastPKLen : pBlockInfo->firstPKLen; -// memcpy(pKey->pks[0].pData, p, pKey->pks[0].nData); -// } + if (asc) { + tRowKeyAssign(&pScanInfo->lastProcKey, &pScanInfo->sttRange.ekey); + } else { + tRowKeyAssign(&pScanInfo->lastProcKey, &pScanInfo->sttRange.skey); } pScanInfo->sttBlockReturned = true; @@ -3014,18 +3004,18 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { (!asc && pBlockInfo->firstKey > keyInStt)) { if (pScanInfo->cleanSttBlocks && hasDataInSttBlock(pScanInfo)) { if (asc) { // file block is located before the stt block - ASSERT(pScanInfo->sttWindow.skey > pBlockInfo->lastKey); + ASSERT(pScanInfo->sttRange.skey.ts > pBlockInfo->lastKey); } else { // stt block is before the file block - ASSERT(pScanInfo->sttWindow.ekey < pBlockInfo->firstKey); + ASSERT(pScanInfo->sttRange.ekey.ts < pBlockInfo->firstKey); } } buildCleanBlockFromDataFiles(pReader, pScanInfo, pBlockInfo, pBlockIter->index); } else { // clean stt block if (asc) { - ASSERT(pScanInfo->sttWindow.ekey < pBlockInfo->firstKey); + ASSERT(pScanInfo->sttRange.ekey.ts < pBlockInfo->firstKey); } else { - ASSERT(pScanInfo->sttWindow.skey > pBlockInfo->lastKey); + ASSERT(pScanInfo->sttRange.skey.ts > pBlockInfo->lastKey); } // return the stt file block @@ -3682,8 +3672,10 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc return TSDB_CODE_SUCCESS; } -int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowKey* pRowKey, - SRowMerger* pMerger, int32_t pkSrcSlot, SVersionRange* pVerRange, const char* idStr) { +int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowMerger* pMerger, + int32_t pkSrcSlot, SVersionRange* pVerRange, const char* idStr) { + SRowKey* pRowKey = &pScanInfo->lastProcKey; + while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pkSrcSlot, pVerRange)) { SRowKey* pNextKey = getCurrentKeyInSttBlock(pSttBlockReader); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 86c6b70c92..48a6b9a7a8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -130,7 +130,7 @@ STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, c return *p; } -static int32_t initSRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc) { +int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc) { pKey->numOfPKs = numOfPks; pKey->ts = ts; @@ -169,30 +169,33 @@ static int32_t initSRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) { int32_t numOfPks = pReader->suppInfo.numOfPks; - bool asc = ASCENDING_TRAVERSE(pReader->info.order); + bool asc = ASCENDING_TRAVERSE(pReader->info.order); + int8_t type = pReader->suppInfo.pk.type; + int8_t bytes = pReader->suppInfo.pk.bytes; SRowKey* pRowKey = &pScanInfo->lastProcKey; if (asc) { int64_t skey = pReader->info.window.skey; int64_t ts = (skey > INT64_MIN) ? (skey - 1) : skey; - initSRowKey(pRowKey, ts, numOfPks, pReader->suppInfo.pk.type, pReader->suppInfo.pk.bytes, asc); - initSRowKey(&pScanInfo->sttKeyInfo.nextProcKey, skey, numOfPks, pReader->suppInfo.pk.type, - pReader->suppInfo.pk.bytes, asc); + initRowKey(pRowKey, ts, numOfPks, type, bytes, asc); + initRowKey(&pScanInfo->sttKeyInfo.nextProcKey, skey, numOfPks, type, bytes, asc); } else { int64_t ekey = pReader->info.window.ekey; int64_t ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey; - initSRowKey(pRowKey, ts, numOfPks, pReader->suppInfo.pk.type, pReader->suppInfo.pk.bytes, asc); - initSRowKey(&pScanInfo->sttKeyInfo.nextProcKey, ekey, numOfPks, pReader->suppInfo.pk.type, - pReader->suppInfo.pk.bytes, asc); + initRowKey(pRowKey, ts, numOfPks, type, bytes, asc); + initRowKey(&pScanInfo->sttKeyInfo.nextProcKey, ekey, numOfPks, type, bytes, asc); } + + initRowKey(&pScanInfo->sttRange.skey, INT64_MAX, numOfPks, type, bytes, asc); + initRowKey(&pScanInfo->sttRange.ekey, INT64_MIN, numOfPks, type, bytes, asc); } int32_t initTableBlockScanInfo(STableBlockScanInfo* pScanInfo, uint64_t uid, SSHashObj* pTableMap, STsdbReader* pReader) { pScanInfo->uid = uid; - INIT_TIMEWINDOW(&pScanInfo->sttWindow); + INIT_KEYRANGE(&pScanInfo->sttRange); INIT_TIMEWINDOW(&pScanInfo->filesetWindow); pScanInfo->cleanSttBlocks = false; @@ -311,7 +314,7 @@ static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) { pScanInfo->cleanSttBlocks = false; pScanInfo->numOfRowsInStt = 0; pScanInfo->sttBlockReturned = false; - INIT_TIMEWINDOW(&pScanInfo->sttWindow); + INIT_KEYRANGE(&pScanInfo->sttRange); INIT_TIMEWINDOW(&pScanInfo->filesetWindow); pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index c48b7479dc..af7d00e019 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -32,6 +32,12 @@ extern "C" { (_w)->ekey = INT64_MIN; \ } while (0); +#define INIT_KEYRANGE(_k) \ + do { \ + (_k)->skey.ts = INT64_MAX; \ + (_k)->ekey.ts = INT64_MIN; \ + } while (0); + typedef enum { READER_STATUS_SUSPEND = 0x1, READER_STATUS_NORMAL = 0x2, @@ -78,34 +84,38 @@ typedef enum ESttKeyStatus { typedef struct SSttKeyInfo { ESttKeyStatus status; // this value should be updated when switch to the next fileset SRowKey nextProcKey; - // int64_t nextProcKey; // todo remove this attribute, since it is impossible to set correct nextProcKey - // value } SSttKeyInfo; +typedef struct SSttKeyRange { + SRowKey skey; + SRowKey ekey; +} SSttKeyRange; + // clean stt file blocks: // 1. not overlap with stt blocks in other stt files of the same fileset // 2. not overlap with delete skyline // 3. not overlap with in-memory data (mem/imem) // 4. not overlap with data file blocks typedef struct STableBlockScanInfo { - uint64_t uid; - SRowKey lastProcKey; - SSttKeyInfo sttKeyInfo; - SArray* pBlockList; // block data index list, SArray - SArray* pBlockIdxList; // SArray - SArray* pMemDelData; // SArray - SArray* pFileDelData; // SArray from each file set - SIterInfo iter; // mem buffer skip list iterator - SIterInfo iiter; // imem buffer skip list iterator - SArray* delSkyline; // delete info for this table - int32_t fileDelIndex; // file block delete index - int32_t sttBlockDelIndex; // delete index for last block - bool iterInit; // whether to initialize the in-memory skip list iterator or not - bool cleanSttBlocks; // stt block is clean in current fileset - bool sttBlockReturned; // result block returned alreay - int64_t numOfRowsInStt; - STimeWindow sttWindow; // timestamp window for current stt files - STimeWindow filesetWindow; // timestamp window for current file set + uint64_t uid; + SRowKey lastProcKey; + SSttKeyInfo sttKeyInfo; + SArray* pBlockList; // block data index list, SArray + SArray* pBlockIdxList; // SArray + SArray* pMemDelData; // SArray + SArray* pFileDelData; // SArray from each file set + SIterInfo iter; // mem buffer skip list iterator + SIterInfo iiter; // imem buffer skip list iterator + SArray* delSkyline; // delete info for this table + int32_t fileDelIndex; // file block delete index + int32_t sttBlockDelIndex; // delete index for last block + bool iterInit; // whether to initialize the in-memory skip list iterator or not + bool cleanSttBlocks; // stt block is clean in current fileset + bool sttBlockReturned; // result block returned alreay + int64_t numOfRowsInStt; + SSttKeyRange sttRange; + // STimeWindow sttWindow; // timestamp window for current stt files + STimeWindow filesetWindow; // timestamp window for current file set } STableBlockScanInfo; typedef struct SResultBlockInfo { @@ -336,6 +346,7 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, int32_t order); bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order); int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2); +int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc); typedef struct { SArray* pTombData;