free mem
This commit is contained in:
parent
3cfda2c579
commit
fbed0acace
|
@ -2330,6 +2330,7 @@ void streamScanReleaseState(SOperatorInfo* pOperator) {
|
||||||
void* pBuff = taosMemoryCalloc(1, len);
|
void* pBuff = taosMemoryCalloc(1, len);
|
||||||
pInfo->stateStore.updateInfoSerialize(pBuff, len, pInfo->pUpdateInfo);
|
pInfo->stateStore.updateInfoSerialize(pBuff, len, pInfo->pUpdateInfo);
|
||||||
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_STATE_NAME, strlen(STREAM_SCAN_OP_STATE_NAME), pBuff, len);
|
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_STATE_NAME, strlen(STREAM_SCAN_OP_STATE_NAME), pBuff, len);
|
||||||
|
taosMemoryFree(pBuff);
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamScanReloadState(SOperatorInfo* pOperator) {
|
void streamScanReloadState(SOperatorInfo* pOperator) {
|
||||||
|
@ -2342,6 +2343,7 @@ void streamScanReloadState(SOperatorInfo* pOperator) {
|
||||||
pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_SCAN_OP_STATE_NAME, strlen(STREAM_SCAN_OP_STATE_NAME), &pBuff, &len);
|
pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_SCAN_OP_STATE_NAME, strlen(STREAM_SCAN_OP_STATE_NAME), &pBuff, &len);
|
||||||
SUpdateInfo* pUpInfo = pInfo->stateStore.updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0);
|
SUpdateInfo* pUpInfo = pInfo->stateStore.updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0);
|
||||||
int32_t code = pInfo->stateStore.updateInfoDeserialize(pBuff, len, pUpInfo);
|
int32_t code = pInfo->stateStore.updateInfoDeserialize(pBuff, len, pUpInfo);
|
||||||
|
taosMemoryFree(pBuff);
|
||||||
if (code == TSDB_CODE_SUCCESS && pInfo->pUpdateInfo) {
|
if (code == TSDB_CODE_SUCCESS && pInfo->pUpdateInfo) {
|
||||||
if (pInfo->pUpdateInfo->minTS < 0) {
|
if (pInfo->pUpdateInfo->minTS < 0) {
|
||||||
pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
|
pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
|
||||||
|
|
|
@ -2747,6 +2747,7 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) {
|
||||||
int32_t code = pInfo->statestore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
|
int32_t code = pInfo->statestore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
|
||||||
strlen(STREAM_INTERVAL_OP_STATE_NAME), &pBuf, &size);
|
strlen(STREAM_INTERVAL_OP_STATE_NAME), &pBuf, &size);
|
||||||
TSKEY ts = *(TSKEY*)pBuf;
|
TSKEY ts = *(TSKEY*)pBuf;
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
pInfo->statestore.streamStateReloadInfo(pInfo->pState, ts);
|
pInfo->statestore.streamStateReloadInfo(pInfo->pState, ts);
|
||||||
}
|
}
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
@ -3677,6 +3678,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
|
||||||
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
|
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
|
||||||
compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted);
|
compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted);
|
||||||
}
|
}
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
if (downstream->fpSet.reloadStreamStateFn) {
|
if (downstream->fpSet.reloadStreamStateFn) {
|
||||||
|
@ -4288,6 +4290,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
|
||||||
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pStUpdated, pInfo->pStDeleted);
|
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pStUpdated, pInfo->pStDeleted);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
if (downstream->fpSet.reloadStreamStateFn) {
|
if (downstream->fpSet.reloadStreamStateFn) {
|
||||||
|
|
Loading…
Reference in New Issue