Merge pull request #26723 from taosdata/fix/TD-31071

fix(stream):fix issue
This commit is contained in:
Haojun Liao 2024-07-23 11:38:21 +08:00 committed by GitHub
commit 12ffdef6b0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 22 additions and 5 deletions

View File

@ -482,7 +482,8 @@ void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) {
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end);
}
len = doStreamCountEncodeOpState(&pBuf, len, pOperator, true);
void* pTmpBuf = pBuf;
len = doStreamCountEncodeOpState(&pTmpBuf, len, pOperator, true);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME,
strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), pBuf, len);
saveStreamOperatorStateComplete(&pInfo->basic);

View File

@ -542,7 +542,11 @@ void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) {
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
int32_t len = doStreamEventEncodeOpState(NULL, 0, pOperator);
void* buf = taosMemoryCalloc(1, len);
void* pBuf = buf;
if (!buf) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return;
}
void* pBuf = buf;
len = doStreamEventEncodeOpState(&pBuf, len, pOperator);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_CHECKPOINT_NAME,
strlen(STREAM_EVENT_OP_CHECKPOINT_NAME), buf, len);

View File

@ -1410,7 +1410,11 @@ void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) {
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
int32_t len = doStreamIntervalEncodeOpState(NULL, 0, pOperator);
void* buf = taosMemoryCalloc(1, len);
void* pBuf = buf;
if (!buf) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return;
}
void* pBuf = buf;
len = doStreamIntervalEncodeOpState(&pBuf, len, pOperator);
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME,
strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len);
@ -3193,7 +3197,11 @@ void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) {
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
int32_t len = doStreamSessionEncodeOpState(NULL, 0, pOperator, true);
void* buf = taosMemoryCalloc(1, len);
void* pBuf = buf;
if (!buf) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return;
}
void* pBuf = buf;
len = doStreamSessionEncodeOpState(&pBuf, len, pOperator, true);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len);
@ -4401,7 +4409,11 @@ void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) {
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
int32_t len = doStreamStateEncodeOpState(NULL, 0, pOperator, true);
void* buf = taosMemoryCalloc(1, len);
void* pBuf = buf;
if (!buf) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return;
}
void* pBuf = buf;
len = doStreamStateEncodeOpState(&pBuf, len, pOperator, true);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME,
strlen(STREAM_STATE_OP_CHECKPOINT_NAME), buf, len);