diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index dd05581ff0..d8ad026c6c 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -482,7 +482,8 @@ void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) { code = TSDB_CODE_OUT_OF_MEMORY; QUERY_CHECK_CODE(code, lino, _end); } - len = doStreamCountEncodeOpState(&pBuf, len, pOperator, true); + void* pTmpBuf = pBuf; + len = doStreamCountEncodeOpState(&pTmpBuf, len, pOperator, true); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME, strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), pBuf, len); saveStreamOperatorStateComplete(&pInfo->basic); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 2cd36a7fb7..3f1887b1ae 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -542,7 +542,11 @@ void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) { if (needSaveStreamOperatorInfo(&pInfo->basic)) { int32_t len = doStreamEventEncodeOpState(NULL, 0, pOperator); void* buf = taosMemoryCalloc(1, len); - void* pBuf = buf; + if (!buf) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return; + } + void* pBuf = buf; len = doStreamEventEncodeOpState(&pBuf, len, pOperator); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_CHECKPOINT_NAME, strlen(STREAM_EVENT_OP_CHECKPOINT_NAME), buf, len); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index d340efbb1b..c47f99cca3 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1410,7 +1410,11 @@ void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) { if (needSaveStreamOperatorInfo(&pInfo->basic)) { int32_t len = doStreamIntervalEncodeOpState(NULL, 0, pOperator); void* buf = taosMemoryCalloc(1, len); - void* pBuf = buf; + if (!buf) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return; + } + void* pBuf = buf; len = doStreamIntervalEncodeOpState(&pBuf, len, pOperator); pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len); @@ -3193,7 +3197,11 @@ void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) { if (needSaveStreamOperatorInfo(&pInfo->basic)) { int32_t len = doStreamSessionEncodeOpState(NULL, 0, pOperator, true); void* buf = taosMemoryCalloc(1, len); - void* pBuf = buf; + if (!buf) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return; + } + void* pBuf = buf; len = doStreamSessionEncodeOpState(&pBuf, len, pOperator, true); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len); @@ -4401,7 +4409,11 @@ void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) { if (needSaveStreamOperatorInfo(&pInfo->basic)) { int32_t len = doStreamStateEncodeOpState(NULL, 0, pOperator, true); void* buf = taosMemoryCalloc(1, len); - void* pBuf = buf; + if (!buf) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return; + } + void* pBuf = buf; len = doStreamStateEncodeOpState(&pBuf, len, pOperator, true); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME, strlen(STREAM_STATE_OP_CHECKPOINT_NAME), buf, len);