diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index b7f100733b..17b766906c 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -35,6 +35,7 @@ int32_t streamStateBegin(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState); void streamStateDestroy(SStreamState* pState, bool remove); int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark); +int32_t streamStateDelTaskDb(SStreamState* pState); int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen); @@ -130,4 +131,4 @@ char* streamStateIntervalDump(SStreamState* pState); } #endif -#endif /* ifndef _STREAM_STATE_H_ */ +#endif /* ifndef _STREAM_STATE_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index fe458fdba3..5485ab88e3 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -89,7 +89,11 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { } if (isDeepFree && pItem->pStreamState) { + //SStreamTask *pTask = pItem->pStreamState->pTdb + streamStateDelTaskDb(pItem->pStreamState); streamStateClose(pItem->pStreamState, false); + + } if (isDeepFree && pInfo->taskInfo[i]) { diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 16363e1bdd..e874f42b7a 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -209,6 +209,12 @@ _err: #endif } +int32_t streamStateDelTaskDb(SStreamState* pState) { + SStreamTask* pTask = pState->pTdbState->pOwner; + taskDbRemoveRef(pTask->pBackend); + taosMemoryFree(pTask); + return 0; +} void streamStateClose(SStreamState* pState, bool remove) { SStreamTask* pTask = pState->pTdbState->pOwner; #ifdef USE_ROCKSDB