fix issue

This commit is contained in:
54liuyao 2024-07-22 15:57:10 +08:00
parent e7ea54914c
commit 6f77049c1f
3 changed files with 22 additions and 5 deletions

View File

@ -480,7 +480,8 @@ void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end); TSDB_CHECK_CODE(code, lino, _end);
} }
len = doStreamCountEncodeOpState(&pBuf, len, pOperator, true); void* pTmpBuf = pBuf;
len = doStreamCountEncodeOpState(&pTmpBuf, len, pOperator, true);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME, pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME,
strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), pBuf, len); strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), pBuf, len);
saveStreamOperatorStateComplete(&pInfo->basic); saveStreamOperatorStateComplete(&pInfo->basic);

View File

@ -540,6 +540,10 @@ void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) {
if (needSaveStreamOperatorInfo(&pInfo->basic)) { if (needSaveStreamOperatorInfo(&pInfo->basic)) {
int32_t len = doStreamEventEncodeOpState(NULL, 0, pOperator); int32_t len = doStreamEventEncodeOpState(NULL, 0, pOperator);
void* buf = taosMemoryCalloc(1, len); void* buf = taosMemoryCalloc(1, len);
if (!buf) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return;
}
void* pBuf = buf; void* pBuf = buf;
len = doStreamEventEncodeOpState(&pBuf, len, pOperator); len = doStreamEventEncodeOpState(&pBuf, len, pOperator);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_CHECKPOINT_NAME, pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_CHECKPOINT_NAME,

View File

@ -1408,6 +1408,10 @@ void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) {
if (needSaveStreamOperatorInfo(&pInfo->basic)) { if (needSaveStreamOperatorInfo(&pInfo->basic)) {
int32_t len = doStreamIntervalEncodeOpState(NULL, 0, pOperator); int32_t len = doStreamIntervalEncodeOpState(NULL, 0, pOperator);
void* buf = taosMemoryCalloc(1, len); void* buf = taosMemoryCalloc(1, len);
if (!buf) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return;
}
void* pBuf = buf; void* pBuf = buf;
len = doStreamIntervalEncodeOpState(&pBuf, len, pOperator); len = doStreamIntervalEncodeOpState(&pBuf, len, pOperator);
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME,
@ -3188,6 +3192,10 @@ void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) {
if (needSaveStreamOperatorInfo(&pInfo->basic)) { if (needSaveStreamOperatorInfo(&pInfo->basic)) {
int32_t len = doStreamSessionEncodeOpState(NULL, 0, pOperator, true); int32_t len = doStreamSessionEncodeOpState(NULL, 0, pOperator, true);
void* buf = taosMemoryCalloc(1, len); void* buf = taosMemoryCalloc(1, len);
if (!buf) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return;
}
void* pBuf = buf; void* pBuf = buf;
len = doStreamSessionEncodeOpState(&pBuf, len, pOperator, true); len = doStreamSessionEncodeOpState(&pBuf, len, pOperator, true);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
@ -4390,6 +4398,10 @@ void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) {
if (needSaveStreamOperatorInfo(&pInfo->basic)) { if (needSaveStreamOperatorInfo(&pInfo->basic)) {
int32_t len = doStreamStateEncodeOpState(NULL, 0, pOperator, true); int32_t len = doStreamStateEncodeOpState(NULL, 0, pOperator, true);
void* buf = taosMemoryCalloc(1, len); void* buf = taosMemoryCalloc(1, len);
if (!buf) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return;
}
void* pBuf = buf; void* pBuf = buf;
len = doStreamStateEncodeOpState(&pBuf, len, pOperator, true); len = doStreamStateEncodeOpState(&pBuf, len, pOperator, true);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME, pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME,