From 874b6916aa13762a57b95136da4615af94c82824 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 26 Mar 2023 12:06:06 +0000 Subject: [PATCH] add backend --- source/libs/stream/src/streamStateRocksdb.c | 61 +++++++++++---------- 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index f1d7606805..1112dda1d7 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -682,37 +682,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* streamStateFreeCur(pCur); return NULL; } -SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { - qDebug("streamStateFillGetCur_rocksdb"); - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); - if (pCur == NULL) return NULL; - - pCur->iter = - rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]); - char buf[128] = {0}; - int len = winKeyDecode((void*)key, buf); - rocksdb_iter_seek(pCur->iter, buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - rocksdb_iter_seek_for_prev(pCur->iter, buf, len); - if (!rocksdb_iter_valid(pCur->iter)) { - streamStateFreeCur(pCur); - return NULL; - } - } - if (rocksdb_iter_valid(pCur->iter)) { - size_t kLen; - SWinKey curKey; - char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); - winKeyDecode((void*)&curKey, keyStr); - if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) == 0) { - return pCur; - } - } - - streamStateFreeCur(pCur); - return NULL; -} SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) { qDebug("streamStateGetAndCheckCur_rocksdb"); SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key); @@ -741,6 +711,37 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons } return -1; } +SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { + qDebug("streamStateFillGetCur_rocksdb"); + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + + if (pCur == NULL) return NULL; + + pCur->iter = + rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]); + char buf[128] = {0}; + int len = winKeyEncode((void*)key, buf); + rocksdb_iter_seek(pCur->iter, buf, len); + if (!rocksdb_iter_valid(pCur->iter)) { + rocksdb_iter_seek_for_prev(pCur->iter, buf, len); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } + } + if (rocksdb_iter_valid(pCur->iter)) { + size_t kLen; + SWinKey curKey; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); + winKeyDecode((void*)&curKey, keyStr); + if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) == 0) { + return pCur; + } + } + + streamStateFreeCur(pCur); + return NULL; +} int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { qDebug("streamStateFillGetKVByCur_rocksdb"); if (!pCur) {