From 764a81298fc3f9dfcd0b75a2d787d1390986f49e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 21 Mar 2023 13:21:36 +0000 Subject: [PATCH] add backend --- source/libs/stream/src/streamStateRocksdb.c | 101 +++++++++++++++++++- 1 file changed, 99 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 42bfac945a..7b2f8d28f5 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -438,6 +438,35 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k STREAM_STATE_PUT_ROCKSDB(pState, "sess", key, value, vLen); return code; } +SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + pCur->number = pState->number; + + pCur->iter = + rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } + + SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; + int32_t c = 0; + size_t klen; + const char* iKey = rocksdb_iter_key(pCur->iter, &klen); + SStateSessionKey curKey = {0}; + stateSessionKeyDecode(&curKey, (char*)iKey); + if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur; + + rocksdb_iter_prev(pCur->iter); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } + return pCur; +} SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { @@ -467,21 +496,89 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta } return pCur; } + +int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) { + if (!pCur) { + return -1; + } + SStateSessionKey ktmp = {0}; + SStateSessionKey* pKTmp = &ktmp; + int32_t kLen, vLen; + + if (!rocksdb_iter_valid(pCur->iter)) { + return -1; + } + const char* curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen); + stateSessionKeyDecode((void*)&ktmp, (char*)curKey); + + const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vLen); + *pVal = (char*)val; + *pVLen = vLen; + + if (pKTmp->opNum != pCur->number) { + return -1; + } + if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) { + return -1; + } + *pKey = pKTmp->key; + return 0; +} + +int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { + if (!pCur) { + return -1; + } + rocksdb_iter_next(pCur->iter); + return 0; +} int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { int code = 0; - SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key); + SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key); SSessionKey resKey = *key; + void* tmp = NULL; + code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, pVLen); + if (code == 0) { + if (key->win.skey != resKey.win.skey) { + code = -1; + } else { + *key = resKey; + *pVal = tmp; + } + } + streamStateFreeCur(pCur); // impl later return code; } -int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { +int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key) { int code = 0; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; STREAM_STATE_DEL_ROCKSDB(pState, "sess", key); return code; } + +int32_t streamStateSessionClear(SStreamState* pState) { + SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; + SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key); + + while (1) { + SSessionKey delKey = {0}; + void* buf = NULL; + int32_t size = 0; + int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size); + if (code == 0 && size > 0) { + memset(buf, 0, size); + streamStateSessionPut_rocksdb(pState, &delKey, buf, size); + } else { + break; + } + streamStateCurNext_rocksdb(pState, pCur); + } + streamStateFreeCur(pCur); + return -1; +} int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) { int code = 0; STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen);