diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 77576cf1af..0fd8a74127 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -396,7 +396,7 @@ int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, in int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo); void (*getBasicInfo)(void *pVnode, const char **dbname, int32_t *vgId, int64_t* numOfTables, int64_t* numOfNormalTables);// vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) & metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta); - int64_t (*getNumOfRowsInMem)(); + int64_t (*getNumOfRowsInMem)(void* pVnode); /** int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list); int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg); diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 5278fc7761..8e95fa3d6d 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -33,6 +33,7 @@ void mndPostProcessQueryMsg(SRpcMsg *pMsg) { int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { int32_t code = -1; SMnode *pMnode = pMsg->info.node; + SReadHandle handle = {.mnd = pMnode, .pMsgCb = &pMnode->msgCb}; mTrace("msg:%p, in query queue is processing", pMsg); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 558cfdac2b..39aa5c3043 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -277,6 +277,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat } SReadHandle handle = { .vnode = pVnode, .initTqReader = 1, .pStateBackend = pStreamState }; + initStorageAPI(&handle.api); + pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode)); if (!pRSmaInfo->taskInfo[idx]) { terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; @@ -849,6 +851,8 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t TSDB_CHECK_CODE(code, lino, _exit); SReadHandle handle = { .vnode = pVnode, .initTqReader = 1 }; + initStorageAPI(&handle.api); + if (ASSERTS(!dstTaskInfo, "dstTaskInfo:%p is not NULL", dstTaskInfo)) { code = TSDB_CODE_APP_ERROR; TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4adbf02302..362b8204fc 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -671,6 +671,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg pHandle->pRef = pRef; SReadHandle handle = {.vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver}; + initStorageAPI(&handle.api); + pHandle->snapshotVer = ver; if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 70cff5e68a..d9bb4270d8 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -65,6 +65,7 @@ void initMetadataAPI(SStoreMeta* pMeta) { pMeta->getBasicInfo = vnodeGetInfo; pMeta->getNumOfChildTables = metaGetStbStats; +// pMeta->getNumOfRowsInMem = tsdbGetNumOfRowsInMemTable; pMeta->getChildTableList = vnodeGetCtbIdList; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index d16102037d..b950437f23 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -250,7 +250,9 @@ static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) { uint8_t *pCont; SEncoder *pCoder = &(SEncoder){0}; SDeleteRes res = {0}; + SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; + initStorageAPI(&handle.api); code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res); if (code) goto _exit; diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 8f705a7eeb..d15b6d5754 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1814,9 +1814,13 @@ void destroySysScanOperator(void* param) { if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) { - pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); + if (pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) { + pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); + } + pInfo->pCur = NULL; } + if (pInfo->pIdx) { taosArrayDestroy(pInfo->pIdx->uids); taosMemoryFree(pInfo->pIdx); @@ -2200,7 +2204,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { } pAPI->tsdReader.tsdReaderGetDataBlockDistInfo(pBlockScanInfo->pHandle, &blockDistInfo); - blockDistInfo.numOfInmemRows = (int32_t) pAPI->metaFn.getNumOfRowsInMem(pBlockScanInfo->pHandle); + blockDistInfo.numOfInmemRows = (int32_t) pAPI->tsdReader.tsdReaderGetNumOfInMemRows(pBlockScanInfo->pHandle); SSDataBlock* pBlock = pBlockScanInfo->pResBlock; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 2d366b6110..cf58892920 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2906,7 +2906,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin } int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap, - SStreamState* pState, int32_t keySize, int16_t keyType) { + SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore) { pSup->resultRowSize = keySize + getResultRowSize(pCtx, numOfOutput); pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR); pSup->gap = gap; @@ -2939,6 +2939,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, qError("Init stream agg supporter failed since %s, tempDir:%s", terrstr(), tsTempDir); return terrno; } + + pSup->stateStore = *pStore; int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, "function", tsTempDir); for (int32_t i = 0; i < numOfOutput; ++i) { pCtx[i].saveHandle.pBuf = pSup->pResultBuf; @@ -3600,7 +3602,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh } code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, pSessionNode->gap, - pTaskInfo->streamInfo.pState, 0, 0); + pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4149,7 +4151,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes; int16_t type = pColNode->node.resType.type; code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize, - type); + type, &pTaskInfo->storageAPI.stateStore); if (code != TSDB_CODE_SUCCESS) { goto _error; }