set key for session window state

This commit is contained in:
54liuyao 2024-07-10 09:01:39 +08:00
parent 92089e2d86
commit dcb8673063
2 changed files with 2 additions and 2 deletions

View File

@ -3121,6 +3121,7 @@ _error:
static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) { static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) {
tSimpleHashClear(pInfo->streamAggSup.pResultRows); tSimpleHashClear(pInfo->streamAggSup.pResultRows);
pInfo->streamAggSup.stateStore.streamStateSessionClear(pInfo->streamAggSup.pState); pInfo->streamAggSup.stateStore.streamStateSessionClear(pInfo->streamAggSup.pState);
pInfo->clearState = false;
} }
void deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate, void deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate,
@ -3170,7 +3171,6 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
// semi session operator clear disk buffer // semi session operator clear disk buffer
clearStreamSessionOperator(pInfo); clearStreamSessionOperator(pInfo);
setStreamOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
pInfo->clearState = false;
return NULL; return NULL;
} }
} }

View File

@ -242,7 +242,6 @@ _end:
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) { int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
pNewPos->needFree = true; pNewPos->needFree = true;
pNewPos->beFlushed = true; pNewPos->beFlushed = true;
void* pBuff = NULL; void* pBuff = NULL;
@ -250,6 +249,7 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
memcpy(pNewPos->pRowBuff, pBuff, *pVLen); memcpy(pNewPos->pRowBuff, pBuff, *pVLen);
taosMemoryFreeClear(pBuff); taosMemoryFreeClear(pBuff);
(*pVal) = pNewPos; (*pVal) = pNewPos;