From 6f77049c1f6de04970fb76b10a8ce21766ccbe7c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 22 Jul 2024 15:57:10 +0800 Subject: [PATCH] fix issue --- .../executor/src/streamcountwindowoperator.c | 3 ++- .../executor/src/streameventwindowoperator.c | 6 +++++- .../executor/src/streamtimewindowoperator.c | 18 +++++++++++++++--- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 04310276a9..b79e6cf800 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -480,7 +480,8 @@ void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _end); } - len = doStreamCountEncodeOpState(&pBuf, len, pOperator, true); + void* pTmpBuf = pBuf; + len = doStreamCountEncodeOpState(&pTmpBuf, len, pOperator, true); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME, strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), pBuf, len); saveStreamOperatorStateComplete(&pInfo->basic); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 2d6f9b1fc5..a3f0fb9839 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -540,7 +540,11 @@ void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) { if (needSaveStreamOperatorInfo(&pInfo->basic)) { int32_t len = doStreamEventEncodeOpState(NULL, 0, pOperator); void* buf = taosMemoryCalloc(1, len); - void* pBuf = buf; + if (!buf) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return; + } + void* pBuf = buf; len = doStreamEventEncodeOpState(&pBuf, len, pOperator); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_CHECKPOINT_NAME, strlen(STREAM_EVENT_OP_CHECKPOINT_NAME), buf, len); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 50b20344d6..5724f2b3ba 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1408,7 +1408,11 @@ void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) { if (needSaveStreamOperatorInfo(&pInfo->basic)) { int32_t len = doStreamIntervalEncodeOpState(NULL, 0, pOperator); void* buf = taosMemoryCalloc(1, len); - void* pBuf = buf; + if (!buf) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return; + } + void* pBuf = buf; len = doStreamIntervalEncodeOpState(&pBuf, len, pOperator); pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len); @@ -3188,7 +3192,11 @@ void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) { if (needSaveStreamOperatorInfo(&pInfo->basic)) { int32_t len = doStreamSessionEncodeOpState(NULL, 0, pOperator, true); void* buf = taosMemoryCalloc(1, len); - void* pBuf = buf; + if (!buf) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return; + } + void* pBuf = buf; len = doStreamSessionEncodeOpState(&pBuf, len, pOperator, true); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len); @@ -4390,7 +4398,11 @@ void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) { if (needSaveStreamOperatorInfo(&pInfo->basic)) { int32_t len = doStreamStateEncodeOpState(NULL, 0, pOperator, true); void* buf = taosMemoryCalloc(1, len); - void* pBuf = buf; + if (!buf) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return; + } + void* pBuf = buf; len = doStreamStateEncodeOpState(&pBuf, len, pOperator, true); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME, strlen(STREAM_STATE_OP_CHECKPOINT_NAME), buf, len);