fix issue

This commit is contained in:
liuyao 2023-09-22 19:25:19 +08:00
parent 4f8e6d7795
commit 5cc1ae1266
4 changed files with 56 additions and 21 deletions

View File

@ -73,6 +73,7 @@ int32_t getRowStateRowSize(SStreamFileState* pFileState);
// session window
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen);
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos);
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen);

View File

@ -88,29 +88,28 @@ static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray*
return pNewPos;
}
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen) {
SSessionKey* pWinKey = key;
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen) {
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
SArray* pWinStates = NULL;
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
if (ppBuff) {
pWinStates = (SArray*)(*ppBuff);
} else {
pWinStates = taosArrayInit(16, POINTER_BYTES);
tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
}
TSKEY startTs = pWinKey->win.skey;
TSKEY endTs = pWinKey->win.ekey;
TSKEY startTs = pKey->win.skey;
TSKEY endTs = pKey->win.ekey;
int32_t size = taosArrayGetSize(pWinStates);
if (size == 0) {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, key);
(*pVal) = addNewSessionWindow(pFileState, pWinStates, pKey);
goto _end;
}
// find the first position which is smaller than the pWinKey
int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
// find the first position which is smaller than the pKey
int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
SRowBuffPos* pPos = NULL;
if (index >= 0) {
@ -118,7 +117,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* key,
if (inSessionWindow(pPos->pKey, startTs, gap)) {
(*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
*key = *pDestWinKey;
*pKey = *pDestWinKey;
goto _end;
}
}
@ -128,7 +127,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* key,
if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ) {
(*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
*key = *pDestWinKey;
*pKey = *pDestWinKey;
goto _end;
}
}
@ -137,7 +136,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* key,
if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs)) {
void* p = NULL;
void* pFileStore = getStateFileStore(pFileState);
int32_t code = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pWinKey, gap, &p, pVLen);
int32_t code = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
pNewPos->needFree = true;
@ -152,15 +151,46 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* key,
}
if (index == size - 1) {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, key);
(*pVal) = addNewSessionWindow(pFileState, pWinStates, pKey);
goto _end;
}
(*pVal) = insertNewSessionWindow(pFileState, pWinStates, key, index + 1);
(*pVal) = insertNewSessionWindow(pFileState, pWinStates, pKey, index + 1);
_end:
return (*pVal) != NULL ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
}
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
SSessionKey* pKey = pPos->pKey;
SArray* pWinStates = NULL;
void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
if (ppBuff) {
pWinStates = (SArray*)(*ppBuff);
} else {
pWinStates = taosArrayInit(16, POINTER_BYTES);
tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
}
int32_t size = taosArrayGetSize(pWinStates);
if (size == 0) {
taosArrayPush(pWinStates, &pPos);
goto _end;
}
// find the first position which is smaller than the pKey
int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
if (index >= 0) {
taosArrayInsert(pWinStates, index, &pPos);
} else {
taosArrayInsert(pWinStates, 0, &pPos);
}
_end:
pPos->needFree = false;
return TSDB_CODE_SUCCESS;
}
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
pNewPos->needFree = true;

View File

@ -714,15 +714,19 @@ void streamStateFreeVal(void* val) {
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
int32_t code = TSDB_CODE_SUCCESS;
SRowBuffPos* pos = (SRowBuffPos*)value;
if (pos->needFree) {
int32_t code = streamStateSessionPut_rocksdb(pState, key, pos->pRowBuff, vLen);
streamStateReleaseBuf(pState, pos, true);
qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey,
key->win.ekey, key->groupId, code);
return code;
if (isFlushedState(pState->pFileState, key->win.ekey)) {
code = streamStateSessionPut_rocksdb(pState, key, pos->pRowBuff, vLen);
streamStateReleaseBuf(pState, pos, true);
qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey,
key->win.ekey, key->groupId, code);
} else {
code = putSessionWinResultBuff(pState->pFileState, value);
}
}
return TSDB_CODE_SUCCESS;
return code;
#else
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen,

View File

@ -660,7 +660,7 @@ void* getStateFileStore(SStreamFileState* pFileState) {
}
bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
return ts < (pFileState->maxTs - pFileState->deleteMark);
return pFileState->deleteMark > 0 && ts < (pFileState->maxTs - pFileState->deleteMark);
}
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts) {