From 91f9b58f9a7f2f9f95397ff04f4ad02aa5376fa4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 27 Apr 2023 16:06:06 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/inc/vnode.h | 37 +++++++++++-------------- source/dnode/vnode/src/tq/tq.c | 4 +-- source/dnode/vnode/src/tq/tqMeta.c | 4 +-- source/dnode/vnode/src/tq/tqRead.c | 4 +-- source/dnode/vnode/src/tq/tqScan.c | 4 +-- source/libs/executor/src/scanoperator.c | 6 ++-- 6 files changed, 27 insertions(+), 32 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 3bcb1c9d2e..002dcda488 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -175,11 +175,8 @@ typedef struct STsdbReader STsdbReader; #define CACHESCAN_RETRIEVE_LAST_ROW 0x4 #define CACHESCAN_RETRIEVE_LAST 0x8 -int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num); -int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, - SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly); - -void tsdbReaderSetId(STsdbReader* pReader, const char* idstr); +int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, + SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly); void tsdbReaderClose(STsdbReader *pReader); int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext); int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave); @@ -190,7 +187,10 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta); -uint64_t getReaderMaxVersion(STsdbReader *pReader); +uint64_t tsdbGetReaderMaxVersion(STsdbReader *pReader); +int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num); +void tsdbReaderSetId(STsdbReader *pReader, const char *idstr); +void tsdbReaderSetCloseFlag(STsdbReader *pReader); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, uint64_t suid, void **pReader, const char *idstr); @@ -231,26 +231,21 @@ typedef struct SSnapContext { } SSnapContext; typedef struct STqReader { - SPackedData msg2; - - SSubmitReq2 submit; - int32_t nextBlk; - - int64_t lastBlkUid; - - SWalReader *pWalReader; - - SMeta *pVnodeMeta; - SHashObj *tbIdHash; - SArray *pColIdList; // SArray - + SPackedData msg2; + SSubmitReq2 submit; + int32_t nextBlk; + int64_t lastBlkUid; + SWalReader *pWalReader; + SMeta *pVnodeMeta; + SHashObj *tbIdHash; + SArray *pColIdList; // SArray int32_t cachedSchemaVer; int64_t cachedSchemaSuid; SSchemaWrapper *pSchemaWrapper; STSchema *pSchema; } STqReader; -STqReader *tqOpenReader(SVnode *pVnode); +STqReader *tqReaderOpen(SVnode *pVnode); void tqCloseReader(STqReader *); void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); @@ -266,7 +261,7 @@ int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, i bool tqNextBlockImpl(STqReader *pReader); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet); -int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); +int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ae52db163f..7314e23145 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -504,7 +504,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL); - pHandle->execHandle.pTqReader = tqOpenReader(pVnode); + pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); pHandle->execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); @@ -523,7 +523,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid); } - pHandle->execHandle.pTqReader = tqOpenReader(pVnode); + pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList); taosArrayDestroy(tbUidList); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index cd8cefb307..f3ecaa08f6 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -328,7 +328,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { } } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) { handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - handle.execHandle.pTqReader = tqOpenReader(pTq->pVnode); + handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode); buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext**)(&reader.sContext)); @@ -343,7 +343,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid); } - handle.execHandle.pTqReader = tqOpenReader(pTq->pVnode); + handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode); tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList); taosArrayDestroy(tbUidList); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 0e9caf24ae..ead00dcc35 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -249,7 +249,7 @@ END: return code; } -STqReader* tqOpenReader(SVnode* pVnode) { +STqReader* tqReaderOpen(SVnode* pVnode) { STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader)); if (pReader == NULL) { return NULL; @@ -653,7 +653,7 @@ FAIL: return -1; } -int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) { +int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) { tqDebug("tq reader retrieve data block %p, %d", pReader->msg2.msgStr, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index e049e2d390..0f00a5acb8 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -207,7 +207,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR taosArrayClear(pBlocks); taosArrayClear(pSchemas); SSubmitTbData* pSubmitTbDataRet = NULL; - if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) { + if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } if (pRsp->withTbName) { @@ -266,7 +266,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR taosArrayClear(pBlocks); taosArrayClear(pSchemas); SSubmitTbData* pSubmitTbDataRet = NULL; - if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) { + if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } if (pRsp->withTbName) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8d9d1bb887..0f583c021d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1855,7 +1855,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); if (pSDB) { STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader); + uint64_t version = tsdbGetReaderMaxVersion(pTableScanInfo->base.dataReader); updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); @@ -2021,7 +2021,7 @@ FETCH_NEXT_BLOCK: SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); if (pSDB) { STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader); + uint64_t version = tsdbGetReaderMaxVersion(pTableScanInfo->base.dataReader); updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); @@ -2426,7 +2426,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys if (pHandle->initTqReader) { ASSERT(pHandle->tqReader == NULL); - pInfo->tqReader = tqOpenReader(pHandle->vnode); + pInfo->tqReader = tqReaderOpen(pHandle->vnode); ASSERT(pInfo->tqReader); } else { ASSERT(pHandle->tqReader);