diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index ec68e8ed71..3d59a69dea 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -68,9 +68,10 @@ int32_t streamStateSessionClear(SStreamState* pState); int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); +int32_t streamStateSessionGetKey(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey); SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key); -SStreamStateCur* streamStateSessionSeekKeyPrev(SStreamState* pState, const SSessionKey* key); +SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKey* key); int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 505654d967..15cad81d23 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3594,9 +3594,7 @@ void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT pKey->win.skey = startTs; pKey->win.ekey = endTs; pKey->groupId = groupId; - SStreamStateCur* pCur = streamStateSessionGetCur(pAggSup->pState, pKey); - int32_t code = streamStateSessionGetKVByCur(pCur, pKey, NULL, 0); - streamStateFreeCur(pCur); + int32_t code = streamStateSessionGetKey(pAggSup->pState, pKey, pKey); if (code != TSDB_CODE_SUCCESS) { SET_SESSION_WIN_KEY_INVALID(pKey); } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index c3c58e125a..4b3affa9de 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -526,7 +526,7 @@ int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { return tdbTbDelete(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), &pState->txn); } -SStreamStateCur* streamStateSessionSeekKeyPrev(SStreamState* pState, const SSessionKey* key) { +SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; @@ -544,7 +544,7 @@ SStreamStateCur* streamStateSessionSeekKeyPrev(SStreamState* pState, const SSess streamStateFreeCur(pCur); return NULL; } - if (c > 0) return pCur; + if (c >= 0) return pCur; if (tdbTbcMoveToPrev(pCur->pCur) < 0) { streamStateFreeCur(pCur); @@ -572,7 +572,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess streamStateFreeCur(pCur); return NULL; } - if (c > 0) return pCur; + if (c < 0) return pCur; if (tdbTbcMoveToNext(pCur->pCur) < 0) { streamStateFreeCur(pCur); @@ -630,7 +630,7 @@ SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKe streamStateCurPrev(pState, pCur); SSessionKey tmpKey = *key; int32_t code = streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0); - if (code == TSDB_CODE_SUCCESS && sessionKeyCmpr(key, &tmpKey) == 0) { + if (code == 0 && sessionKeyCmpr(key, &tmpKey) == 0) { resKey = tmpKey; } else { break; @@ -640,9 +640,28 @@ SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKe return streamStateSessionGetRanomCur(pState, &resKey); } +int32_t streamStateSessionGetKey(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { + SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key); + SSessionKey resKey = *key; + int32_t res = -1; + while (1) { + SSessionKey tmpKey = *key; + int32_t code = streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0); + if (code == 0 && sessionKeyCmpr(key, &tmpKey) == 0) { + res = 0; + resKey = tmpKey; + } else { + break; + } + streamStateCurPrev(pState, pCur); + } + *curKey = resKey; + return res; +} + int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { // todo refactor - SStreamStateCur* pCur = streamStateSessionGetCur(pState, key); + SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key); int32_t size = *pVLen; void* tmp = NULL; *pVal = tdbRealloc(NULL, size); @@ -659,7 +678,7 @@ int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) { // todo refactor - int32_t res = TSDB_CODE_SUCCESS; + int32_t res = 0; SSessionKey tmpKey = *key; int32_t valSize = *pVLen; void* tmp = tdbRealloc(NULL, valSize); @@ -667,21 +686,14 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch return -1; } - SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key); + SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key); int32_t code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen); - if (code == TSDB_CODE_SUCCESS) { - memcpy(tmp, *pVal, valSize); - *pVal = tmp; - streamStateFreeCur(pCur); - return res; - } - streamStateFreeCur(pCur); + if (code == 0) { + if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) { + memcpy(tmp, *pVal, valSize); + goto _end; + } - streamStateSessionPut(pState, key, NULL, 0); - pCur = streamStateSessionGetRanomCur(pState, key); - streamStateCurPrev(pState, pCur); - code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen); - if (code == TSDB_CODE_SUCCESS) { void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); if (fn(pKeyData, stateKey) == true) { memcpy(tmp, *pVal, valSize); @@ -689,11 +701,9 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch } } - streamStateFreeCur(pCur); - *key = tmpKey; - pCur = streamStateSessionSeekKeyNext(pState, key); + streamStateCurNext(pState, pCur); code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen); - if (code == TSDB_CODE_SUCCESS) { + if (code == 0) { void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); if (fn(pKeyData, stateKey) == true) { memcpy(tmp, *pVal, valSize); @@ -708,7 +718,6 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch _end: *pVal = tmp; - streamStateSessionDel(pState, &tmpKey); streamStateFreeCur(pCur); return res; }