diff --git a/contrib/test/rocksdb/main.c b/contrib/test/rocksdb/main.c index 84bada50dd..8b8317f635 100644 --- a/contrib/test/rocksdb/main.c +++ b/contrib/test/rocksdb/main.c @@ -10,32 +10,48 @@ const char DBPath[] = "rocksdb_c_simple_example"; const char DBBackupPath[] = "/tmp/rocksdb_c_simple_example_backup"; +static const int32_t endian_test_var = 1; +#define IS_LITTLE_ENDIAN() (*(uint8_t *)(&endian_test_var) != 0) +#define TD_RT_ENDIAN() (IS_LITTLE_ENDIAN() ? TD_LITTLE_ENDIAN : TD_BIG_ENDIAN) + #define POINTER_SHIFT(p, b) ((void *)((char *)(p) + (b))) -static int32_t taosEncodeFixedU64(void **buf, uint64_t value) { - if (buf != NULL) { - ((uint8_t *)(*buf))[0] = value & 0xff; - ((uint8_t *)(*buf))[1] = (value >> 8) & 0xff; - ((uint8_t *)(*buf))[2] = (value >> 16) & 0xff; - ((uint8_t *)(*buf))[3] = (value >> 24) & 0xff; - ((uint8_t *)(*buf))[4] = (value >> 32) & 0xff; - ((uint8_t *)(*buf))[5] = (value >> 40) & 0xff; - ((uint8_t *)(*buf))[6] = (value >> 48) & 0xff; - ((uint8_t *)(*buf))[7] = (value >> 56) & 0xff; - *buf = POINTER_SHIFT(*buf, sizeof(value)); +static void *taosDecodeFixedU64(const void *buf, uint64_t *value) { + if (IS_LITTLE_ENDIAN()) { + memcpy(value, buf, sizeof(*value)); + } else { + ((uint8_t *)value)[7] = ((uint8_t *)buf)[0]; + ((uint8_t *)value)[6] = ((uint8_t *)buf)[1]; + ((uint8_t *)value)[5] = ((uint8_t *)buf)[2]; + ((uint8_t *)value)[4] = ((uint8_t *)buf)[3]; + ((uint8_t *)value)[3] = ((uint8_t *)buf)[4]; + ((uint8_t *)value)[2] = ((uint8_t *)buf)[5]; + ((uint8_t *)value)[1] = ((uint8_t *)buf)[6]; + ((uint8_t *)value)[0] = ((uint8_t *)buf)[7]; } - return (int32_t)sizeof(value); + + return POINTER_SHIFT(buf, sizeof(*value)); } -static void *taosDecodeFixedU64(const void *buf, uint64_t *value) { - ((uint8_t *)value)[7] = ((uint8_t *)buf)[0]; - ((uint8_t *)value)[6] = ((uint8_t *)buf)[1]; - ((uint8_t *)value)[5] = ((uint8_t *)buf)[2]; - ((uint8_t *)value)[4] = ((uint8_t *)buf)[3]; - ((uint8_t *)value)[3] = ((uint8_t *)buf)[4]; - ((uint8_t *)value)[2] = ((uint8_t *)buf)[5]; - ((uint8_t *)value)[1] = ((uint8_t *)buf)[6]; - ((uint8_t *)value)[0] = ((uint8_t *)buf)[7]; - return POINTER_SHIFT(buf, sizeof(*value)); +// ---- Fixed U64 +static int32_t taosEncodeFixedU64(void **buf, uint64_t value) { + if (buf != NULL) { + if (IS_LITTLE_ENDIAN()) { + memcpy(*buf, &value, sizeof(value)); + } else { + ((uint8_t *)(*buf))[0] = value & 0xff; + ((uint8_t *)(*buf))[1] = (value >> 8) & 0xff; + ((uint8_t *)(*buf))[2] = (value >> 16) & 0xff; + ((uint8_t *)(*buf))[3] = (value >> 24) & 0xff; + ((uint8_t *)(*buf))[4] = (value >> 32) & 0xff; + ((uint8_t *)(*buf))[5] = (value >> 40) & 0xff; + ((uint8_t *)(*buf))[6] = (value >> 48) & 0xff; + ((uint8_t *)(*buf))[7] = (value >> 56) & 0xff; + } + + *buf = POINTER_SHIFT(*buf, sizeof(value)); + } + + return (int32_t)sizeof(value); } typedef struct KV { @@ -45,8 +61,6 @@ typedef struct KV { int kvSerial(KV *kv, char *buf) { int len = 0; - buf[0] = 'a'; - buf += 1; len += taosEncodeFixedU64((void **)&buf, kv->k1); len += taosEncodeFixedU64((void **)&buf, kv->k2); return len; @@ -60,8 +74,8 @@ int kvDBComp(void *state, const char *aBuf, size_t aLen, const char *bBu char *p1 = (char *)aBuf; char *p2 = (char *)bBuf; - p1 += 1; - p2 += 1; + // p1 += 1; + // p2 += 1; p1 = taosDecodeFixedU64(p1, &w1.k1); p2 = taosDecodeFixedU64(p2, &w2.k1); @@ -84,7 +98,7 @@ int kvDBComp(void *state, const char *aBuf, size_t aLen, const char *bBu } int kvDeserial(KV *kv, char *buf) { char *p1 = (char *)buf; - p1 += 1; + // p1 += 1; p1 = taosDecodeFixedU64(p1, &kv->k1); p1 = taosDecodeFixedU64(p1, &kv->k2); @@ -107,11 +121,15 @@ int main(int argc, char const *argv[]) { const rocksdb_options_t **cfOpt = malloc(len * sizeof(rocksdb_options_t *)); for (int i = 0; i < len; i++) { - cfOpt[i] = opt; + cfOpt[i] = rocksdb_options_create_copy(opt); + if (i != 0) { + rocksdb_comparator_t *comp = rocksdb_comparator_create(NULL, NULL, kvDBComp, kvDBName); + rocksdb_options_set_comparator((rocksdb_options_t *)cfOpt[i], comp); + } } rocksdb_column_family_handle_t **cfHandle = malloc(len * sizeof(rocksdb_column_family_handle_t *)); - db = rocksdb_open_column_families(opt, "test", len, cfName, cfOpt, cfHandle, &err); + db = rocksdb_open_column_families(opt, path, len, cfName, cfOpt, cfHandle, &err); { rocksdb_readoptions_t *rOpt = rocksdb_readoptions_create(); @@ -119,10 +137,6 @@ int main(int argc, char const *argv[]) { char *v = rocksdb_get_cf(db, rOpt, cfHandle[0], "key", strlen("key"), &vlen, &err); printf("Get value %s, and len = %d\n", v, (int)vlen); - - char *v1 = rocksdb_get_cf(db, rOpt, cfHandle[1], "key", strlen("key"), &vlen, &err); - printf("Get value %s, and len = %d\n", v1, (int)vlen); - rocksdb_readoptions_destroy(rOpt); } rocksdb_writeoptions_t *wOpt = rocksdb_writeoptions_create(); @@ -133,6 +147,40 @@ int main(int argc, char const *argv[]) { rocksdb_readoptions_t *rOpt = rocksdb_readoptions_create(); size_t vlen = 0; + { + rocksdb_writeoptions_t *wOpt = rocksdb_writeoptions_create(); + rocksdb_writebatch_t *wBatch = rocksdb_writebatch_create(); + for (int i = 0; i < 100; i++) { + char buf[128] = {0}; + KV kv = {.k1 = (100 - i) % 26, .k2 = i % 26}; + kvSerial(&kv, buf); + rocksdb_writebatch_put_cf(wBatch, cfHandle[1], buf, sizeof(kv), "value", strlen("value")); + } + rocksdb_write(db, wOpt, wBatch, &err); + } + { + char buf[128] = {0}; + KV kv = {.k1 = 0, .k2 = 0}; + kvSerial(&kv, buf); + char *v = rocksdb_get_cf(db, rOpt, cfHandle[1], buf, sizeof(kv), &vlen, &err); + printf("Get value %s, and len = %d, xxxx\n", v, (int)vlen); + rocksdb_iterator_t *iter = rocksdb_create_iterator_cf(db, rOpt, cfHandle[1]); + rocksdb_iter_seek_to_first(iter); + int i = 0; + while (rocksdb_iter_valid(iter)) { + size_t klen, vlen; + const char *key = rocksdb_iter_key(iter, &klen); + const char *value = rocksdb_iter_value(iter, &vlen); + KV kv; + kvDeserial(&kv, (char *)key); + printf("kv1: %d\t kv2: %d, len:%d, value = %s\n", (int)(kv.k1), (int)(kv.k2), (int)(klen), value); + i++; + rocksdb_iter_next(iter); + } + rocksdb_iter_destroy(iter); + printf("iterator count %d\n", i); + } + char *v = rocksdb_get_cf(db, rOpt, cfHandle[0], "key", strlen("key"), &vlen, &err); printf("Get value %s, and len = %d\n", v, (int)vlen); diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index d893804639..2a4185c6ab 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -34,6 +34,8 @@ typedef struct STdbState { rocksdb_t* rocksdb; rocksdb_column_family_handle_t** pHandle; + rocksdb_writeoptions_t* wopts; + rocksdb_readoptions_t* ropts; // rocksdb_column_family_handle_t* fillStateDB; // rocksdb_column_family_handle_t* sessStateDB; // rocksdb_column_family_handle_t* funcStateDB; @@ -65,6 +67,8 @@ int32_t streamStateAbort(SStreamState* pState); void streamStateDestroy(SStreamState* pState); typedef struct { + rocksdb_iterator_t* iter; + TBC* pCur; int64_t number; } SStreamStateCur; @@ -124,6 +128,23 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal int32_t streamStatePutParTag(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen); int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen); +/***compare func **/ + +// todo refactor +typedef struct SStateKey { + SWinKey key; + int64_t opNum; +} SStateKey; + +typedef struct SStateSessionKey { + SSessionKey key; + int64_t opNum; +} SStateSessionKey; + +int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2); +int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2); +int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2); +int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2); #if 0 char* streamStateSessionDump(SStreamState* pState); char* streamStateIntervalDump(SStreamState* pState); diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 8dc3f46ae3..566f6d4a9b 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -113,6 +113,12 @@ if(${BUILD_WITH_INVERTEDINDEX}) add_definitions(-DUSE_INVERTED_INDEX) endif(${BUILD_WITH_INVERTEDINDEX}) +if(${BUILD_WITH_ROCKSDB}) + add_definitions(-DUSE_ROCKSDB) +endif(${BUILD_WITH_ROCKSDB}) + + + if(${BUILD_TEST}) add_subdirectory(test) endif(${BUILD_TEST}) diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 4fa45c0148..57a0077a79 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -25,18 +25,7 @@ #include "tcompare.h" #include "ttimer.h" -// todo refactor -typedef struct SStateKey { - SWinKey key; - int64_t opNum; -} SStateKey; - -typedef struct SStateSessionKey { - SSessionKey key; - int64_t opNum; -} SStateSessionKey; - -static inline int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) { +int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) { if (pWin1->groupId > pWin2->groupId) { return 1; } else if (pWin1->groupId < pWin2->groupId) { @@ -52,7 +41,7 @@ static inline int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKe return 0; } -static inline int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) { +int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) { if (pWin1->groupId > pWin2->groupId) { return 1; } else if (pWin1->groupId < pWin2->groupId) { @@ -74,7 +63,7 @@ static inline int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* return 0; } -static inline int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { +int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1; SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2; @@ -87,7 +76,7 @@ static inline int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* return sessionWinKeyCmpr(&pWin1->key, &pWin2->key); } -static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { +int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { SStateKey* pWin1 = (SStateKey*)pKey1; SStateKey* pWin2 = (SStateKey*)pKey2; @@ -112,191 +101,6 @@ static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, return 0; } -// -// SStateKey -// |--groupid--|---ts------|--opNum----| -// |--uint64_t-|-uint64_t--|--int64_t--| -// -// -// -int stateKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { - SStateKey key1, key2; - memset(&key1, 0, sizeof(key1)); - memset(&key2, 0, sizeof(key2)); - - char* p1 = (char*)aBuf; - char* p2 = (char*)bBuf; - - p1 = taosDecodeFixedU64(p1, &key1.key.groupId); - p2 = taosDecodeFixedU64(p2, &key2.key.groupId); - - p1 = taosDecodeFixedI64(p1, &key1.key.ts); - p2 = taosDecodeFixedI64(p2, &key2.key.ts); - - taosDecodeFixedI64(p1, &key1.opNum); - taosDecodeFixedI64(p2, &key2.opNum); - - return stateKeyCmpr(&key1, sizeof(key1), &key2, sizeof(key2)); -} - -int stateKeySerial(SStateKey* key, char* buf) { - int len = 0; - len += taosEncodeFixedU64((void**)&buf, key->key.groupId); - len += taosEncodeFixedI64((void**)&buf, key->key.ts); - len += taosEncodeFixedU64((void**)&buf, key->opNum); - return len; -} - -// -// SStateSessionKey -// |-----------SSessionKey----------| -// |-----STimeWindow-----| -// |---skey--|---ekey----|--groupId-|--opNum--| -// |---int64-|--int64_t--|--uint64--|--int64_t| -// | -// -int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { - SStateSessionKey w1, w2; - memset(&w1, 0, sizeof(w1)); - memset(&w2, 0, sizeof(w2)); - - char* p1 = (char*)aBuf; - char* p2 = (char*)bBuf; - - p1 = taosDecodeFixedI64(p1, &w1.key.win.skey); - p2 = taosDecodeFixedI64(p2, &w2.key.win.skey); - - p1 = taosDecodeFixedI64(p1, &w1.key.win.ekey); - p2 = taosDecodeFixedI64(p2, &w2.key.win.ekey); - - p1 = taosDecodeFixedU64(p1, &w1.key.groupId); - p2 = taosDecodeFixedU64(p2, &w2.key.groupId); - - p1 = taosDecodeFixedI64(p1, &w1.opNum); - p2 = taosDecodeFixedI64(p2, &w2.opNum); - - return stateSessionKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2)); -} -int stateSessionKeySerial(SStateSessionKey* sess, char* buf) { - int len = 0; - len += taosEncodeFixedI64((void**)&buf, sess->key.win.skey); - len += taosEncodeFixedI64((void**)&buf, sess->key.win.ekey); - len += taosEncodeFixedU64((void**)&buf, sess->key.groupId); - len += taosEncodeFixedI64((void**)&buf, sess->opNum); - return len; -} - -/** - * SWinKey - * |------groupId------|-----ts------| - * |------uint64-------|----int64----| - */ -int winKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { - SWinKey w1, w2; - memset(&w1, 0, sizeof(w1)); - memset(&w2, 0, sizeof(w2)); - - char* p1 = (char*)aBuf; - char* p2 = (char*)bBuf; - - p1 = taosDecodeFixedU64(p1, &w1.groupId); - p2 = taosDecodeFixedU64(p2, &w2.groupId); - - p1 = taosDecodeFixedI64(p1, &w1.ts); - p2 = taosDecodeFixedI64(p2, &w2.ts); - - return winKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2)); -} - -int winKeySerial(SWinKey* key, char* buf) { - int len = 0; - len += taosEncodeFixedU64((void**)&buf, key->groupId); - len += taosEncodeFixedI64((void**)&buf, key->ts); - return len; -} - -/* - * STupleKey - * |---groupId---|---ts---|---exprIdx---| - * |---uint64--|---int64--|---int32-----| - */ -int tupleKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { - STupleKey w1, w2; - memset(&w1, 0, sizeof(w1)); - memset(&w2, 0, sizeof(w2)); - - char* p1 = (char*)aBuf; - char* p2 = (char*)bBuf; - - p1 = taosDecodeFixedU64(p1, &w1.groupId); - p2 = taosDecodeFixedU64(p2, &w2.groupId); - - p1 = taosDecodeFixedI64(p1, &w1.ts); - p2 = taosDecodeFixedI64(p2, &w2.ts); - - p1 = taosDecodeFixedI32(p1, &w1.exprIdx); - p2 = taosDecodeFixedI32(p2, &w2.exprIdx); - - return STupleKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2)); -} - -int tupleKeySerial(STupleKey* key, char* buf) { - int len = 0; - len += taosEncodeFixedU64((void**)&buf, key->groupId); - len += taosEncodeFixedI64((void**)&buf, key->ts); - len += taosEncodeFixedI32((void**)&buf, key->exprIdx); - return len; -} - -const char* cfName[] = {"default", "fill", "sess", "func", "parname", "partag"}; - -const char* compareStateName(void* name) { return NULL; } -int streamInitBackend(SStreamState* pState, char* path) { - rocksdb_options_t* opts = rocksdb_options_create(); - rocksdb_options_increase_parallelism(opts, 4); - rocksdb_options_optimize_level_style_compaction(opts, 0); - // create the DB if it's not already present - rocksdb_options_set_create_if_missing(opts, 1); - rocksdb_options_set_create_missing_column_families(opts, 1); - - char* err = NULL; - int cfLen = sizeof(cfName) / sizeof(cfName[0]); - - const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); - for (int i = 0; i < cfLen; i++) { - cfOpt[i] = rocksdb_options_create_copy(opts); - } - - rocksdb_comparator_t* fillCompare = rocksdb_comparator_create(NULL, NULL, stateKeyDBComp, compareStateName); - rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[1], fillCompare); - - rocksdb_comparator_t* sessCompare = rocksdb_comparator_create(NULL, NULL, stateKeyDBComp, compareStateName); - rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[2], sessCompare); - - rocksdb_comparator_t* funcCompare = rocksdb_comparator_create(NULL, NULL, stateKeyDBComp, compareStateName); - rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[3], funcCompare); - - rocksdb_comparator_t* parnameCompare = rocksdb_comparator_create(NULL, NULL, stateKeyDBComp, compareStateName); - rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[4], parnameCompare); - - rocksdb_comparator_t* partagCompare = rocksdb_comparator_create(NULL, NULL, stateKeyDBComp, compareStateName); - rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[5], partagCompare); - - rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*)); - rocksdb_t* db = rocksdb_open_column_families(opts, "rocksdb", cfLen, cfName, cfOpt, cfHandle, &err); - - pState->pTdbState->rocksdb = db; - pState->pTdbState->pHandle = cfHandle; - return 0; -} -void streamCleanBackend(SStreamState* pState) { - int cfLen = sizeof(cfName) / sizeof(cfName[0]); - for (int i = 0; i < cfLen; i++) { - rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]); - } - rocksdb_close(pState->pTdbState->rocksdb); -} - SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) { SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); if (pState == NULL) { @@ -407,7 +211,7 @@ _err: void streamStateClose(SStreamState* pState) { #ifdef USE_ROCKSDB - + streamCleanBackend(pState); #else tdbCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn); @@ -744,6 +548,7 @@ void streamStateFreeCur(SStreamStateCur* pCur) { return; } tdbTbcClose(pCur->pCur); + rocksdb_iter_destroy(pCur->iter); taosMemoryFree(pCur); } diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c new file mode 100644 index 0000000000..42bfac945a --- /dev/null +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -0,0 +1,512 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include "executor.h" +#include "osMemory.h" +#include "rocksdb/c.h" +#include "streamInc.h" +#include "streamState.h" +#include "tcoding.h" +#include "tcommon.h" +#include "tcompare.h" +#include "ttimer.h" + +// +// SStateKey +// |--groupid--|---ts------|--opNum----| +// |--uint64_t-|-uint64_t--|--int64_t--| +// +// +// +int stateKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { + SStateKey key1, key2; + memset(&key1, 0, sizeof(key1)); + memset(&key2, 0, sizeof(key2)); + + char* p1 = (char*)aBuf; + char* p2 = (char*)bBuf; + + p1 = taosDecodeFixedU64(p1, &key1.key.groupId); + p2 = taosDecodeFixedU64(p2, &key2.key.groupId); + + p1 = taosDecodeFixedI64(p1, &key1.key.ts); + p2 = taosDecodeFixedI64(p2, &key2.key.ts); + + taosDecodeFixedI64(p1, &key1.opNum); + taosDecodeFixedI64(p2, &key2.opNum); + + return stateKeyCmpr(&key1, sizeof(key1), &key2, sizeof(key2)); +} + +int stateKeyEncode(void* k, char* buf) { + SStateKey* key = k; + int len = 0; + len += taosEncodeFixedU64((void**)&buf, key->key.groupId); + len += taosEncodeFixedI64((void**)&buf, key->key.ts); + len += taosEncodeFixedU64((void**)&buf, key->opNum); + return len; +} +int stateKeyDecode(void* k, char* buf) { + SStateKey* key = k; + int len = 0; + char* p = buf; + p = taosDecodeFixedU64(p, &key->key.groupId); + p = taosDecodeFixedI64(p, &key->key.ts); + p = taosDecodeFixedI64(p, &key->opNum); + return p - buf; +} + +// +// SStateSessionKey +// |-----------SSessionKey----------| +// |-----STimeWindow-----| +// |---skey--|---ekey----|--groupId-|--opNum--| +// |---int64-|--int64_t--|--uint64--|--int64_t| +// | +// +int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { + SStateSessionKey w1, w2; + memset(&w1, 0, sizeof(w1)); + memset(&w2, 0, sizeof(w2)); + + char* p1 = (char*)aBuf; + char* p2 = (char*)bBuf; + + p1 = taosDecodeFixedI64(p1, &w1.key.win.skey); + p2 = taosDecodeFixedI64(p2, &w2.key.win.skey); + + p1 = taosDecodeFixedI64(p1, &w1.key.win.ekey); + p2 = taosDecodeFixedI64(p2, &w2.key.win.ekey); + + p1 = taosDecodeFixedU64(p1, &w1.key.groupId); + p2 = taosDecodeFixedU64(p2, &w2.key.groupId); + + p1 = taosDecodeFixedI64(p1, &w1.opNum); + p2 = taosDecodeFixedI64(p2, &w2.opNum); + + return stateSessionKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2)); +} +int stateSessionKeyEncode(void* ses, char* buf) { + SStateSessionKey* sess = ses; + int len = 0; + len += taosEncodeFixedI64((void**)&buf, sess->key.win.skey); + len += taosEncodeFixedI64((void**)&buf, sess->key.win.ekey); + len += taosEncodeFixedU64((void**)&buf, sess->key.groupId); + len += taosEncodeFixedI64((void**)&buf, sess->opNum); + return len; +} +int stateSessionKeyDecode(void* ses, char* buf) { + SStateSessionKey* sess = ses; + int len = 0; + + char* p = buf; + p = taosDecodeFixedI64(p, &sess->key.win.skey); + p = taosDecodeFixedI64(p, &sess->key.win.ekey); + p = taosDecodeFixedU64(p, &sess->key.groupId); + p = taosDecodeFixedI64(p, &sess->opNum); + return p - buf; +} + +/** + * SWinKey + * |------groupId------|-----ts------| + * |------uint64-------|----int64----| + */ +int winKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { + SWinKey w1, w2; + memset(&w1, 0, sizeof(w1)); + memset(&w2, 0, sizeof(w2)); + + char* p1 = (char*)aBuf; + char* p2 = (char*)bBuf; + + p1 = taosDecodeFixedU64(p1, &w1.groupId); + p2 = taosDecodeFixedU64(p2, &w2.groupId); + + p1 = taosDecodeFixedI64(p1, &w1.ts); + p2 = taosDecodeFixedI64(p2, &w2.ts); + + return winKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2)); +} + +int winKeyEncode(void* k, char* buf) { + SWinKey* key = k; + int len = 0; + len += taosEncodeFixedU64((void**)&buf, key->groupId); + len += taosEncodeFixedI64((void**)&buf, key->ts); + return len; +} + +int winKeyDecode(void* k, char* buf) { + SWinKey* key = k; + int len = 0; + char* p = buf; + p = taosDecodeFixedU64(p, &key->groupId); + p = taosDecodeFixedI64(p, &key->ts); + return len; +} +/* + * STupleKey + * |---groupId---|---ts---|---exprIdx---| + * |---uint64--|---int64--|---int32-----| + */ +int tupleKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { + STupleKey w1, w2; + memset(&w1, 0, sizeof(w1)); + memset(&w2, 0, sizeof(w2)); + + char* p1 = (char*)aBuf; + char* p2 = (char*)bBuf; + + p1 = taosDecodeFixedU64(p1, &w1.groupId); + p2 = taosDecodeFixedU64(p2, &w2.groupId); + + p1 = taosDecodeFixedI64(p1, &w1.ts); + p2 = taosDecodeFixedI64(p2, &w2.ts); + + p1 = taosDecodeFixedI32(p1, &w1.exprIdx); + p2 = taosDecodeFixedI32(p2, &w2.exprIdx); + + return STupleKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2)); +} + +int tupleKeyEncode(void* k, char* buf) { + STupleKey* key = k; + int len = 0; + len += taosEncodeFixedU64((void**)&buf, key->groupId); + len += taosEncodeFixedI64((void**)&buf, key->ts); + len += taosEncodeFixedI32((void**)&buf, key->exprIdx); + return len; +} +int tupleKeyDecode(void* k, char* buf) { + STupleKey* key = k; + int len = 0; + char* p = buf; + p = taosDecodeFixedU64(p, &key->groupId); + p = taosDecodeFixedI64(p, &key->ts); + p = taosDecodeFixedI32(p, &key->exprIdx); + return len; +} + +int parKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { + int64_t w1, w2; + memset(&w1, 0, sizeof(w1)); + memset(&w2, 0, sizeof(w2)); + char* p1 = (char*)aBuf; + char* p2 = (char*)bBuf; + + taosDecodeFixedI64(p1, &w1); + taosDecodeFixedI64(p2, &w2); + if (w1 == w2) { + return 0; + } else { + return w1 < w2 ? -1 : 1; + } +} +int parKeyEncode(void* k, char* buf) { + int64_t* groupid = k; + int len = taosEncodeFixedI64((void**)&buf, *groupid); + return len; +} +int parKeyDecode(void* k, char* buf) { + char* p = buf; + int64_t* groupid = k; + + p = taosDecodeFixedI64(p, groupid); + return p - buf; +} + +const char* cfName[] = {"default", "fill", "sess", "func", "parname", "partag"}; +typedef int (*EncodeFunc)(void* key, char* buf); +typedef int (*DecodeFunc)(void* key, char* buf); +////typedef int32_t (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen); +////typedef const char* (*BackendCmpNameFunc)(void* statue); + +typedef struct { + const char* key; + int idx; + EncodeFunc enFunc; + DecodeFunc deFunc; +} SCfInit; + +SCfInit ginitDict[] = { + {"default", 0, stateKeyEncode, stateKeyDecode}, + {"fill", 1, winKeyEncode, winKeyDecode}, + {"sess", 2, stateSessionKeyEncode, stateSessionKeyDecode}, + {"func", 3, tupleKeyEncode, tupleKeyDecode}, + {"parname", 4, parKeyEncode, parKeyDecode}, + {"partag", 5, parKeyEncode, parKeyDecode}, +}; + +const char* compareStateName(void* name) { return cfName[0]; } +const char* compareWinKey(void* name) { return cfName[1]; } +const char* compareSessionKey(void* name) { return cfName[2]; } +const char* compareFuncKey(void* name) { return cfName[3]; } +const char* compareParKey(void* name) { return cfName[4]; } +const char* comparePartagKey(void* name) { return cfName[5]; } + +int streamInitBackend(SStreamState* pState, char* path) { + rocksdb_options_t* opts = rocksdb_options_create(); + rocksdb_options_increase_parallelism(opts, 4); + rocksdb_options_optimize_level_style_compaction(opts, 0); + // create the DB if it's not already present + rocksdb_options_set_create_if_missing(opts, 1); + rocksdb_options_set_create_missing_column_families(opts, 1); + + char* err = NULL; + int cfLen = sizeof(cfName) / sizeof(cfName[0]); + + const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); + for (int i = 0; i < cfLen; i++) { + cfOpt[i] = rocksdb_options_create_copy(opts); + } + + rocksdb_comparator_t* stateCompare = rocksdb_comparator_create(NULL, NULL, stateKeyDBComp, compareStateName); + rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[0], stateCompare); + + rocksdb_comparator_t* fillCompare = rocksdb_comparator_create(NULL, NULL, winKeyDBComp, compareWinKey); + rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[1], fillCompare); + + rocksdb_comparator_t* sessCompare = rocksdb_comparator_create(NULL, NULL, stateSessionKeyDBComp, compareSessionKey); + rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[2], sessCompare); + + rocksdb_comparator_t* funcCompare = rocksdb_comparator_create(NULL, NULL, tupleKeyDBComp, compareFuncKey); + rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[3], funcCompare); + + rocksdb_comparator_t* parnameCompare = rocksdb_comparator_create(NULL, NULL, parKeyDBComp, compareParKey); + rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[4], parnameCompare); + + rocksdb_comparator_t* partagCompare = rocksdb_comparator_create(NULL, NULL, parKeyDBComp, comparePartagKey); + rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[5], partagCompare); + + rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*)); + rocksdb_t* db = rocksdb_open_column_families(opts, "rocksdb", cfLen, cfName, cfOpt, cfHandle, &err); + + pState->pTdbState->rocksdb = db; + pState->pTdbState->pHandle = cfHandle; + pState->pTdbState->wopts = rocksdb_writeoptions_create(); + pState->pTdbState->ropts = rocksdb_readoptions_create(); + return 0; +} +void streamCleanBackend(SStreamState* pState) { + int cfLen = sizeof(cfName) / sizeof(cfName[0]); + for (int i = 0; i < cfLen; i++) { + rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]); + } + rocksdb_close(pState->pTdbState->rocksdb); +} + +int streamGetInit(const char* funcName) { + for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { + if (strncmp(funcName, ginitDict[i].key, strlen(funcName)) == 0) { + return i; + } + } + return -1; +} + +#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ + do { \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamGetInit(funcname); \ + if (i < 0) { \ + return -1; \ + } \ + ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_writeoptions_t* opts = pState->pTdbState->wopts; \ + rocksdb_put_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (const char*)value, (size_t)vLen, &err); \ + if (err != NULL) { \ + taosMemoryFree(err); \ + code = -1; \ + } \ + code = 0; \ + } while (0); + +#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ + do { \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamGetInit(funcname); \ + if (i < 0) { \ + return -1; \ + } \ + ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_readoptions_t* opts = pState->pTdbState->ropts; \ + char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (size_t*)vLen, &err); \ + *pVal = val; \ + if (err != NULL) { \ + taosMemoryFree(err); \ + code = -1; \ + } \ + code = 0; \ + } while (0); + +#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ + do { \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamGetInit(funcname); \ + if (i < 0) { \ + return -1; \ + } \ + ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_writeoptions_t* opts = pState->pTdbState->wopts; \ + rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), &err); \ + if (err != NULL) { \ + taosMemoryFree(err); \ + code = -1; \ + } \ + code = 0; \ + } while (0); + +int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { + int code = 0; + STREAM_STATE_PUT_ROCKSDB(pState, "func", key, value, vLen); + return code; +} +int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) { + int code = 0; + STREAM_STATE_GET_ROCKSDB(pState, "func", key, pVal, pVLen); + return 0; +} +int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) { + int code = 0; + STREAM_STATE_DEL_ROCKSDB(pState, "func", key); + return 0; +} + +int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { + int code = 0; + + SStateKey sKey = {.key = *key, .opNum = pState->number}; + STREAM_STATE_PUT_ROCKSDB(pState, "default", key, value, vLen); + return code; +} +int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { + int code = 0; + SStateKey sKey = {.key = *key, .opNum = pState->number}; + STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen); + return code; +} +// todo refactor +int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) { + int code = 0; + SStateKey sKey = {.key = *key, .opNum = pState->number}; + STREAM_STATE_DEL_ROCKSDB(pState, "default", key); + return code; +} + +// todo refactor +int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { + int code = 0; + + STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen); + return code; +} + +// todo refactor +int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { + int code = 0; + STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen); + return code; +} + +int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { + int code = 0; + SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; + STREAM_STATE_PUT_ROCKSDB(pState, "sess", key, value, vLen); + return code; +} +SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + pCur->iter = + rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]); + pCur->number = pState->number; + SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; + char buf[128] = {0}; + stateSessionKeyEncode(&sKey, buf); + rocksdb_iter_seek(pCur->iter, (const char*)buf, sizeof(sKey)); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } + size_t klen; + const char* iKey = rocksdb_iter_key(pCur->iter, &klen); + SStateSessionKey curKey = {0}; + stateSessionKeyDecode(&curKey, (char*)iKey); + if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) <= 0) return pCur; + + rocksdb_iter_next(pCur->iter); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } + return pCur; +} +int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { + int code = 0; + SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key); + SSessionKey resKey = *key; + + // impl later + return code; +} + +int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { + int code = 0; + SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; + STREAM_STATE_DEL_ROCKSDB(pState, "sess", key); + return code; +} +int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) { + int code = 0; + STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen); + return code; +} + +int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) { + int code = 0; + STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, tagVal, tagLen); + return code; +} + +int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { + int code = 0; + STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, tbname, TSDB_TABLE_NAME_LEN); + return code; +} +int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal) { + int code = 0; + size_t tagLen; + STREAM_STATE_GET_ROCKSDB(pState, "parname", &groupId, pVal, &tagLen); + return code; +} + +void streamStateDestroy_rocksdb(SStreamState* pState) { + // only close db + streamCleanBackend(pState); +} \ No newline at end of file