From 2d37aeabdca461e6faa86743c8a42569d125ee67 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 20 Mar 2023 15:33:18 +0000 Subject: [PATCH] add backend --- contrib/test/rocksdb/main.c | 222 ++++++++++++++++++++++++--- include/libs/stream/streamState.h | 8 +- source/libs/stream/src/streamState.c | 182 ++++++++++++++++++++-- 3 files changed, 373 insertions(+), 39 deletions(-) diff --git a/contrib/test/rocksdb/main.c b/contrib/test/rocksdb/main.c index d1cbd373da..84bada50dd 100644 --- a/contrib/test/rocksdb/main.c +++ b/contrib/test/rocksdb/main.c @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -9,38 +10,215 @@ const char DBPath[] = "rocksdb_c_simple_example"; const char DBBackupPath[] = "/tmp/rocksdb_c_simple_example_backup"; +#define POINTER_SHIFT(p, b) ((void *)((char *)(p) + (b))) +static int32_t taosEncodeFixedU64(void **buf, uint64_t value) { + if (buf != NULL) { + ((uint8_t *)(*buf))[0] = value & 0xff; + ((uint8_t *)(*buf))[1] = (value >> 8) & 0xff; + ((uint8_t *)(*buf))[2] = (value >> 16) & 0xff; + ((uint8_t *)(*buf))[3] = (value >> 24) & 0xff; + ((uint8_t *)(*buf))[4] = (value >> 32) & 0xff; + ((uint8_t *)(*buf))[5] = (value >> 40) & 0xff; + ((uint8_t *)(*buf))[6] = (value >> 48) & 0xff; + ((uint8_t *)(*buf))[7] = (value >> 56) & 0xff; + *buf = POINTER_SHIFT(*buf, sizeof(value)); + } + return (int32_t)sizeof(value); +} + +static void *taosDecodeFixedU64(const void *buf, uint64_t *value) { + ((uint8_t *)value)[7] = ((uint8_t *)buf)[0]; + ((uint8_t *)value)[6] = ((uint8_t *)buf)[1]; + ((uint8_t *)value)[5] = ((uint8_t *)buf)[2]; + ((uint8_t *)value)[4] = ((uint8_t *)buf)[3]; + ((uint8_t *)value)[3] = ((uint8_t *)buf)[4]; + ((uint8_t *)value)[2] = ((uint8_t *)buf)[5]; + ((uint8_t *)value)[1] = ((uint8_t *)buf)[6]; + ((uint8_t *)value)[0] = ((uint8_t *)buf)[7]; + return POINTER_SHIFT(buf, sizeof(*value)); +} + +typedef struct KV { + uint64_t k1; + uint64_t k2; +} KV; + +int kvSerial(KV *kv, char *buf) { + int len = 0; + buf[0] = 'a'; + buf += 1; + len += taosEncodeFixedU64((void **)&buf, kv->k1); + len += taosEncodeFixedU64((void **)&buf, kv->k2); + return len; +} +const char *kvDBName(void *name) { return "kvDBname"; } +int kvDBComp(void *state, const char *aBuf, size_t aLen, const char *bBuf, size_t bLen) { + KV w1, w2; + + memset(&w1, 0, sizeof(w1)); + memset(&w2, 0, sizeof(w2)); + + char *p1 = (char *)aBuf; + char *p2 = (char *)bBuf; + p1 += 1; + p2 += 1; + + p1 = taosDecodeFixedU64(p1, &w1.k1); + p2 = taosDecodeFixedU64(p2, &w2.k1); + + p1 = taosDecodeFixedU64(p1, &w1.k2); + p2 = taosDecodeFixedU64(p2, &w2.k2); + + if (w1.k1 < w2.k1) { + return -1; + } else if (w1.k1 > w2.k1) { + return 1; + } + + if (w1.k2 < w2.k2) { + return -1; + } else if (w1.k2 > w2.k2) { + return 1; + } + return 0; +} +int kvDeserial(KV *kv, char *buf) { + char *p1 = (char *)buf; + p1 += 1; + p1 = taosDecodeFixedU64(p1, &kv->k1); + p1 = taosDecodeFixedU64(p1, &kv->k2); + + return 0; +} + int main(int argc, char const *argv[]) { - rocksdb_t * db; + rocksdb_t *db; rocksdb_backup_engine_t *be; - rocksdb_options_t * options = rocksdb_options_create(); - rocksdb_options_set_create_if_missing(options, 1); - // open DB - char *err = NULL; - db = rocksdb_open(options, DBPath, &err); + char *err = NULL; + const char *path = "/tmp/db"; - // Write - rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create(); - rocksdb_put(db, writeoptions, "key", 3, "value", 5, &err); + rocksdb_options_t *opt = rocksdb_options_create(); + rocksdb_options_set_create_if_missing(opt, 1); + rocksdb_options_set_create_missing_column_families(opt, 1); - // Read - rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create(); - rocksdb_readoptions_set_snapshot(readoptions, rocksdb_create_snapshot(db)); - size_t vallen = 0; - char * val = rocksdb_get(db, readoptions, "key", 3, &vallen, &err); - printf("val:%s\n", val); + const char *cfName[] = {"default", "cf1"}; + int len = sizeof(cfName) / sizeof(cfName[0]); - // Update - // rocksdb_put(db, writeoptions, "key", 3, "eulav", 5, &err); + const rocksdb_options_t **cfOpt = malloc(len * sizeof(rocksdb_options_t *)); + for (int i = 0; i < len; i++) { + cfOpt[i] = opt; + } - // Delete - rocksdb_delete(db, writeoptions, "key", 3, &err); + rocksdb_column_family_handle_t **cfHandle = malloc(len * sizeof(rocksdb_column_family_handle_t *)); + db = rocksdb_open_column_families(opt, "test", len, cfName, cfOpt, cfHandle, &err); - // Read again - val = rocksdb_get(db, readoptions, "key", 3, &vallen, &err); - printf("val:%s\n", val); + { + rocksdb_readoptions_t *rOpt = rocksdb_readoptions_create(); + size_t vlen = 0; + char *v = rocksdb_get_cf(db, rOpt, cfHandle[0], "key", strlen("key"), &vlen, &err); + printf("Get value %s, and len = %d\n", v, (int)vlen); + + char *v1 = rocksdb_get_cf(db, rOpt, cfHandle[1], "key", strlen("key"), &vlen, &err); + printf("Get value %s, and len = %d\n", v1, (int)vlen); + rocksdb_readoptions_destroy(rOpt); + } + + rocksdb_writeoptions_t *wOpt = rocksdb_writeoptions_create(); + rocksdb_writebatch_t *wBatch = rocksdb_writebatch_create(); + rocksdb_writebatch_put_cf(wBatch, cfHandle[0], "key", strlen("key"), "value", strlen("value")); + rocksdb_write(db, wOpt, wBatch, &err); + + rocksdb_readoptions_t *rOpt = rocksdb_readoptions_create(); + size_t vlen = 0; + + char *v = rocksdb_get_cf(db, rOpt, cfHandle[0], "key", strlen("key"), &vlen, &err); + printf("Get value %s, and len = %d\n", v, (int)vlen); + + rocksdb_column_family_handle_destroy(cfHandle[0]); + rocksdb_column_family_handle_destroy(cfHandle[1]); rocksdb_close(db); + // { + // // rocksdb_options_t *Options = rocksdb_options_create(); + // db = rocksdb_open(comm, path, &err); + // if (db != NULL) { + // rocksdb_options_t *cfo = rocksdb_options_create_copy(comm); + // rocksdb_comparator_t *cmp1 = rocksdb_comparator_create(NULL, NULL, kvDBComp, kvDBName); + // rocksdb_options_set_comparator(cfo, cmp1); + + // rocksdb_column_family_handle_t *handle = rocksdb_create_column_family(db, cfo, "cf1", &err); + + // rocksdb_column_family_handle_destroy(handle); + // rocksdb_close(db); + // db = NULL; + // } + // } + + // int ncf = 2; + + // rocksdb_column_family_handle_t **pHandle = malloc(ncf * sizeof(rocksdb_column_family_handle_t *)); + + // { + // rocksdb_options_t *options = rocksdb_options_create_copy(comm); + + // rocksdb_comparator_t *cmp1 = rocksdb_comparator_create(NULL, NULL, kvDBComp, kvDBName); + // rocksdb_options_t *dbOpts1 = rocksdb_options_create_copy(comm); + // rocksdb_options_t *dbOpts2 = rocksdb_options_create_copy(comm); + // rocksdb_options_set_comparator(dbOpts2, cmp1); + // // rocksdb_column_family_handle_t *cf = rocksdb_create_column_family(db, dbOpts1, "cmp1", &err); + + // const char *pName[] = {"default", "cf1"}; + + // const rocksdb_options_t **pOpts = malloc(ncf * sizeof(rocksdb_options_t *)); + // pOpts[0] = dbOpts1; + // pOpts[1] = dbOpts2; + + // rocksdb_options_t *allOptions = rocksdb_options_create_copy(comm); + // db = rocksdb_open_column_families(allOptions, "test", ncf, pName, pOpts, pHandle, &err); + // } + + // // rocksdb_options_t *options = rocksdb_options_create(); + // // rocksdb_options_set_create_if_missing(options, 1); + + // // //rocksdb_open_column_families(const rocksdb_options_t *options, const char *name, int num_column_families, + // // const char *const *column_family_names, + // // const rocksdb_options_t *const *column_family_options, + // // rocksdb_column_family_handle_t **column_family_handles, char **errptr); + + // for (int i = 0; i < 100; i++) { + // char buf[128] = {0}; + + // rocksdb_writeoptions_t *wopt = rocksdb_writeoptions_create(); + // KV kv = {.k1 = i, .k2 = i}; + // kvSerial(&kv, buf); + // rocksdb_put_cf(db, wopt, pHandle[0], buf, strlen(buf), (const char *)&i, sizeof(i), &err); + // } + + // rocksdb_close(db); + // Write + // rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create(); + // rocksdb_put(db, writeoptions, "key", 3, "value", 5, &err); + + //// Read + // rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create(); + // rocksdb_readoptions_set_snapshot(readoptions, rocksdb_create_snapshot(db)); + // size_t vallen = 0; + // char *val = rocksdb_get(db, readoptions, "key", 3, &vallen, &err); + // printf("val:%s\n", val); + + //// Update + //// rocksdb_put(db, writeoptions, "key", 3, "eulav", 5, &err); + + //// Delete + // rocksdb_delete(db, writeoptions, "key", 3, &err); + + //// Read again + // val = rocksdb_get(db, readoptions, "key", 3, &vallen, &err); + // printf("val:%s\n", val); + + // rocksdb_close(db); + return 0; } \ No newline at end of file diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 7b80c63eb8..d893804639 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -32,7 +32,13 @@ typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2); typedef struct STdbState { SStreamTask* pOwner; - rocksdb_t* rocksdb; + rocksdb_t* rocksdb; + rocksdb_column_family_handle_t** pHandle; + // rocksdb_column_family_handle_t* fillStateDB; + // rocksdb_column_family_handle_t* sessStateDB; + // rocksdb_column_family_handle_t* funcStateDB; + // rocksdb_column_family_handle_t* parnameStateDB; + // rocksdb_column_family_handle_t* partagStateDB; TDB* db; TTB* pStateDb; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 15d7ec2425..4fa45c0148 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -14,9 +14,13 @@ */ #include "streamState.h" +#include +#include #include "executor.h" +#include "osMemory.h" #include "rocksdb/c.h" #include "streamInc.h" +#include "tcoding.h" #include "tcommon.h" #include "tcompare.h" #include "ttimer.h" @@ -109,13 +113,143 @@ static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, } // -// +// SStateKey // |--groupid--|---ts------|--opNum----| // |--uint64_t-|-uint64_t--|--int64_t--| // // // -int compareState(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { return -1; } +int stateKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { + SStateKey key1, key2; + memset(&key1, 0, sizeof(key1)); + memset(&key2, 0, sizeof(key2)); + + char* p1 = (char*)aBuf; + char* p2 = (char*)bBuf; + + p1 = taosDecodeFixedU64(p1, &key1.key.groupId); + p2 = taosDecodeFixedU64(p2, &key2.key.groupId); + + p1 = taosDecodeFixedI64(p1, &key1.key.ts); + p2 = taosDecodeFixedI64(p2, &key2.key.ts); + + taosDecodeFixedI64(p1, &key1.opNum); + taosDecodeFixedI64(p2, &key2.opNum); + + return stateKeyCmpr(&key1, sizeof(key1), &key2, sizeof(key2)); +} + +int stateKeySerial(SStateKey* key, char* buf) { + int len = 0; + len += taosEncodeFixedU64((void**)&buf, key->key.groupId); + len += taosEncodeFixedI64((void**)&buf, key->key.ts); + len += taosEncodeFixedU64((void**)&buf, key->opNum); + return len; +} + +// +// SStateSessionKey +// |-----------SSessionKey----------| +// |-----STimeWindow-----| +// |---skey--|---ekey----|--groupId-|--opNum--| +// |---int64-|--int64_t--|--uint64--|--int64_t| +// | +// +int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { + SStateSessionKey w1, w2; + memset(&w1, 0, sizeof(w1)); + memset(&w2, 0, sizeof(w2)); + + char* p1 = (char*)aBuf; + char* p2 = (char*)bBuf; + + p1 = taosDecodeFixedI64(p1, &w1.key.win.skey); + p2 = taosDecodeFixedI64(p2, &w2.key.win.skey); + + p1 = taosDecodeFixedI64(p1, &w1.key.win.ekey); + p2 = taosDecodeFixedI64(p2, &w2.key.win.ekey); + + p1 = taosDecodeFixedU64(p1, &w1.key.groupId); + p2 = taosDecodeFixedU64(p2, &w2.key.groupId); + + p1 = taosDecodeFixedI64(p1, &w1.opNum); + p2 = taosDecodeFixedI64(p2, &w2.opNum); + + return stateSessionKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2)); +} +int stateSessionKeySerial(SStateSessionKey* sess, char* buf) { + int len = 0; + len += taosEncodeFixedI64((void**)&buf, sess->key.win.skey); + len += taosEncodeFixedI64((void**)&buf, sess->key.win.ekey); + len += taosEncodeFixedU64((void**)&buf, sess->key.groupId); + len += taosEncodeFixedI64((void**)&buf, sess->opNum); + return len; +} + +/** + * SWinKey + * |------groupId------|-----ts------| + * |------uint64-------|----int64----| + */ +int winKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { + SWinKey w1, w2; + memset(&w1, 0, sizeof(w1)); + memset(&w2, 0, sizeof(w2)); + + char* p1 = (char*)aBuf; + char* p2 = (char*)bBuf; + + p1 = taosDecodeFixedU64(p1, &w1.groupId); + p2 = taosDecodeFixedU64(p2, &w2.groupId); + + p1 = taosDecodeFixedI64(p1, &w1.ts); + p2 = taosDecodeFixedI64(p2, &w2.ts); + + return winKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2)); +} + +int winKeySerial(SWinKey* key, char* buf) { + int len = 0; + len += taosEncodeFixedU64((void**)&buf, key->groupId); + len += taosEncodeFixedI64((void**)&buf, key->ts); + return len; +} + +/* + * STupleKey + * |---groupId---|---ts---|---exprIdx---| + * |---uint64--|---int64--|---int32-----| + */ +int tupleKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { + STupleKey w1, w2; + memset(&w1, 0, sizeof(w1)); + memset(&w2, 0, sizeof(w2)); + + char* p1 = (char*)aBuf; + char* p2 = (char*)bBuf; + + p1 = taosDecodeFixedU64(p1, &w1.groupId); + p2 = taosDecodeFixedU64(p2, &w2.groupId); + + p1 = taosDecodeFixedI64(p1, &w1.ts); + p2 = taosDecodeFixedI64(p2, &w2.ts); + + p1 = taosDecodeFixedI32(p1, &w1.exprIdx); + p2 = taosDecodeFixedI32(p2, &w2.exprIdx); + + return STupleKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2)); +} + +int tupleKeySerial(STupleKey* key, char* buf) { + int len = 0; + len += taosEncodeFixedU64((void**)&buf, key->groupId); + len += taosEncodeFixedI64((void**)&buf, key->ts); + len += taosEncodeFixedI32((void**)&buf, key->exprIdx); + return len; +} + +const char* cfName[] = {"default", "fill", "sess", "func", "parname", "partag"}; + const char* compareStateName(void* name) { return NULL; } int streamInitBackend(SStreamState* pState, char* path) { rocksdb_options_t* opts = rocksdb_options_create(); @@ -123,30 +257,46 @@ int streamInitBackend(SStreamState* pState, char* path) { rocksdb_options_optimize_level_style_compaction(opts, 0); // create the DB if it's not already present rocksdb_options_set_create_if_missing(opts, 1); + rocksdb_options_set_create_missing_column_families(opts, 1); - rocksdb_comparator_t* cmp1 = rocksdb_comparator_create(NULL, NULL, compareState, compareStateName); - rocksdb_comparator_t* cmp2 = rocksdb_comparator_create(NULL, NULL, compareState, compareStateName); + char* err = NULL; + int cfLen = sizeof(cfName) / sizeof(cfName[0]); - char* err = NULL; - rocksdb_t* db = rocksdb_open(opts, path, &err); - if (err == NULL) { - pState->pTdbState->rocksdb = db; + const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); + for (int i = 0; i < cfLen; i++) { + cfOpt[i] = rocksdb_options_create_copy(opts); } - rocksdb_options_t* dbOpts1 = rocksdb_options_create_copy(opts); - rocksdb_options_t* dbOpts2 = rocksdb_options_create_copy(opts); + rocksdb_comparator_t* fillCompare = rocksdb_comparator_create(NULL, NULL, stateKeyDBComp, compareStateName); + rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[1], fillCompare); - rocksdb_options_set_comparator(dbOpts1, cmp1); - rocksdb_options_set_comparator(dbOpts2, cmp2); + rocksdb_comparator_t* sessCompare = rocksdb_comparator_create(NULL, NULL, stateKeyDBComp, compareStateName); + rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[2], sessCompare); - rocksdb_column_family_handle_t* cf1 = rocksdb_create_column_family(db, dbOpts1, "cmp1", &err); - rocksdb_column_family_handle_t* cf2 = rocksdb_create_column_family(db, dbOpts2, "cmp2", &err); + rocksdb_comparator_t* funcCompare = rocksdb_comparator_create(NULL, NULL, stateKeyDBComp, compareStateName); + rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[3], funcCompare); - rocksdb_writebatch_t* wp = rocksdb_writeoptions_create(); - rocksdb_put_cf(db, wp, cf1, NULL, 0, NULL, 0, &err); + rocksdb_comparator_t* parnameCompare = rocksdb_comparator_create(NULL, NULL, stateKeyDBComp, compareStateName); + rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[4], parnameCompare); + rocksdb_comparator_t* partagCompare = rocksdb_comparator_create(NULL, NULL, stateKeyDBComp, compareStateName); + rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[5], partagCompare); + + rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*)); + rocksdb_t* db = rocksdb_open_column_families(opts, "rocksdb", cfLen, cfName, cfOpt, cfHandle, &err); + + pState->pTdbState->rocksdb = db; + pState->pTdbState->pHandle = cfHandle; return 0; } +void streamCleanBackend(SStreamState* pState) { + int cfLen = sizeof(cfName) / sizeof(cfName[0]); + for (int i = 0; i < cfLen; i++) { + rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]); + } + rocksdb_close(pState->pTdbState->rocksdb); +} + SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) { SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); if (pState == NULL) {