From b17f99de47a945f84bce119950747dfe7aabdb02 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 23 Mar 2023 12:49:33 +0000 Subject: [PATCH] add backend --- source/libs/stream/src/streamState.c | 55 +++++++------ source/libs/stream/src/streamStateRocksdb.c | 87 ++++++++++----------- 2 files changed, 72 insertions(+), 70 deletions(-) diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index a7862fd0ce..5efcb483bf 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -312,15 +312,6 @@ int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* val #endif } -// todo refactor -int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { -#ifdef USE_ROCKSDB - return streamStateFillPut_rocksdb(pState, key, value, vLen); -#else - return tdbTbUpsert(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), value, vLen, pState->pTdbState->txn); -#endif -} - // todo refactor int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { #ifdef USE_ROCKSDB @@ -330,6 +321,24 @@ int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, in return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen); #endif } +// todo refactor +int32_t streamStateDel(SStreamState* pState, const SWinKey* key) { +#ifdef USE_ROCKSDB + return streamStateDel_rocksdb(pState, key); +#else + SStateKey sKey = {.key = *key, .opNum = pState->number}; + return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pState->pTdbState->txn); +#endif +} + +// todo refactor +int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { +#ifdef USE_ROCKSDB + return streamStateFillPut_rocksdb(pState, key, value, vLen); +#else + return tdbTbUpsert(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), value, vLen, pState->pTdbState->txn); +#endif +} // todo refactor int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { @@ -341,12 +350,11 @@ int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal } // todo refactor -int32_t streamStateDel(SStreamState* pState, const SWinKey* key) { +int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) { #ifdef USE_ROCKSDB - return streamStateDel_rocksdb(pState, key); + return streamStateFillDel_rocksdb(pState, key); #else - SStateKey sKey = {.key = *key, .opNum = pState->number}; - return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pState->pTdbState->txn); + return tdbTbDelete(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pState->pTdbState->txn); #endif } @@ -373,15 +381,6 @@ int32_t streamStateClear(SStreamState* pState) { void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; } -// todo refactor -int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) { -#ifdef USE_ROCKSDB - return streamStateFillDel_rocksdb(pState, key); -#else - return tdbTbDelete(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pState->pTdbState->txn); -#endif -} - int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { #ifdef USE_ROCKSDB return streamStateAddIfNotExist_rocksdb(pState, key, pVal, pVLen); @@ -535,13 +534,21 @@ int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) { } int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) { - // +#ifdef USE_ROCKSDB + rocksdb_iter_seek_to_first(pCur->iter); + return 0; +#else return tdbTbcMoveToFirst(pCur->pCur); +#endif } int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) { - // +#ifdef USE_ROCKSDB + rocksdb_iter_seek_to_last(pCur->iter); + return 0; +#else return tdbTbcMoveToLast(pCur->pCur); +#endif } SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) { diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index de330e4f59..a161b1b612 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -49,7 +49,7 @@ int stateKeyEncode(void* k, char* buf) { int len = 0; len += taosEncodeFixedU64((void**)&buf, key->key.groupId); len += taosEncodeFixedI64((void**)&buf, key->key.ts); - len += taosEncodeFixedU64((void**)&buf, key->opNum); + len += taosEncodeFixedI64((void**)&buf, key->opNum); return len; } int stateKeyDecode(void* k, char* buf) { @@ -482,6 +482,17 @@ int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, con return code; } +int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { + int code = 0; + STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen); + return code; +} +int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { + int code = 0; + STREAM_STATE_DEL_ROCKSDB(pState, "fill", key); + return code; +} + // todo refactor int32_t streamStateClear_rocksdb(SStreamState* pState) { @@ -502,16 +513,6 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { } return 0; } -int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { - int code = 0; - STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen); - return code; -} -int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { - int code = 0; - STREAM_STATE_DEL_ROCKSDB(pState, "fill", key); - return code; -} int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { int code = 0; @@ -545,10 +546,6 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur; rocksdb_iter_prev(pCur->iter); - if (!rocksdb_iter_valid(pCur->iter)) { - streamStateFreeCur(pCur); - return NULL; - } return pCur; } SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { @@ -560,9 +557,12 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta pCur->iter = rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]); pCur->number = pState->number; + + char buf[128] = {0}; + SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - char buf[128] = {0}; stateSessionKeyEncode(&sKey, buf); + rocksdb_iter_seek(pCur->iter, (const char*)buf, sizeof(sKey)); if (!rocksdb_iter_valid(pCur->iter)) { streamStateFreeCur(pCur); @@ -575,10 +575,6 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) <= 0) return pCur; rocksdb_iter_next(pCur->iter); - if (!rocksdb_iter_valid(pCur->iter)) { - streamStateFreeCur(pCur); - return NULL; - } return pCur; } @@ -606,10 +602,6 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) < 0) return pCur; rocksdb_iter_next(pCur->iter); - if (!rocksdb_iter_valid(pCur->iter)) { - streamStateFreeCur(pCur); - return NULL; - } return pCur; } @@ -626,31 +618,43 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { qWarn("streamStateGetCur_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) return NULL; + qWarn("streamStateGetCur_rocksdb-->1"); pCur->iter = rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[0]); - int32_t c = 0; SStateKey sKey = {.key = *key, .opNum = pState->number}; char buf[128] = {0}; stateKeyEncode((void*)&sKey, buf); + char sKeyStr[128] = {0}; + stateKeyToString(&sKey, sKeyStr); rocksdb_iter_seek(pCur->iter, buf, sizeof(sKey)); if (rocksdb_iter_valid(pCur->iter)) { + qWarn("streamStateGetCur_rocksdb-->2"); SStateKey curKey; size_t kLen = 0; char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); stateKeyDecode((void*)&curKey, keyStr); + char tKeyStr[128] = {0}; + stateKeyToString(&curKey, tKeyStr); + + qWarn("streamStateGetCur_rocksdb-->src:%s, dst:%s", sKeyStr, tKeyStr); + if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) { pCur->number = pState->number; return pCur; } + qWarn("streamStateGetCur_rocksdb-->3"); } + qWarn("streamStateGetCur_rocksdb-->4"); streamStateFreeCur(pCur); return NULL; } SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { qWarn("streamStateFillGetCur_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) return NULL; pCur->iter = @@ -688,15 +692,19 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons SStateKey* pKtmp = &tkey; if (rocksdb_iter_valid(pCur->iter)) { + qWarn("streamStateGetKVByCur_rocksdb-2"); size_t tlen; char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); stateKeyDecode((void*)pKtmp, keyStr); if (pKtmp->opNum != pCur->number) { + qWarn("streamStateGetKVByCur_rocksdb-3"); return -1; } + qWarn("streamStateGetKVByCur_rocksdb-4"); *pKey = pKtmp->key; return 0; } + qWarn("streamStateGetKVByCur_rocksdb-5"); return -1; } int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { @@ -779,10 +787,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin pCur->number = pState->number; pCur->iter = rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[0]); - // if (!rocksdb_iter_valid(pCur->iter)) { - // streamStateFreeCur(pCur); - // return NULL; - // } + SStateKey sKey = {.key = *key, .opNum = pState->number}; char buf[128] = {0}; stateKeyEncode((void*)&sKey, buf); @@ -796,9 +801,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin return pCur; } rocksdb_iter_next(pCur->iter); - if (rocksdb_iter_valid(pCur->iter)) { - return pCur; - } + return pCur; } streamStateFreeCur(pCur); return NULL; @@ -817,6 +820,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const rocksdb_iter_seek(pCur->iter, buf, sizeof(*key)); if (!rocksdb_iter_valid(pCur->iter)) { streamStateFreeCur(pCur); + return NULL; } { SWinKey curKey; @@ -827,9 +831,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const return pCur; } rocksdb_iter_next(pCur->iter); - if (rocksdb_iter_valid(pCur->iter)) { - return pCur; - } + return pCur; } streamStateFreeCur(pCur); return NULL; @@ -848,6 +850,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const rocksdb_iter_seek(pCur->iter, buf, sizeof(*key)); if (!rocksdb_iter_valid(pCur->iter)) { streamStateFreeCur(pCur); + return NULL; } { @@ -859,9 +862,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const return pCur; } rocksdb_iter_prev(pCur->iter); - if (rocksdb_iter_valid(pCur->iter)) { - return pCur; - } + return pCur; } streamStateFreeCur(pCur); return NULL; @@ -869,10 +870,8 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { qWarn("streamStateCurPrev_rocksdb"); if (!pCur) return -1; + rocksdb_iter_prev(pCur->iter); - if (!rocksdb_iter_valid(pCur->iter)) { - return -1; - } return 0; } int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { @@ -880,9 +879,6 @@ int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) return -1; } rocksdb_iter_next(pCur->iter); - if (!rocksdb_iter_valid(pCur->iter)) { - return -1; - } return 0; } int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { @@ -959,7 +955,6 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo } } streamStateFreeCur(pCur); - // impl later return code; } @@ -1085,7 +1080,7 @@ int32_t streamStateSessionClear_rocksdb(SStreamState* pState) { int32_t size = 0; int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size); if (code == 0 && size > 0) { - // memset(buf, 0, size); + memset(buf, 0, size); streamStateSessionPut_rocksdb(pState, &delKey, buf, size); } else { break;