fix issue

This commit is contained in:
liuyao 2023-09-22 15:00:26 +08:00
parent dc2f7872f8
commit 410a177637
4 changed files with 47 additions and 23 deletions

View File

@ -80,6 +80,7 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key);
void streamStateFreeCur(SStreamStateCur* pCur);
void streamStateResetCur(SStreamStateCur* pCur);
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);

View File

@ -2680,6 +2680,9 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
for (int32_t i = 0; i < num; i++) {
SResultWindowInfo winInfo = {0};
getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &winInfo);
if (!IS_VALID_SESSION_WIN(winInfo)) {
continue;
}
int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true);
if (winNum > 0) {
qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, winInfo.sessionWin.win.skey,

View File

@ -266,11 +266,11 @@ SStreamStateCur* sessionWinStateSeekKeyCurrentPrev(SStreamFileState* pFileState,
pCur->pStreamFileState = pFileState;
return pCur;
}
static void transformCursor(SStreamFileState* pFileState, SStreamStateCur** ppCur) {
SStreamStateCur* pCur = *ppCur;
streamStateFreeCur(pCur);
pCur = createStreamStateCursor();
(*ppCur) = pCur;
static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur) {
if (!pCur) {
return;
}
streamStateResetCur(pCur);
pCur->buffIndex = 0;
pCur->pStreamFileState = pFileState;
}
@ -278,12 +278,14 @@ static void transformCursor(SStreamFileState* pFileState, SStreamStateCur** ppCu
static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates, SStreamStateCur** ppCur) {
SSessionKey key = {.groupId = groupId};
int32_t code = streamStateSessionGetKVByCur_rocksdb(*ppCur, &key, NULL, NULL);
if (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0) {
transformCursor(pFileState, ppCur);
} else {
SStreamStateCur* pCur = *ppCur;
pCur->buffIndex = -1;
pCur->pStreamFileState = pFileState;
if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) {
if ( !(*ppCur) ) {
(*ppCur) = createStreamStateCursor();
}
transformCursor(pFileState, *ppCur);
} else if (*ppCur) {
(*ppCur)->buffIndex = -1;
(*ppCur)->pStreamFileState = pFileState;
}
}
@ -323,6 +325,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void
if (!pCur) {
return TSDB_CODE_FAILED;
}
int32_t code = TSDB_CODE_SUCCESS;
SSHashObj* pSessionBuff = getRowStateBuff(pCur->pStreamFileState);
void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
@ -342,20 +345,25 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void
}
*pKey = *(SSessionKey*)(pPos->pKey);
} else {
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
if (code == TSDB_CODE_FAILED || sessionStateKeyCompare(pKey, pWinStates, 0) >= 0) {
transformCursor(pCur->pStreamFileState, &pCur);
if (pCur->buffIndex >= size) {
return TSDB_CODE_FAILED;
}
void* pData = NULL;
code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, &pData, pVLen);
if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateKeyCompare(pKey, pWinStates, 0) >= 0)) {
transformCursor(pCur->pStreamFileState, pCur);
SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex);
if (pVal) {
*pVal = pPos;
}
*pKey = *(SSessionKey*)(pPos->pKey);
code = TSDB_CODE_SUCCESS;
} else if (code == TSDB_CODE_SUCCESS && pVal) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pCur->pStreamFileState);
pNewPos->needFree = true;
memcpy(pNewPos->pRowBuff, pData, *pVLen);
(*pVal) = pNewPos;
}
taosMemoryFreeClear(pData);
}
return TSDB_CODE_SUCCESS;
return code;
}
int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur) {

View File

@ -678,17 +678,29 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
return tdbTbcMoveToPrev(pCur->pCur);
#endif
}
void streamStateResetCur(SStreamStateCur* pCur) {
if (!pCur) {
return;
}
if (pCur->iter) rocksdb_iter_destroy(pCur->iter);
if (pCur->snapshot) rocksdb_release_snapshot(pCur->db, pCur->snapshot);
if (pCur->readOpt) rocksdb_readoptions_destroy(pCur->readOpt);
tdbTbcClose(pCur->pCur);
memset(pCur, 0, sizeof(SStreamStateCur));
pCur->buffIndex = -1;
}
void streamStateFreeCur(SStreamStateCur* pCur) {
if (!pCur || pCur->buffIndex >= 0) {
taosMemoryFree(pCur);
return;
}
qDebug("streamStateFreeCur");
rocksdb_iter_destroy(pCur->iter);
if (pCur->snapshot) rocksdb_release_snapshot(pCur->db, pCur->snapshot);
rocksdb_readoptions_destroy(pCur->readOpt);
tdbTbcClose(pCur->pCur);
streamStateResetCur(pCur);
taosMemoryFree(pCur);
}