From 224f5a72c255d233716379a33e74b7544b023894 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 23 Dec 2022 17:45:12 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/inc/tsdb.h | 20 ++++++++-------- source/dnode/vnode/src/tsdb/tsdbCache.c | 24 -------------------- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 23 +++++++++++-------- source/libs/executor/src/cachescanoperator.c | 4 ++-- 5 files changed, 26 insertions(+), 47 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 540f0c3127..dad8e1191d 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -185,7 +185,7 @@ void *tsdbGetIvtIdx(SMeta *pMeta); uint64_t getReaderMaxVersion(STsdbReader *pReader); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, - uint64_t suid, void **pReader); + uint64_t suid, void **pReader, const char* idstr); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); void *tsdbCacherowsReaderClose(void *pReader); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 7fb962e3a7..5a63af41af 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -715,21 +715,21 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); // tsdbCache ============================================================================================== typedef struct SCacheRowsReader { - SVnode *pVnode; - STSchema *pSchema; - uint64_t uid; - uint64_t suid; - char **transferBuf; // todo remove it soon - int32_t numOfCols; - int32_t type; - int32_t tableIndex; // currently returned result tables - + SVnode *pVnode; + STSchema *pSchema; + uint64_t uid; + uint64_t suid; + char **transferBuf; // todo remove it soon + int32_t numOfCols; + int32_t type; + int32_t tableIndex; // currently returned result tables STableKeyInfo *pTableList; // table id list int32_t numOfTables; SSttBlockLoadInfo *pLoadInfo; STsdbReadSnap *pReadSnap; SDataFReader *pDataFReader; SDataFReader *pDataFReaderLast; + const char *idstr; } SCacheRowsReader; typedef struct { @@ -752,8 +752,6 @@ int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity); size_t tsdbCacheGetCapacity(SVnode *pVnode); -int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema); - // ========== inline functions ========== static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { TSDBKEY *pKey1 = (TSDBKEY *)p1; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 5b09ce5eb6..6a82517067 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1406,30 +1406,6 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader * return code; } -int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pTSchema) { - int32_t code = 0; - int16_t nCol = taosArrayGetSize(pLastArray); - SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal)); - - for (int16_t iCol = 0; iCol < nCol; ++iCol) { - SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLastArray, iCol); - SColVal *tColVal = &tTsVal->colVal; - taosArrayPush(pColArray, tColVal); - } - - code = tdSTSRowNew(pColArray, pTSchema, ppRow); - if (code) goto _err; - - taosArrayDestroy(pColArray); - - return code; - -_err: - taosArrayDestroy(pColArray); - - return code; -} - int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) { int32_t code = 0; char key[32] = {0}; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index b87e5d5503..f05f5d5c88 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -20,9 +20,8 @@ #define HASTYPE(_type, _t) (((_type) & (_t)) == (_t)) -static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, - void** pRes) { - ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock)); +static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, + void** pRes, const char* idStr) { int32_t numOfRows = pBlock->info.rows; if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { @@ -65,9 +64,7 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea } pBlock->info.rows += allNullRow ? 0 : 1; - } else { - ASSERT(HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)); - + } else if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) { for (int32_t i = 0; i < pReader->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); @@ -94,11 +91,16 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea } pBlock->info.rows += 1; + } else { + tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr); + return TSDB_CODE_INVALID_PARA; } + + return TSDB_CODE_SUCCESS; } int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols, - uint64_t suid, void** pReader) { + uint64_t suid, void** pReader, const char* idstr) { *pReader = NULL; SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader)); if (p == NULL) { @@ -142,6 +144,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, return TSDB_CODE_OUT_OF_MEMORY; } + p->idstr = taosMemoryStrDup(idstr); + *pReader = p; return TSDB_CODE_SUCCESS; } @@ -160,6 +164,7 @@ void* tsdbCacherowsReaderClose(void* pReader) { destroyLastBlockLoadInfo(p->pLoadInfo); + taosMemoryFree((void*) p->idstr); taosMemoryFree(pReader); return NULL; } @@ -308,7 +313,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } if (hasRes) { - saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes); + saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr); } } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { @@ -323,7 +328,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 continue; } - saveOneRow(pRow, pResBlock, pr, slotIds, pRes); + saveOneRow(pRow, pResBlock, pr, slotIds, pRes, pr->idstr); // TODO reset the pRes taosArrayPush(pTableUidList, &pKeyInfo->uid); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 672bb09b14..294424746a 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -90,7 +90,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe uint64_t suid = tableListGetSuid(pTableList); code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, - taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader); + taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -216,7 +216,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, - taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader); + taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str); taosArrayClear(pInfo->pUidList); code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);