diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 7b2f8d28f5..f11420edfb 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -497,6 +497,36 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta return pCur; } +SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + pCur->iter = + rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]); + pCur->number = pState->number; + SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; + char buf[128] = {0}; + stateSessionKeyEncode(&sKey, buf); + rocksdb_iter_seek(pCur->iter, (const char*)buf, sizeof(sKey)); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } + size_t klen; + const char* iKey = rocksdb_iter_key(pCur->iter, &klen); + SStateSessionKey curKey = {0}; + stateSessionKeyDecode(&curKey, (char*)iKey); + if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) < 0) return pCur; + + rocksdb_iter_next(pCur->iter); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } + return pCur; +} + int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) { if (!pCur) { return -1; @@ -512,8 +542,8 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* stateSessionKeyDecode((void*)&ktmp, (char*)curKey); const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vLen); - *pVal = (char*)val; - *pVLen = vLen; + if (pVal != NULL) *pVal = (char*)val; + if (pVLen != NULL) *pVLen = vLen; if (pKTmp->opNum != pCur->number) { return -1; @@ -532,6 +562,62 @@ int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) rocksdb_iter_next(pCur->iter); return 0; } +int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return -1; + } + pCur->number = pState->number; + pCur->iter = + rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]); + + SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; + int32_t c = 0; + char buf[128] = {0}; + stateSessionKeyEncode(&sKey, buf); + rocksdb_iter_seek(pCur->iter, buf, sizeof(sKey)); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return -1; + } + + int32_t kLen; + const char* iKeyStr = rocksdb_iter_key(pCur->iter, (size_t*)&kLen); + SStateSessionKey iKey = {0}; + stateSessionKeyDecode(&iKey, (char*)iKeyStr); + + c = stateSessionKeyCmpr(&sKey, sizeof(sKey), &iKey, sizeof(iKey)); + + SSessionKey resKey = *key; + int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0); + if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) { + *curKey = resKey; + streamStateFreeCur(pCur); + return code; + } + + if (c > 0) { + streamStateCurNext_rocksdb(pState, pCur); + code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0); + if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) { + *curKey = resKey; + streamStateFreeCur(pCur); + return code; + } + } else if (c < 0) { + streamStateCurPrev(pState, pCur); + code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0); + if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) { + *curKey = resKey; + streamStateFreeCur(pCur); + return code; + } + } + + streamStateFreeCur(pCur); + return -1; +} + int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { int code = 0; SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key); @@ -558,6 +644,107 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k STREAM_STATE_DEL_ROCKSDB(pState, "sess", key); return code; } +int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, + int32_t* pVLen) { + // todo refactor + int32_t res = 0; + SSessionKey originKey = *key; + SSessionKey searchKey = *key; + searchKey.win.skey = key->win.skey - gap; + searchKey.win.ekey = key->win.ekey + gap; + int32_t valSize = *pVLen; + + void* tmp = taosMemoryMalloc(valSize); + + SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key); + int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen); + if (code == 0) { + if (sessionRangeKeyCmpr(&searchKey, key) == 0) { + memcpy(tmp, *pVal, valSize); + streamStateSessionDel_rocksdb(pState, key); + goto _end; + } + streamStateCurNext_rocksdb(pState, pCur); + } else { + *key = originKey; + streamStateFreeCur(pCur); + pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key); + } + + code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen); + if (code == 0) { + if (sessionRangeKeyCmpr(&searchKey, key) == 0) { + memcpy(tmp, *pVal, valSize); + streamStateSessionDel_rocksdb(pState, key); + goto _end; + } + } + + *key = originKey; + res = 1; + memset(tmp, 0, valSize); + +_end: + + *pVal = tmp; + streamStateFreeCur(pCur); + return res; +} +int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData, + int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) { + // todo refactor + int32_t res = 0; + SSessionKey tmpKey = *key; + int32_t valSize = *pVLen; + void* tmp = taosMemoryMalloc(valSize); + // tdbRealloc(NULL, valSize); + if (!tmp) { + return -1; + } + + SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key); + int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen); + if (code == 0) { + if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) { + memcpy(tmp, *pVal, valSize); + streamStateSessionDel(pState, key); + goto _end; + } + + void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); + if (fn(pKeyData, stateKey) == true) { + memcpy(tmp, *pVal, valSize); + streamStateSessionDel(pState, key); + goto _end; + } + + streamStateCurNext_rocksdb(pState, pCur); + } else { + *key = tmpKey; + streamStateFreeCur(pCur); + pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key); + } + + code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen); + if (code == 0) { + void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); + if (fn(pKeyData, stateKey) == true) { + memcpy(tmp, *pVal, valSize); + streamStateSessionDel_rocksdb(pState, key); + goto _end; + } + } + + *key = tmpKey; + res = 1; + memset(tmp, 0, valSize); + +_end: + + *pVal = tmp; + streamStateFreeCur(pCur); + return res; +} int32_t streamStateSessionClear(SStreamState* pState) { SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};