adj stream count state buff

This commit is contained in:
54liuyao 2024-06-03 15:20:05 +08:00
parent d7ecd98cf9
commit ca7e1344e1
1 changed files with 35 additions and 17 deletions

View File

@ -734,6 +734,21 @@ _end:
return code; return code;
} }
int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, pKey);
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
streamStateFreeCur(pCur);
if (code == TSDB_CODE_SUCCESS) {
return code;
} else {
pCur = streamStateSessionSeekKeyPrev_rocksdb(pState, pKey);
}
code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
streamStateFreeCur(pCur);
return code;
}
int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) { int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) {
SSessionKey* pWinKey = pKey; SSessionKey* pWinKey = pKey;
const TSKEY gap = 0; const TSKEY gap = 0;
@ -755,11 +770,10 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
if (size == 0) { if (size == 0) {
void* pFileStore = getStateFileStore(pFileState); void* pFileStore = getStateFileStore(pFileState);
void* pRockVal = NULL; void* pRockVal = NULL;
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId); code = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen);
code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen);
streamStateFreeCur(pCur);
if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code); qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
pWinKey->win.ekey, code);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
int32_t valSize = *pVLen; int32_t valSize = *pVLen;
COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE))); COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)));
@ -798,20 +812,24 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
} }
if (index == -1) { if (index == -1) {
if (!isDeteled(pFileState, endTs)) { if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, 0)) {
void* p = NULL; SSessionKey tmpKey = *pWinKey;
void* pRockVal = NULL;
void* pFileStore = getStateFileStore(pFileState); void* pFileStore = getStateFileStore(pFileState);
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId); int32_t code_file = getCountWinStateFromDisc(pFileStore, &tmpKey, &pRockVal, pVLen);
int32_t code_file = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &p, pVLen);
if (code_file == TSDB_CODE_SUCCESS) { if (code_file == TSDB_CODE_SUCCESS) {
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); SRowBuffPos* pFirstPos = taosArrayGetP(pWinStates, 0);
SSessionKey* pFirstWinKey = (SSessionKey*)pFirstPos->pKey;
if (tmpKey.win.ekey < pFirstWinKey->win.skey) {
*pWinKey = tmpKey;
(*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
code = code_file; code = code_file;
qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
streamStateFreeCur(pCur); pWinKey->win.ekey, code_file);
goto _end; goto _end;
} }
taosMemoryFree(p); }
streamStateFreeCur(pCur); taosMemoryFree(pRockVal);
} }
} }