diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 073558c73e..935eeb1418 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -538,8 +538,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1; - if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, 0) != 0) return -1; + if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "checkpointInterval", tsCheckpointInterval, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 4a5ccf21ec..70d74268c5 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -239,30 +239,31 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) } } } - if (!pReader->streamStateDone) { - if (pReader->pStreamStateReader == NULL) { - code = - streamStateSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamStateReader); - if (code) { - pReader->streamStateDone = 1; - pReader->pStreamStateReader = NULL; - goto _err; - } - } - code = streamStateSnapRead(pReader->pStreamStateReader, ppData); - if (code) { - goto _err; - } else { - if (*ppData) { - goto _exit; - } else { - pReader->streamStateDone = 1; - code = streamStateSnapReaderClose(pReader->pStreamStateReader); - if (code) goto _err; - pReader->pStreamStateReader = NULL; - } - } - } + // if (!pReader->streamStateDone) { + // if (pReader->pStreamStateReader == NULL) { + // code = + // streamStateSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, + // &pReader->pStreamStateReader); + // if (code) { + // pReader->streamStateDone = 1; + // pReader->pStreamStateReader = NULL; + // goto _err; + // } + // } + // code = streamStateSnapRead(pReader->pStreamStateReader, ppData); + // if (code) { + // goto _err; + // } else { + // if (*ppData) { + // goto _exit; + // } else { + // pReader->streamStateDone = 1; + // code = streamStateSnapReaderClose(pReader->pStreamStateReader); + // if (code) goto _err; + // pReader->pStreamStateReader = NULL; + // } + // } + // } // RSMA ============== if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) { @@ -430,7 +431,7 @@ _exit: } static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { - int32_t code = 0; + int32_t code = 0; SVnode *pVnode = pWriter->pVnode; SSnapDataHdr *pHdr = (SSnapDataHdr *)pData; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index d5fd562691..8a0c3c81fe 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3144,7 +3144,6 @@ void destroyStreamSessionAggOperatorInfo(void* param) { colDataDestroy(&pInfo->twAggSup.timeWindowData); blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pWinBlock); - blockDataDestroy(pInfo->pUpdateRes); tSimpleHashCleanup(pInfo->pStUpdated); tSimpleHashCleanup(pInfo->pStDeleted); @@ -3431,7 +3430,7 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC SSHashObj* pStDeleted, bool addGap) { SExprSupp* pSup = &pOperator->exprSupp; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SResultRow* pCurResult = NULL; @@ -3469,7 +3468,8 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC } int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) { - saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, &pAggSup->stateStore); + saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, + &pAggSup->stateStore); pWinInfo->pOutputBuf = NULL; return TSDB_CODE_SUCCESS; } @@ -4883,8 +4883,8 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur } void streamStateReloadState(SOperatorInfo* pOperator) { - SStreamStateAggOperatorInfo* pInfo = pOperator->info; - SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SStreamStateAggOperatorInfo* pInfo = pOperator->info; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; resetWinRange(&pAggSup->winRange); SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};