From a8a0e3edf8b6a6303a0ea901b392a7b42b0cbeea Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 22 Mar 2024 14:47:00 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 37 +++++------ source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 75 ++++++++++++---------- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 4 +- source/dnode/vnode/src/tsdb/tsdbUtil.c | 16 +++-- 4 files changed, 69 insertions(+), 63 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 6d6af90ef7..9f48f330cb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2315,6 +2315,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc } if (copied) { + tRowKeyAssign(&pBlockScanInfo->lastProcKey, pKey); return TSDB_CODE_SUCCESS; } else { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); @@ -2822,10 +2823,19 @@ static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInf pScanInfo->sttKeyInfo.nextProcKey = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1; pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; - ASSERT(0); pScanInfo->lastProcKey.ts = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey; - pScanInfo->sttBlockReturned = true; + if (pScanInfo->lastProcKey.numOfPKs > 0) { + ASSERT(0); +// if (IS_NUMERIC_TYPE(pKey->pks[0].type)) { +// pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val; +// } else { +// uint8_t* p = asc ? pBlockInfo->lastPk.pData : pBlockInfo->firstPk.pData; +// pKey->pks[0].nData = asc ? pBlockInfo->lastPKLen : pBlockInfo->firstPKLen; +// memcpy(pKey->pks[0].pData, p, pKey->pks[0].nData); +// } + } + pScanInfo->sttBlockReturned = true; pSttBlockReader->mergeTree.pIter = NULL; tsdbDebug("%p uid:%" PRId64 " return clean stt block as one, brange:%" PRId64 "-%" PRId64 " rows:%" PRId64 " %s", @@ -4100,25 +4110,10 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t STableKeyInfo* pList = (STableKeyInfo*)pTableList; for (int32_t i = 0; i < num; ++i) { - STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i); - pInfo->uid = pList[i].uid; pUidList->tableUidList[i] = pList[i].uid; - // todo extract method - if (ASCENDING_TRAVERSE(pReader->info.order)) { - int64_t skey = pReader->info.window.skey; - ASSERT(0); -// pInfo->lastProcKey = (skey > INT64_MIN) ? (skey - 1) : skey; - pInfo->sttKeyInfo.nextProcKey = skey; - } else { - int64_t ekey = pReader->info.window.ekey; - ASSERT(0); -// pInfo->lastProcKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey; - pInfo->sttKeyInfo.nextProcKey = ekey; - } - - pInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; - tSimpleHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES); + STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i); + initTableBlockScanInfo(pInfo, pList[i].uid, pReader->status.pTableMap, pReader); } return TDB_CODE_SUCCESS; @@ -4155,8 +4150,6 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; if (pStatus->fileIter.numOfFiles == 0) { pStatus->loadFromFile = false; - // } else if (READER_EXEC_DATA == pReader->info.readMode) { - // DO NOTHING } else { code = initForFirstBlockInFile(pReader, pBlockIter); } @@ -4429,7 +4422,7 @@ static int32_t doSuspendCurrentReader(STsdbReader* pCurrentReader) { while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) { STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; clearBlockScanInfo(pInfo); - ASSERT(0); +// pInfo->sttKeyInfo.nextProcKey = pInfo->lastProcKey.ts + step; // pInfo->sttKeyInfo.nextProcKey = pInfo->lastProcKey + step; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 05795b5832..c7077a2e71 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -130,6 +130,46 @@ STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, c return *p; } +static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) { + SRowKey* pRowKey = &pScanInfo->lastProcKey; + if (ASCENDING_TRAVERSE(pReader->info.order)) { + int64_t skey = pReader->info.window.skey; + pRowKey->ts = (skey > INT64_MIN) ? (skey - 1) : skey; + pScanInfo->sttKeyInfo.nextProcKey = skey; + } else { + int64_t ekey = pReader->info.window.ekey; + pRowKey->ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey; + pScanInfo->sttKeyInfo.nextProcKey = ekey; + } + + // only handle the first primary key. + pRowKey->numOfPKs = pReader->suppInfo.numOfPks; + if (pReader->suppInfo.numOfPks > 0) { + if (IS_VAR_DATA_TYPE(pReader->suppInfo.pk.type)) { + pRowKey->pks[0].pData = taosMemoryCalloc(1, pReader->suppInfo.pk.bytes); + } + pRowKey->pks[0].type = pReader->suppInfo.pk.type; + } +} + +int32_t initTableBlockScanInfo(STableBlockScanInfo* pScanInfo, uint64_t uid, SSHashObj* pTableMap, + STsdbReader* pReader) { + pScanInfo->uid = uid; + INIT_TIMEWINDOW(&pScanInfo->sttWindow); + INIT_TIMEWINDOW(&pScanInfo->filesetWindow); + + pScanInfo->cleanSttBlocks = false; + pScanInfo->sttBlockReturned = false; + + initLastProcKey(pScanInfo, pReader); + + pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; + tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES); + tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pReader, pScanInfo->uid, + pScanInfo->lastProcKey.ts, pReader->idStr); + return TSDB_CODE_SUCCESS; +} + // NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, STableUidList* pUidList, int32_t numOfTables) { @@ -152,41 +192,10 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf pUidList->currentIndex = 0; for (int32_t j = 0; j < numOfTables; ++j) { - STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j); - - pScanInfo->uid = idList[j].uid; - INIT_TIMEWINDOW(&pScanInfo->sttWindow); - INIT_TIMEWINDOW(&pScanInfo->filesetWindow); - - pScanInfo->cleanSttBlocks = false; - pScanInfo->sttBlockReturned = false; - pUidList->tableUidList[j] = idList[j].uid; - SRowKey* pRowKey = &pScanInfo->lastProcKey; - if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) { - int64_t skey = pTsdbReader->info.window.skey; - pRowKey->ts = (skey > INT64_MIN) ? (skey - 1) : skey; - pScanInfo->sttKeyInfo.nextProcKey = skey; - } else { - int64_t ekey = pTsdbReader->info.window.ekey; - pRowKey->ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey; - pScanInfo->sttKeyInfo.nextProcKey = ekey; - } - - // only handle the first primary key. - pRowKey->numOfPKs = pTsdbReader->suppInfo.numOfPks; - if (pTsdbReader->suppInfo.numOfPks > 0) { - if (IS_VAR_DATA_TYPE(pTsdbReader->suppInfo.pk.type)) { - pRowKey->pks[0].pData = taosMemoryCalloc(1, pTsdbReader->suppInfo.pk.bytes); - } - pRowKey->pks[0].type = pTsdbReader->suppInfo.pk.type; - } - - pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; - tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES); - tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, pRowKey->ts, - pTsdbReader->idStr); + STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j); + initTableBlockScanInfo(pScanInfo, idList[j].uid, pTableMap, pTsdbReader); } taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 56e6ca9ba7..4131bb1b86 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -298,8 +298,10 @@ int32_t uidComparFunc(const void* p1, const void* p2); STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id); -SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, +SSHashObj* createDataBlockScanInfo(STsdbReader* pReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, STableUidList* pUidList, int32_t numOfTables); +int32_t initTableBlockScanInfo(STableBlockScanInfo* pScanInfo, uint64_t uid, SSHashObj* pTableMap, + STsdbReader* pReader); void clearBlockScanInfo(STableBlockScanInfo* p); void destroyAllBlockScanInfo(SSHashObj* pTableMap); void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 550bedf173..fe7a3457bd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -629,18 +629,20 @@ void tColRowGetKey(SBlockData* pBlock, int32_t irow, SRowKey* key) { } int32_t tRowKeyAssign(SRowKey *pDst, SRowKey* pSrc) { - if (pSrc->numOfPKs == 0) { - pDst->ts = pSrc->ts; - pDst->numOfPKs = 0; - } else { - *pDst = *pSrc; + pDst->ts = pSrc->ts; + pDst->numOfPKs = pSrc->numOfPKs; + if (pSrc->numOfPKs > 0) { for (int32_t i = 0; i < pDst->numOfPKs; ++i) { SValue *pVal = &pDst->pks[i]; + pVal->type = pSrc->pks[i].type; + if (IS_NUMERIC_TYPE(pVal->type)) { - continue; + pVal->val = pSrc->pks[i].val; + } else { + memcpy(pVal->pData, pVal->pData, pVal->nData); + pVal->nData = pSrc->pks[i].nData; } - memcpy(pVal->pData, pVal->pData, pVal->nData); } }