fix issuse

This commit is contained in:
liuyao 2023-10-07 10:34:35 +08:00
parent ebf4283e91
commit 59d47e1e1f
2 changed files with 54 additions and 29 deletions

View File

@ -3456,12 +3456,17 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
void streamStateReleaseState(SOperatorInfo* pOperator) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
int32_t resSize = winSize + sizeof(TSKEY);
char* pBuff = taosMemoryCalloc(1, resSize);
memcpy(pBuff, pInfo->historyWins->pData, winSize);
memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
qDebug("===stream=== relase state. save result count:%d", (int32_t)taosArrayGetSize(pInfo->historyWins));
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME,
strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData,
resSize);
strlen(STREAM_STATE_OP_STATE_NAME), pBuff, resSize);
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
taosMemoryFreeClear(pBuff);
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) {
downstream->fpSet.releaseStreamStateFn(downstream);
@ -3507,10 +3512,15 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
void* pBuf = NULL;
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_STATE_OP_STATE_NAME,
strlen(STREAM_STATE_OP_STATE_NAME), &pBuf, &size);
int32_t num = size / sizeof(SSessionKey);
int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey);
qDebug("===stream=== reload state. get result count:%d", num);
SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf;
ASSERT(size == num * sizeof(SSessionKey));
ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY));
TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY));
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
pAggSup->stateStore.streamStateReloadInfo(pAggSup->pState, ts);
if (!pInfo->pSeUpdated && num > 0) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);

View File

@ -88,6 +88,15 @@ static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray*
return pNewPos;
}
SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
pNewPos->needFree = true;
memcpy(pNewPos->pRowBuff, p, *pVLen);
taosMemoryFree(p);
return pNewPos;
}
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen) {
int32_t code = TSDB_CODE_SUCCESS;
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
@ -105,8 +114,17 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
int32_t size = taosArrayGetSize(pWinStates);
if (size == 0) {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, pKey);
code = TSDB_CODE_FAILED;
void* pFileStore = getStateFileStore(pFileState);
void* p = NULL;
int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
if (code_file == TSDB_CODE_SUCCESS) {
(*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
code = code_file;
qDebug("===stream===0 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
} else {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, pKey);
code = TSDB_CODE_FAILED;
}
goto _end;
}
@ -140,21 +158,14 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, gap)) {
void* p = NULL;
void* pFileStore = getStateFileStore(pFileState);
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
pNewPos->needFree = true;
qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
memcpy(pNewPos->pRowBuff, p, *pVLen);
taosMemoryFree(p);
(*pVal) = pNewPos;
(*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
code = code_file;
qDebug("===stream===1 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
goto _end;
} else {
taosMemoryFree(p);
streamFileStateReleaseBuff(pFileState, pNewPos, false);
}
}
}
@ -479,8 +490,17 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
int32_t size = taosArrayGetSize(pWinStates);
if (size == 0) {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, key);
code = TSDB_CODE_FAILED;
void* pFileStore = getStateFileStore(pFileState);
void* p = NULL;
int32_t code_file = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
if (code_file == TSDB_CODE_SUCCESS) {
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
code = code_file;
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file);
} else {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, key);
code = TSDB_CODE_FAILED;
}
goto _end;
}
@ -515,22 +535,17 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
if (index + 1 == 0) {
if (!isDeteled(pFileState, endTs)) {
void* p = NULL;
void* pFileStore = getStateFileStore(pFileState);
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
pNewPos->needFree = true;
int32_t code_file = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
void* p = NULL;
void* pFileStore = getStateFileStore(pFileState);
int32_t code_file =
streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey));
qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file);
memcpy(pNewPos->pRowBuff, p, *pVLen);
(*pVal) = pNewPos;
taosMemoryFree(p);
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
code = code_file;
qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file);
goto _end;
} else {
taosMemoryFree(p);
streamFileStateReleaseBuff(pFileState, pNewPos, false);
}
}
}