From d8e4f2d743f4d79bccf30fd971188aa97ce17b8a Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 22 Jul 2024 11:34:25 +0800 Subject: [PATCH 1/3] operator --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 583dd7b41b..7a6d5cb730 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3556,7 +3556,7 @@ static int32_t tagScanCreateResultData(SDataType* pType, int32_t numOfRows, SSca } static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) { - SColumnNode* pSColumnNode = NULL; + SColumnNode* pSColumnNode = NULL;; if (QUERY_NODE_COLUMN == nodeType((*pNode))) { pSColumnNode = *(SColumnNode**)pNode; } else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) { From 6f77049c1f6de04970fb76b10a8ce21766ccbe7c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 22 Jul 2024 15:57:10 +0800 Subject: [PATCH 2/3] fix issue --- .../executor/src/streamcountwindowoperator.c | 3 ++- .../executor/src/streameventwindowoperator.c | 6 +++++- .../executor/src/streamtimewindowoperator.c | 18 +++++++++++++++--- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 04310276a9..b79e6cf800 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -480,7 +480,8 @@ void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _end); } - len = doStreamCountEncodeOpState(&pBuf, len, pOperator, true); + void* pTmpBuf = pBuf; + len = doStreamCountEncodeOpState(&pTmpBuf, len, pOperator, true); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME, strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), pBuf, len); saveStreamOperatorStateComplete(&pInfo->basic); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 2d6f9b1fc5..a3f0fb9839 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -540,7 +540,11 @@ void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) { if (needSaveStreamOperatorInfo(&pInfo->basic)) { int32_t len = doStreamEventEncodeOpState(NULL, 0, pOperator); void* buf = taosMemoryCalloc(1, len); - void* pBuf = buf; + if (!buf) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return; + } + void* pBuf = buf; len = doStreamEventEncodeOpState(&pBuf, len, pOperator); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_CHECKPOINT_NAME, strlen(STREAM_EVENT_OP_CHECKPOINT_NAME), buf, len); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 50b20344d6..5724f2b3ba 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1408,7 +1408,11 @@ void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) { if (needSaveStreamOperatorInfo(&pInfo->basic)) { int32_t len = doStreamIntervalEncodeOpState(NULL, 0, pOperator); void* buf = taosMemoryCalloc(1, len); - void* pBuf = buf; + if (!buf) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return; + } + void* pBuf = buf; len = doStreamIntervalEncodeOpState(&pBuf, len, pOperator); pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len); @@ -3188,7 +3192,11 @@ void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) { if (needSaveStreamOperatorInfo(&pInfo->basic)) { int32_t len = doStreamSessionEncodeOpState(NULL, 0, pOperator, true); void* buf = taosMemoryCalloc(1, len); - void* pBuf = buf; + if (!buf) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return; + } + void* pBuf = buf; len = doStreamSessionEncodeOpState(&pBuf, len, pOperator, true); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len); @@ -4390,7 +4398,11 @@ void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) { if (needSaveStreamOperatorInfo(&pInfo->basic)) { int32_t len = doStreamStateEncodeOpState(NULL, 0, pOperator, true); void* buf = taosMemoryCalloc(1, len); - void* pBuf = buf; + if (!buf) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return; + } + void* pBuf = buf; len = doStreamStateEncodeOpState(&pBuf, len, pOperator, true); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME, strlen(STREAM_STATE_OP_CHECKPOINT_NAME), buf, len); From f589cc68944123d466ce8ae15910a18efccde90a Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 22 Jul 2024 15:58:46 +0800 Subject: [PATCH 3/3] fix issue --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7a6d5cb730..583dd7b41b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3556,7 +3556,7 @@ static int32_t tagScanCreateResultData(SDataType* pType, int32_t numOfRows, SSca } static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) { - SColumnNode* pSColumnNode = NULL;; + SColumnNode* pSColumnNode = NULL; if (QUERY_NODE_COLUMN == nodeType((*pNode))) { pSColumnNode = *(SColumnNode**)pNode; } else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) {