diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index a9e5fe628b..93bbf9d380 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -180,7 +180,7 @@ int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t n int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly); -void tsdbReaderSetId(STsdbReader* pReader, const char* idstr); +void tsdbReaderSetId(STsdbReader *pReader, const char *idstr); void tsdbReaderClose(STsdbReader *pReader); int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext); int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave); @@ -194,7 +194,7 @@ void *tsdbGetIvtIdx(SMeta *pMeta); uint64_t getReaderMaxVersion(STsdbReader *pReader); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, - uint64_t suid, void **pReader, const char *idstr); + SArray *pCidList, uint64_t suid, void **pReader, const char *idstr); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); void *tsdbCacherowsReaderClose(void *pReader); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); @@ -260,7 +260,7 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); -void tqNextBlock(STqReader *pReader, SFetchRet *ret); +void tqNextBlock(STqReader *pReader, SFetchRet *ret); int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); // int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 16afa9f596..2ec3bf59e1 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -787,6 +787,7 @@ typedef struct SCacheRowsReader { uint64_t suid; char **transferBuf; // todo remove it soon int32_t numOfCols; + SArray *pCidList; int32_t type; int32_t tableIndex; // currently returned result tables STableKeyInfo *pTableList; // table id list diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 3dd84bd07c..a60f20522b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -283,6 +283,14 @@ _exit: return code; } +int32_t tsdbCacheGetLast(STsdb *pTsdb, tb_uid_t uid, SArray **ppLastArray, SCacheRowsReader *pr) { + int32_t code = 0; + + SArray *pCidList = pr->pCidList; + + return code; +} + int32_t tsdbOpenCache(STsdb *pTsdb) { int32_t code = 0; SLRUCache *pCache = NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 95981c2f08..19967302d0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -143,7 +143,7 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id } int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols, - uint64_t suid, void** pReader, const char* idstr) { + SArray* pCidList, uint64_t suid, void** pReader, const char* idstr) { *pReader = NULL; SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader)); if (p == NULL) { @@ -155,6 +155,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, p->pTsdb = p->pVnode->pTsdb; p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}; p->numOfCols = numOfCols; + p->pCidList = pCidList; p->suid = suid; if (numOfTables == 0) { diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index f6fc332b37..543ac412db 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -36,6 +36,7 @@ typedef struct SCacheRowsScanInfo { int32_t currentGroupIndex; SSDataBlock* pBufferredRes; SArray* pUidList; + SArray* pCidList; int32_t indexOfBufferedRes; STableListInfo* pTableList; } SCacheRowsScanInfo; @@ -45,13 +46,13 @@ static void destroyCacheScanOperator(void* param); static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo); -#define SCAN_ROW_TYPE(_t) ((_t)? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW) +#define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW) SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; tableListDestroy(pTableListInfo); @@ -71,6 +72,13 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe goto _error; } + SArray* pCidList = taosArrayInit(numOfCols, sizeof(int16_t)); + for (int i = 0; i < TARRAY_SIZE(pInfo->matchInfo.pList); ++i) { + SColMatchItem* pColInfo = taosArrayGet(pInfo->matchInfo.pList, i); + taosArrayPush(pCidList, &pColInfo->colId); + } + pInfo->pCidList = pCidList; + removeRedundantTsCol(pScanNode, &pInfo->matchInfo); code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds); @@ -91,7 +99,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe uint64_t suid = tableListGetSuid(pTableListInfo); code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, - taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str); + taosArrayGetSize(pInfo->matchInfo.pList), pCidList, suid, &pInfo->pLastrowReader, + pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -114,7 +123,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset); } - setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); + setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED, + pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->fpSet = @@ -123,7 +133,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe pOperator->cost.openCost = 0; return pOperator; - _error: +_error: pTaskInfo->code = code; destroyCacheScanOperator(pInfo); taosMemoryFree(pOperator); @@ -136,8 +146,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } SCacheRowsScanInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - STableListInfo* pTableList = pInfo->pTableList; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + STableListInfo* pTableList = pInfo->pTableList; uint64_t suid = tableListGetSuid(pTableList); int32_t size = tableListGetSize(pTableList); @@ -194,8 +204,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { pRes->info.rows = 1; SExprSupp* pSup = &pInfo->pseudoExprSup; - int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, - pRes->info.rows, GET_TASKID(pTaskInfo), NULL); + int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, + pRes->info.rows, GET_TASKID(pTaskInfo), NULL); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; @@ -217,7 +227,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } STableKeyInfo* pList = NULL; - int32_t num = 0; + int32_t num = 0; int32_t code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num); if (code != TSDB_CODE_SUCCESS) { @@ -225,8 +235,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, - taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, - pTaskInfo->id.str); + taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, suid, + &pInfo->pLastrowReader, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { pInfo->currentGroupIndex += 1; taosArrayClear(pInfo->pUidList); @@ -254,8 +264,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW); pInfo->pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0); - code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows, - GET_TASKID(pTaskInfo), NULL); + code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, + pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; @@ -280,6 +290,7 @@ void destroyCacheScanOperator(void* param) { blockDataDestroy(pInfo->pRes); blockDataDestroy(pInfo->pBufferredRes); taosMemoryFree(pInfo->pSlotIds); + taosArrayDestroy(pInfo->pCidList); taosArrayDestroy(pInfo->pUidList); taosArrayDestroy(pInfo->matchInfo.pList); tableListDestroy(pInfo->pTableList); @@ -325,7 +336,7 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC return TSDB_CODE_SUCCESS; } - size_t size = taosArrayGetSize(pColMatchInfo->pList); + size_t size = taosArrayGetSize(pColMatchInfo->pList); SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem)); for (int32_t i = 0; i < size; ++i) {