From 0c578ab6c0d53a4a6087e858d01d0fc80e382d7b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 21 Mar 2024 10:44:27 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/common/tdatablock.h | 1 + source/common/src/tdatablock.c | 49 +++++++++++++++++++- source/dnode/vnode/src/tsdb/tsdbRead2.c | 52 ++++++++++++---------- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 8 ---- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 8 ++-- 5 files changed, 79 insertions(+), 39 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index fa6e4c3ae8..290c90fde9 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -199,6 +199,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, const SDataBlockInfo* pBlockInfo); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex); +int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, bool asc); int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index f0ecf2365c..5e334fb1ff 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -490,9 +490,9 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) return 0; } - if (pDataBlock->info.rows > 0) { +// if (pDataBlock->info.rows > 0) { // ASSERT(pDataBlock->info.dataLoad == 1); - } +// } size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); if (numOfCols <= 0) { @@ -515,6 +515,51 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) return 0; } +int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, bool asc) { + if (pDataBlock == NULL || pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0 || pkColumnIndex == -1) { + return 0; + } + + size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); + if (numOfCols <= 0) { + return -1; + } + + SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pkColumnIndex); + if (!IS_NUMERIC_TYPE(pColInfoData->info.type) && (pColInfoData->info.type != TSDB_DATA_TYPE_VARCHAR)) { + return 0; + } + + void* skey = colDataGetData(pColInfoData, 0); + void* ekey = colDataGetData(pColInfoData, (pDataBlock->info.rows - 1)); + + if (asc) { + if (IS_NUMERIC_TYPE(pColInfoData->info.type)) { + pDataBlock->info.pks[0].val = *(int64_t*) skey; + pDataBlock->info.pks[1].val = *(int64_t*) ekey; + } else { // todo refactor + memcpy(pDataBlock->info.pks[0].pData, varDataVal(skey), varDataLen(skey)); + pDataBlock->info.pks[0].nData = varDataLen(skey); + + memcpy(pDataBlock->info.pks[1].pData, varDataVal(ekey), varDataLen(ekey)); + pDataBlock->info.pks[1].nData = varDataLen(ekey); + } + } else { + if (IS_NUMERIC_TYPE(pColInfoData->info.type)) { + pDataBlock->info.pks[0].val = *(int64_t*) ekey; + pDataBlock->info.pks[1].val = *(int64_t*) skey; + } else { // todo refactor + memcpy(pDataBlock->info.pks[0].pData, varDataVal(ekey), varDataLen(ekey)); + pDataBlock->info.pks[0].nData = varDataLen(ekey); + + memcpy(pDataBlock->info.pks[1].pData, varDataVal(skey), varDataLen(skey)); + pDataBlock->info.pks[1].nData = varDataLen(skey); + } + } + + return 0; +} + int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) { int32_t capacity = pDest->info.capacity; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 4973fb5d02..654e765ef7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -110,6 +110,10 @@ static int32_t pkComp2(STsdbReader* pReader, STsdbRowKey* p1, STsdbRowKey* p2) { static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) { + pSupInfo->pk.pk = 0; + pSupInfo->numOfPks = 0; + pSupInfo->pk.slotId = -1; + pSupInfo->smaValid = true; pSupInfo->numOfCols = numOfCols; pSupInfo->colId = taosMemoryMalloc(numOfCols * (sizeof(int16_t) * 2 + POINTER_BYTES)); @@ -131,7 +135,8 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC } if (pCols[i].pk) { - pSupInfo->pkSlotId = pCols[i].slotId; + pSupInfo->pk = pCols[i]; + pSupInfo->numOfPks += 1; } } @@ -201,7 +206,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA pLReader->order = pReader->info.order; pLReader->window = pReader->info.window; pLReader->verRange = pReader->info.verRange; - pLReader->numOfPks = pReader->numOfPks; + pLReader->numOfPks = pReader->suppInfo.numOfPks; pLReader->uid = 0; tMergeTreeClose(&pLReader->mergeTree); @@ -434,8 +439,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void pReader->type = pCond->type; pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket - pReader->numOfPks = -1; - pReader->pkChecked = false; code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond); if (code != TSDB_CODE_SUCCESS) { @@ -453,6 +456,10 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID; setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols); + if (pSup->numOfPks > 0) { + pReader->pkComparFn = getComparFunc(pSup->pk.type, 0); + } + code = tBlockDataCreate(&pReader->status.fileBlockData); if (code != TSDB_CODE_SUCCESS) { terrno = code; @@ -1523,12 +1530,6 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB TSDBROW* pNextRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); tsdbRowGetKey(pNextRow, &nextKey); - if (!pReader->pkChecked) { - pReader->pkComparFn = getComparFunc(pSttKey->key.pks[0].type, 0); - pReader->pkChecked = true; - pReader->numOfPks = pSttKey->key.numOfPKs; - } - if (pkCompEx(pReader->pkComparFn, pSttKey, &nextKey) != 0) { code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow); if (code) { @@ -1782,7 +1783,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* } // key == tsLast. ts is equal and the primary key exists - if (pReader->numOfPks > 0) { + if (pReader->suppInfo.numOfPks > 0) { int32_t res = pkComp1(pReader, pSttKey, &fRow); if (res < 0) { return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); @@ -2277,7 +2278,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc } if (copied) { - if (pReader->numOfPks == 0) { + if (pReader->suppInfo.numOfPks == 0) { pBlockScanInfo->lastProcKey.key.ts = key; } else { // todo use deep copy instead of shallow copy int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1; @@ -2438,6 +2439,7 @@ void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInf pResBlock->info.version = pReader->info.verRange.maxVer; blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]); + blockDataUpdatePkRange(pResBlock, pReader->suppInfo.pk.slotId, ASCENDING_TRAVERSE(pReader->info.order)); setComposedBlockFlag(pReader, true); // todo update the pk range for current return data block @@ -2810,20 +2812,30 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn pInfo->version = pReader->info.verRange.maxVer; pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey}; + if (pReader->suppInfo.pk.pk) { + if (IS_NUMERIC_TYPE(pReader->suppInfo.pk.type)) { + pInfo->pks[0].val = pBlockInfo->firstPk.val; + pInfo->pks[1].val = pBlockInfo->lastPk.val; + } else { + memcpy(pInfo->pks[0].pData, pBlockInfo->firstPk.pData, pBlockInfo->firstPKLen); + memcpy(pInfo->pks[1].pData, pBlockInfo->lastPk.pData, pBlockInfo->lastPKLen); + + pInfo->pks[0].nData = pBlockInfo->firstPKLen; + pInfo->pks[1].nData = pBlockInfo->lastPKLen; + } + } + setComposedBlockFlag(pReader, false); setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order); // update the last key for the corresponding table SRowKey* pKey = &pScanInfo->lastProcKey.key; pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey; - pKey->numOfPKs = pReader->numOfPks; + pKey->numOfPKs = pReader->suppInfo.numOfPks; // todo opt allocation, and handle varchar primary key pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val; - pInfo->pks[0].val = pBlockInfo->firstPk.val; - pInfo->pks[1].val = pBlockInfo->lastPk.val; - tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, " "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", @@ -3482,14 +3494,6 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p int32_t order = pReader->info.order; TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter); - if (!pReader->pkChecked) { - STsdbRowKey k; - tsdbRowGetKey(pRow, &k); - - pReader->pkComparFn = getComparFunc(k.key.pks[0].type, 0); - pReader->pkChecked = true; - } - TSDBKEY key = TSDBROW_KEY(pRow); if (outOfTimeWindow(key.ts, &pReader->info.window)) { pIter->hasVal = false; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index e843216172..4afacea145 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -372,12 +372,6 @@ static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* recor pBlockInfo->count = record->count; SRowKey* pFirstKey = &record->firstKey.key; - if (!pReader->pkChecked) { - pReader->pkChecked = true; - pReader->numOfPks = pFirstKey->numOfPKs; - pReader->pkComparFn = getComparFunc(pFirstKey->pks[0].type, 0); - } - if (pFirstKey->numOfPKs > 0) { if (IS_NUMERIC_TYPE(pFirstKey->pks[0].type)) { pBlockInfo->firstPk.val = pFirstKey->pks[0].val; @@ -404,8 +398,6 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 pBlockIter->numOfBlocks = numOfBlocks; taosArrayClear(pBlockIter->blockList); - pBlockIter->pTableMap = pReader->status.pTableMap; - // access data blocks according to the offset of each block in asc/desc order. int32_t numOfTables = taosArrayGetSize(pTableList); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 274a9fa20a..738b508206 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -157,9 +157,10 @@ typedef struct SBlockLoadSuppInfo { SColumnDataAgg tsColAgg; int16_t* colId; int16_t* slotId; - int32_t numOfCols; char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated. - int16_t pkSlotId; + int32_t numOfCols; + int32_t numOfPks; + SColumnInfo pk; bool smaValid; // the sma on all queried columns are activated } SBlockLoadSuppInfo; @@ -218,7 +219,6 @@ typedef struct SDataBlockIter { SArray* blockList; // SArray int32_t order; SDataBlk block; // current SDataBlk data - SSHashObj* pTableMap; } SDataBlockIter; typedef struct SFileBlockDumpInfo { @@ -281,8 +281,6 @@ struct STsdbReader { TsdReaderNotifyCbFn notifyFn; void* notifyParam; __compar_fn_t pkComparFn; - int32_t numOfPks; - bool pkChecked; }; typedef struct SBrinRecordIter {