From fcd1de8f62d026f057761045675d93bf85ef8d3a Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 12 Oct 2021 17:09:06 +0800 Subject: [PATCH 01/24] more --- source/libs/tkv/CMakeLists.txt | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/libs/tkv/CMakeLists.txt b/source/libs/tkv/CMakeLists.txt index 6ad553b1ca..6f68cc5a9a 100644 --- a/source/libs/tkv/CMakeLists.txt +++ b/source/libs/tkv/CMakeLists.txt @@ -8,4 +8,10 @@ target_include_directories( target_link_libraries( tkv PUBLIC os -) \ No newline at end of file +) +if(${BUILD_WITH_ROCKSDB}) + target_link_libraries( + tkv + PUBLIC rocksdb + ) +endif(${BUILD_WITH_ROCKSDB}) From 9179423f2ff4f26bc2cc93afd8debd46fe5571a0 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 12 Oct 2021 18:11:48 +0800 Subject: [PATCH 02/24] refact --- CMakeLists.txt | 3 +- include/libs/tkv/tkv.h | 9 +- source/libs/tkv/CMakeLists.txt | 2 +- source/libs/tkv/src/tkv.c | 148 +++++++++++++++++++++++----- source/server/vnode/meta/src/meta.c | 5 +- 5 files changed, 134 insertions(+), 33 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 65d1e133d7..9b4a0a1b85 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -42,9 +42,10 @@ if(${BUILD_WITH_LEVELDB}) endif(${BUILD_WITH_LEVELDB}) ## rocksdb -option(BUILD_WITH_ROCKSDB "If build with rocksdb" OFF) +option(BUILD_WITH_ROCKSDB "If build with rocksdb" ON) if(${BUILD_WITH_ROCKSDB}) cat("${CMAKE_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${DEPS_TMP_FILE}) + add_definitions(-DUSE_ROCKSDB) endif(${BUILD_WITH_ROCKSDB}) ## download dependencies diff --git a/include/libs/tkv/tkv.h b/include/libs/tkv/tkv.h index d9cabc1e40..36041767cf 100644 --- a/include/libs/tkv/tkv.h +++ b/include/libs/tkv/tkv.h @@ -32,13 +32,14 @@ typedef struct STkvWriteOpts STkvWriteOpts; // DB operations STkvDb *tkvOpen(const STkvOpts *options, const char *path); void tkvClose(STkvDb *db); -void tkvPut(STkvDb *db, STkvWriteOpts *, char *key, size_t keylen, char *val, size_t vallen); -char * tkvGet(STkvDb *db, STkvReadOpts *, char *key, size_t keylen, size_t *vallen); +void tkvPut(STkvDb *db, const STkvWriteOpts *, const char *key, size_t keylen, const char *val, size_t vallen); +char * tkvGet(STkvDb *db, const STkvReadOpts *, const char *key, size_t keylen, size_t *vallen); // DB options -STkvOpts *tkvOptionsCreate(); -void tkvOptionsDestroy(STkvOpts *); +STkvOpts *tkvOptsCreate(); +void tkvOptsDestroy(STkvOpts *); void tkvOptionsSetCache(STkvOpts *, STkvCache *); +void tkvOptsSetCreateIfMissing(STkvOpts *, unsigned char); // DB cache typedef enum { TKV_LRU_CACHE = 0, TKV_LFU_CACHE = 1 } ETkvCacheType; diff --git a/source/libs/tkv/CMakeLists.txt b/source/libs/tkv/CMakeLists.txt index 6f68cc5a9a..529b2703f7 100644 --- a/source/libs/tkv/CMakeLists.txt +++ b/source/libs/tkv/CMakeLists.txt @@ -1,5 +1,5 @@ aux_source_directory(src TKV_SRC) -add_library(tkv ${TKV_SRC}) +add_library(tkv STATIC ${TKV_SRC}) target_include_directories( tkv PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/tkv" diff --git a/source/libs/tkv/src/tkv.c b/source/libs/tkv/src/tkv.c index 9c78c23db4..47213f9f02 100644 --- a/source/libs/tkv/src/tkv.c +++ b/source/libs/tkv/src/tkv.c @@ -13,69 +13,167 @@ * along with this program. If not, see . */ +#ifdef USE_ROCKSDB +#include +#endif + #include "tkv.h" struct STkvDb { - // TODO +#ifdef USE_ROCKSDB + rocksdb_t *db; +#endif }; + struct STkvOpts { - // TODO +#ifdef USE_ROCKSDB + rocksdb_options_t *opts; +#endif }; + struct STkvCache { // TODO }; + struct STkvReadOpts { - // TODO +#ifdef USE_ROCKSDB + rocksdb_readoptions_t *ropts; +#endif }; + struct STkvWriteOpts { - // TODO +#ifdef USE_ROCKSDB + rocksdb_writeoptions_t *wopts; +#endif }; STkvDb *tkvOpen(const STkvOpts *options, const char *path) { - // TODO - return NULL; + STkvDb *pDb = NULL; + + pDb = (STkvDb *)malloc(sizeof(*pDb)); + if (pDb == NULL) { + return NULL; + } + +#ifdef USE_ROCKSDB + char *err = NULL; + + pDb->db = rocksdb_open(options->opts, path, &err); + // TODO: check err +#endif + + return pDb; } -void tkvClose(STkvDb *db) { - // TODO +void tkvClose(STkvDb *pDb) { + if (pDb) { +#ifdef USE_ROCKSDB + rocksdb_close(pDb->db); +#endif + free(pDb); + } } -void tkvPut(STkvDb *db, STkvWriteOpts *pwopts, char *key, size_t keylen, char *val, size_t vallen) { - // TODO +void tkvPut(STkvDb *pDb, const STkvWriteOpts *pwopts, const char *key, size_t keylen, const char *val, size_t vallen) { +#ifdef USE_ROCKSDB + char *err = NULL; + rocksdb_put(pDb->db, pwopts->wopts, key, keylen, val, vallen, &err); + // TODO: check error +#endif } -char *tkvGet(STkvDb *db, STkvReadOpts *propts, char *key, size_t keylen, size_t *vallen) { - // TODO - return NULL; +char *tkvGet(STkvDb *pDb, const STkvReadOpts *propts, const char *key, size_t keylen, size_t *vallen) { + char *ret = NULL; + +#ifdef USE_ROCKSDB + char *err = NULL; + ret = rocksdb_get(pDb->db, propts->ropts, key, keylen, vallen, &err); + // TODD: check error +#endif + + return ret; } -STkvOpts *tkvOptionsCreate() { - // TODO - return NULL; +STkvOpts *tkvOptsCreate() { + STkvOpts *pOpts = NULL; + + pOpts = (STkvOpts *)malloc(sizeof(*pOpts)); + if (pOpts == NULL) { + return NULL; + } + +#ifdef USE_ROCKSDB + pOpts->opts = rocksdb_options_create(); + // TODO: check error +#endif + + return pOpts; } -void tkvOptionsDestroy(STkvOpts *popts) { - // TODO +void tkvOptsDestroy(STkvOpts *pOpts) { + if (pOpts) { +#ifdef USE_ROCKSDB + rocksdb_options_destroy(pOpts->opts); +#endif + free(pOpts); + } } void tkvOptionsSetCache(STkvOpts *popts, STkvCache *pCache) { // TODO } -STkvReadOpts *tkvReadOptsCreate() { - // TODO - return NULL; +void tkvOptsSetCreateIfMissing(STkvOpts *pOpts, unsigned char c) { +#ifdef USE_ROCKSDB + rocksdb_options_set_create_if_missing(pOpts->opts, c); +#endif } -void tkvReadOptsDestroy(STkvReadOpts *propts) { - // TODO +STkvReadOpts *tkvReadOptsCreate() { + STkvReadOpts *pReadOpts = NULL; + + pReadOpts = (STkvReadOpts *)malloc(sizeof(*pReadOpts)); + if (pReadOpts == NULL) { + return NULL; + } + +#ifdef USE_ROCKSDB + pReadOpts->ropts = rocksdb_readoptions_create(); +#endif + + return pReadOpts; +} + +void tkvReadOptsDestroy(STkvReadOpts *pReadOpts) { + if (pReadOpts) { +#ifdef USE_ROCKSDB + rocksdb_readoptions_destroy(pReadOpts->ropts); +#endif + free(pReadOpts); + } } STkvWriteOpts *tkvWriteOptsCreate() { - // TODO + STkvWriteOpts *pWriteOpts = NULL; + + pWriteOpts = (STkvWriteOpts *)malloc(sizeof(*pWriteOpts)); + if (pWriteOpts == NULL) { return NULL; + } + +#ifdef USE_ROCKSDB + pWriteOpts->wopts = rocksdb_writeoptions_create(); +#endif + + return pWriteOpts; } -void tkvWriteOptsDestroy(STkvWriteOpts *pwopts) { +void tkvWriteOptsDestroy(STkvWriteOpts *pWriteOpts) { + if (pWriteOpts) { +#ifdef USE_ROCKSDB + rocksdb_writeoptions_destroy(pWriteOpts->wopts); +#endif + free(pWriteOpts); + } // TODO } \ No newline at end of file diff --git a/source/server/vnode/meta/src/meta.c b/source/server/vnode/meta/src/meta.c index c63aa31b30..88d0ff2f16 100644 --- a/source/server/vnode/meta/src/meta.c +++ b/source/server/vnode/meta/src/meta.c @@ -73,7 +73,8 @@ SMeta *metaOpen(SMetaOpts *options) { pMeta->stbList = tdListNew(sizeof(STableObj *)); // Options - STkvOpts *dbOptions = tkvOptionsCreate(); + STkvOpts *dbOptions = tkvOptsCreate(); + tkvOptsSetCreateIfMissing(dbOptions, 1); taosMkDir("meta"); @@ -89,7 +90,7 @@ SMeta *metaOpen(SMetaOpts *options) { // Open tag index pMeta->tagIdx = tkvOpen(dbOptions, "meta/tag_idx_db"); - tkvOptionsDestroy(dbOptions); + tkvOptsDestroy(dbOptions); return pMeta; } From 36f1f3088437f2a4195eee1e7dc8fecfb1801191 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 13 Oct 2021 10:43:26 +0800 Subject: [PATCH 03/24] more --- source/server/vnode/tsdb/CMakeLists.txt | 4 +- source/server/vnode/tsdb/inc/tsdbMemTable.h | 41 +++++++++++++++++++++ source/server/vnode/tsdb/src/tsdb.c | 11 ++++++ source/server/vnode/tsdb/src/tsdbMemTable.c | 16 ++++++++ 4 files changed, 71 insertions(+), 1 deletion(-) create mode 100644 source/server/vnode/tsdb/inc/tsdbMemTable.h create mode 100644 source/server/vnode/tsdb/src/tsdbMemTable.c diff --git a/source/server/vnode/tsdb/CMakeLists.txt b/source/server/vnode/tsdb/CMakeLists.txt index 426fa2a317..339971431b 100644 --- a/source/server/vnode/tsdb/CMakeLists.txt +++ b/source/server/vnode/tsdb/CMakeLists.txt @@ -8,5 +8,7 @@ target_include_directories( target_link_libraries( tsdb PUBLIC os - PRIVATE common + PUBLIC util + PUBLIC common + PUBLIC tkv ) \ No newline at end of file diff --git a/source/server/vnode/tsdb/inc/tsdbMemTable.h b/source/server/vnode/tsdb/inc/tsdbMemTable.h new file mode 100644 index 0000000000..b13c0579ce --- /dev/null +++ b/source/server/vnode/tsdb/inc/tsdbMemTable.h @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_MEMTABLE_H_ +#define _TD_TSDB_MEMTABLE_H_ + +#include "tdef.h" +#include "thash.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct STsdbMemTable STsdbMemTable; + +/* --------------------- For compile and test only --------------------- */ +struct STsdbMemTable { + TSKEY minKey; + TSKEY maxKey; + SHashObj *tData; // uid --> SSkipList + void * mallocator; + T_REF_DECLARE() +}; + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_TSDB_MEMTABLE_H_*/ \ No newline at end of file diff --git a/source/server/vnode/tsdb/src/tsdb.c b/source/server/vnode/tsdb/src/tsdb.c index 56637ec74e..7fbb0a6868 100644 --- a/source/server/vnode/tsdb/src/tsdb.c +++ b/source/server/vnode/tsdb/src/tsdb.c @@ -14,6 +14,17 @@ */ #include "tsdb.h" +#include "tkv.h" +#include "tsdbMemTable.h" + +/* -------------- -------------- */ +struct STsdb { + STsdbMemTable *mem; + STsdbMemTable *imem; + STkvDb * lrowdb; // last row cache + STkvDb * lastdb; // last cache + // TODO +}; int tsdbInsert(STsdb *tsdb, SSubmitReq *pReq, SSubmitRsp *pRsp) { return 0; } int tsdbCommit(STsdb *pTsdb) { return 0; } \ No newline at end of file diff --git a/source/server/vnode/tsdb/src/tsdbMemTable.c b/source/server/vnode/tsdb/src/tsdbMemTable.c new file mode 100644 index 0000000000..a528419dd2 --- /dev/null +++ b/source/server/vnode/tsdb/src/tsdbMemTable.c @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdbMemTable.h" \ No newline at end of file From 8e8b07af4e28ae4ae4dded3840f7d60e3456ff43 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 13 Oct 2021 14:57:15 +0800 Subject: [PATCH 04/24] more --- include/libs/tkv/tkv.h | 1 + source/libs/tkv/inc/tkvRocksdb.h | 27 +++++++++++++++++++++ source/libs/tkv/src/tkvRocksdb.c | 14 +++++++++++ source/server/vnode/tsdb/src/tsdbMemTable.c | 22 ++++++++++++++++- 4 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 source/libs/tkv/inc/tkvRocksdb.h create mode 100644 source/libs/tkv/src/tkvRocksdb.c diff --git a/include/libs/tkv/tkv.h b/include/libs/tkv/tkv.h index 36041767cf..edade1dc36 100644 --- a/include/libs/tkv/tkv.h +++ b/include/libs/tkv/tkv.h @@ -34,6 +34,7 @@ STkvDb *tkvOpen(const STkvOpts *options, const char *path); void tkvClose(STkvDb *db); void tkvPut(STkvDb *db, const STkvWriteOpts *, const char *key, size_t keylen, const char *val, size_t vallen); char * tkvGet(STkvDb *db, const STkvReadOpts *, const char *key, size_t keylen, size_t *vallen); +void tkvCommit(STkvDb *db); // DB options STkvOpts *tkvOptsCreate(); diff --git a/source/libs/tkv/inc/tkvRocksdb.h b/source/libs/tkv/inc/tkvRocksdb.h new file mode 100644 index 0000000000..658deb335f --- /dev/null +++ b/source/libs/tkv/inc/tkvRocksdb.h @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TVK_ROCKSDB_H_ +#define _TD_TVK_ROCKSDB_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_TVK_ROCKSDB_H_*/ \ No newline at end of file diff --git a/source/libs/tkv/src/tkvRocksdb.c b/source/libs/tkv/src/tkvRocksdb.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/libs/tkv/src/tkvRocksdb.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/server/vnode/tsdb/src/tsdbMemTable.c b/source/server/vnode/tsdb/src/tsdbMemTable.c index a528419dd2..91700a37d6 100644 --- a/source/server/vnode/tsdb/src/tsdbMemTable.c +++ b/source/server/vnode/tsdb/src/tsdbMemTable.c @@ -13,4 +13,24 @@ * along with this program. If not, see . */ -#include "tsdbMemTable.h" \ No newline at end of file +#include "tsdbMemTable.h" + +STsdbMemTable *tsdbMemTableCreate(void *mallocator) { + STsdbMemTable *pTsdbMemTable = NULL; + + pTsdbMemTable = (STsdbMemTable *)malloc(sizeof(*pTsdbMemTable)); + if (pTsdbMemTable == NULL) { + return NULL; + } + + // TODO + + return pTsdbMemTable; +} + +void tsdbMemTableDestroy(STsdbMemTable *pTsdbMemTable) { + if (pTsdbMemTable) { + // TODO + free(pTsdbMemTable); + } +} \ No newline at end of file From a1fbaf30ab6491ca27dfdd46cc109ad5bc9da113 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 13 Oct 2021 17:53:10 +0800 Subject: [PATCH 05/24] tq data structure defined --- .gitignore | 4 ++- include/server/vnode/tq/tq.h | 20 ++++++++++- source/server/vnode/tq/inc/tqInt.h | 27 +++++++++++---- source/server/vnode/tq/src/tq.c | 55 +++++++++++++++++++++++++++++- 4 files changed, 97 insertions(+), 9 deletions(-) diff --git a/.gitignore b/.gitignore index 5141448ee0..0b98a1b161 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ build/ +compile_commands.json +.cache .ycm_extra_conf.py .vscode/ .idea/ @@ -96,4 +98,4 @@ tramp TAGS deps/* -!deps/CMakeLists.txt \ No newline at end of file +!deps/CMakeLists.txt diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index dd355c8381..eb9c57c581 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -22,8 +22,26 @@ extern "C" { #endif -typedef struct STQ STQ; +typedef struct tqTopicVhandle { + //name + // + //executor for filter + // + //callback for mnode + // +} tqTopic; +typedef struct STQ { + //the set for topics + //key=topicName: str + //value=tqTopicVhandle + + //a map + //key= + //value=consumeOffset: int64_t +} STQ; + +//init in each vnode STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); void tqCleanUp(STQ*); diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index a51f0b03af..c42bcfef43 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -18,23 +18,38 @@ #include "tq.h" +#define TQ_BUFFER_SIZE 8 + #ifdef __cplusplus extern "C" { #endif -//implement the array index -//implement the ring buffer +typedef struct tqBufferItem { + int64_t offset; + void *content; +} tqBufferItem; + + +typedef struct tqGroupHandle { + char* topic; + void* ahandle; + int64_t cgId; + int64_t consumeOffset; + int32_t head; + int32_t tail; + tqBufferItem buffer[TQ_BUFFER_SIZE]; +} tqGroupHandle; //create persistent storage for meta info such as consuming offset //return value > 0: cgId //return value <= 0: error code -int tqCreateGroup(STQ*); +int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqGroupHandle** handle); //create ring buffer in memory and load consuming offset -int tqOpenGroup(STQ*, int cgId); +int tqOpenTCGroup(STQ*, const char* topic, int cgId); //destroy ring buffer and persist consuming offset -int tqCloseGroup(STQ*, int cgId); +int tqCloseTCGroup(STQ*, const char* topic, int cgId); //delete persistent storage for meta info -int tqDropGroup(STQ*, int cgId); +int tqDropTCGroup(STQ*, const char* topic, int cgId); #ifdef __cplusplus } diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index 3255f3fb3a..2ef2a4b6ea 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -22,12 +22,65 @@ // //handle management message +static tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) { + //look in memory + // + //not found, try to restore from disk + // + //still not found + return NULL; +} + +static int tqCommitTCGroup(tqGroupHandle* handle) { + //persist into disk + return 0; +} + +int tqCreateTCGroup(STQ *pTq, const char* topic, int cgId, tqGroupHandle** handle) { + return 0; +} + +int tqOpenTGroup(STQ* pTq, const char* topic, int cgId) { + int code; + tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId); + if(handle == NULL) { + code = tqCreateTCGroup(pTq, topic, cgId, &handle); + if(code != 0) { + return code; + } + } + ASSERT(handle != NULL); + + //put into STQ + + return 0; +} + +int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) { + tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId); + return tqCommitTCGroup(handle); +} + +int tqDropTCGroup(STQ* pTq, const char* topic, int cgId) { + //delete from disk + return 0; +} + int tqPushMsg(STQ* pTq , void* p, int64_t version) { //add reference - // + //judge and launch new query return 0; } int tqCommit(STQ* pTq) { + //do nothing + return 0; +} + +int tqHandleMsg(STQ* pTq, void*msg) { + //parse msg and extract topic and cgId + //lookup handle + //confirm message and send to consumer + //judge and launch new query return 0; } From 18719691322059d1b47c7112b5fdc3f5555b3d8e Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Oct 2021 09:42:40 +0800 Subject: [PATCH 06/24] more --- source/server/vnode/tsdb/src/tsdb.c | 9 ++++----- source/server/vnode/tsdb/src/tsdbSMA.c | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 5 deletions(-) create mode 100644 source/server/vnode/tsdb/src/tsdbSMA.c diff --git a/source/server/vnode/tsdb/src/tsdb.c b/source/server/vnode/tsdb/src/tsdb.c index 7fbb0a6868..520c342a63 100644 --- a/source/server/vnode/tsdb/src/tsdb.c +++ b/source/server/vnode/tsdb/src/tsdb.c @@ -19,11 +19,10 @@ /* -------------- -------------- */ struct STsdb { - STsdbMemTable *mem; - STsdbMemTable *imem; - STkvDb * lrowdb; // last row cache - STkvDb * lastdb; // last cache - // TODO + STkvDb *tsdb; // original time-series data + STkvDb *lrowdb; // last row cache + STkvDb *lastdb; // last cache + STkvDb *fivemindb; }; int tsdbInsert(STsdb *tsdb, SSubmitReq *pReq, SSubmitRsp *pRsp) { return 0; } diff --git a/source/server/vnode/tsdb/src/tsdbSMA.c b/source/server/vnode/tsdb/src/tsdbSMA.c new file mode 100644 index 0000000000..f2f48bbc8a --- /dev/null +++ b/source/server/vnode/tsdb/src/tsdbSMA.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ From 9c188c53ff9ae6b968a8669407fc9129593b72f2 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Oct 2021 13:28:18 +0800 Subject: [PATCH 07/24] refact --- include/util/amalloc.h | 25 +++++++------- source/server/vnode/tsdb/inc/tsdbMemTable.h | 13 ++++--- source/server/vnode/tsdb/inc/tsdbWriteBatch.h | 34 +++++++++++++++++++ source/server/vnode/tsdb/src/tsdbMemTable.c | 15 +++++++- 4 files changed, 70 insertions(+), 17 deletions(-) create mode 100644 source/server/vnode/tsdb/inc/tsdbWriteBatch.h diff --git a/include/util/amalloc.h b/include/util/amalloc.h index 6d2869f719..938e1caa4c 100644 --- a/include/util/amalloc.h +++ b/include/util/amalloc.h @@ -22,27 +22,28 @@ extern "C" { #endif +#define AMALLOC_APIS \ + void *(*malloc)(void *, size_t size); \ + void *(*calloc)(void *, size_t nmemb, size_t size); \ + void *(*realloc)(void *, size_t size); \ + void (*free)(void *ptr); + // Interfaces to implement typedef struct { - void *(*malloc)(void *, size_t size); - void *(*calloc)(void *, size_t nmemb, size_t size); - void (*free)(void *ptr, size_t size); // Do we need to set size in the allocated memory? - void *(*realloc)(void *ptr, size_t size); + AMALLOC_APIS } SMemAllocatorIf; typedef struct { - void * impl; - SMemAllocatorIf interface; + void *impl; + AMALLOC_APIS } SMemAllocator; -#define amalloc(allocator, size) \ - ((allocator) ? (*((allocator)->interface.malloc))((allocator)->impl, (size)) : malloc(size)) +#define amalloc(allocator, size) ((allocator) ? (*((allocator)->malloc))((allocator)->impl, (size)) : malloc(size)) #define acalloc(allocator, nmemb, size) \ - ((allocator) ? (*((allocator)->interface.calloc))((allocator)->impl, (nmemb), (size)) : calloc((nmemb), (size))) + ((allocator) ? (*((allocator)->calloc))((allocator)->impl, (nmemb), (size)) : calloc((nmemb), (size))) #define arealloc(allocator, ptr, size) \ - ((allocator) ? (*((allocator)->interface.realloc))((allocator)->impl, (ptr), (size)) : realloc((ptr), (size))) -#define afree(allocator, ptr, size) \ - ((allocator) ? (*((allocator)->interface.free))((allocator)->impl, (ptr), (size)) : free(ptr)) + ((allocator) ? (*((allocator)->realloc))((allocator)->impl, (ptr), (size)) : realloc((ptr), (size))) +#define afree(allocator, ptr, size) ((allocator) ? (*((allocator)->free))((allocator)->impl, (ptr), (size)) : free(ptr)) #ifdef __cplusplus } diff --git a/source/server/vnode/tsdb/inc/tsdbMemTable.h b/source/server/vnode/tsdb/inc/tsdbMemTable.h index b13c0579ce..5d1dcfcac7 100644 --- a/source/server/vnode/tsdb/inc/tsdbMemTable.h +++ b/source/server/vnode/tsdb/inc/tsdbMemTable.h @@ -18,6 +18,7 @@ #include "tdef.h" #include "thash.h" +#include "amalloc.h" #ifdef __cplusplus extern "C" { @@ -25,12 +26,16 @@ extern "C" { typedef struct STsdbMemTable STsdbMemTable; +STsdbMemTable *tsdbMemTableCreate(SMemAllocator *); +void tsdbMemTableDestroy(STsdbMemTable *); +int tsdbMemTableWriteBatch(STsdbMemTable *pTsdbMemTable, void *batch); + /* --------------------- For compile and test only --------------------- */ struct STsdbMemTable { - TSKEY minKey; - TSKEY maxKey; - SHashObj *tData; // uid --> SSkipList - void * mallocator; + TSKEY minKey; + TSKEY maxKey; + SHashObj * tData; // uid --> SSkipList + SMemAllocator *ma; T_REF_DECLARE() }; diff --git a/source/server/vnode/tsdb/inc/tsdbWriteBatch.h b/source/server/vnode/tsdb/inc/tsdbWriteBatch.h new file mode 100644 index 0000000000..f6b761f6a5 --- /dev/null +++ b/source/server/vnode/tsdb/inc/tsdbWriteBatch.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_WRITE_BATCH_H_ +#define _TD_TSDB_WRITE_BATCH_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct STsdbWriteBatch STsdbWriteBatch; + +/* ------------------------- ------------------------- */ +struct STsdbWriteBatch { + // TODO +}; + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_TSDB_WRITE_BATCH_H_*/ \ No newline at end of file diff --git a/source/server/vnode/tsdb/src/tsdbMemTable.c b/source/server/vnode/tsdb/src/tsdbMemTable.c index 91700a37d6..9fd815155f 100644 --- a/source/server/vnode/tsdb/src/tsdbMemTable.c +++ b/source/server/vnode/tsdb/src/tsdbMemTable.c @@ -15,7 +15,7 @@ #include "tsdbMemTable.h" -STsdbMemTable *tsdbMemTableCreate(void *mallocator) { +STsdbMemTable *tsdbMemTableCreate(SMemAllocator *ma) { STsdbMemTable *pTsdbMemTable = NULL; pTsdbMemTable = (STsdbMemTable *)malloc(sizeof(*pTsdbMemTable)); @@ -24,6 +24,13 @@ STsdbMemTable *tsdbMemTableCreate(void *mallocator) { } // TODO + pTsdbMemTable->minKey = TSKEY_INITIAL_VAL; + pTsdbMemTable->maxKey = TSKEY_INITIAL_VAL; + pTsdbMemTable->ma = ma; + pTsdbMemTable->tData = taosHashInit(1024, taosIntHash_64, true /* TODO */, HASH_NO_LOCK); + if (pTsdbMemTable->tData == NULL) { + // TODO + } return pTsdbMemTable; } @@ -33,4 +40,10 @@ void tsdbMemTableDestroy(STsdbMemTable *pTsdbMemTable) { // TODO free(pTsdbMemTable); } +} + +int tsdbMemTableWriteBatch(STsdbMemTable *pTsdbMemTable, void *batch) { + // TODO + + return 0; } \ No newline at end of file From f228d74625ae22de893a02c350989c781bf25408 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Oct 2021 13:54:42 +0800 Subject: [PATCH 08/24] more --- deps/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/CMakeLists.txt b/deps/CMakeLists.txt index 05c154af86..205b6298e3 100644 --- a/deps/CMakeLists.txt +++ b/deps/CMakeLists.txt @@ -36,7 +36,7 @@ unset(CMAKE_PROJECT_INCLUDE_BEFORE) # leveldb if(${BUILD_WITH_LEVELDB}) -option(LEVELDB_BUILD_TESTS "" OFF) + option(LEVELDB_BUILD_TESTS "" OFF) add_subdirectory(leveldb) target_include_directories( leveldb From b88336d2cdcfd978759b1643e5679d2686d41459 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Oct 2021 16:41:44 +0800 Subject: [PATCH 09/24] integrate lucene --- CMakeLists.txt | 8 +++++++- cmake/lucene_CMakeLists.txt.in | 13 +++++++++++++ deps/CMakeLists.txt | 8 ++++++++ 3 files changed, 28 insertions(+), 1 deletion(-) create mode 100644 cmake/lucene_CMakeLists.txt.in diff --git a/CMakeLists.txt b/CMakeLists.txt index 9b4a0a1b85..e1b578e08f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -42,12 +42,18 @@ if(${BUILD_WITH_LEVELDB}) endif(${BUILD_WITH_LEVELDB}) ## rocksdb -option(BUILD_WITH_ROCKSDB "If build with rocksdb" ON) +option(BUILD_WITH_ROCKSDB "If build with rocksdb" OFF) if(${BUILD_WITH_ROCKSDB}) cat("${CMAKE_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${DEPS_TMP_FILE}) add_definitions(-DUSE_ROCKSDB) endif(${BUILD_WITH_ROCKSDB}) +## lucene +option(BUILD_WITH_LUCENE "If build with lucene" OFF) +if(${BUILD_WITH_LUCENE}) + cat("${CMAKE_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${DEPS_TMP_FILE}) +endif(${BUILD_WITH_LUCENE}) + ## download dependencies configure_file(${DEPS_TMP_FILE} "${CMAKE_SOURCE_DIR}/deps/deps-download/CMakeLists.txt") execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" . diff --git a/cmake/lucene_CMakeLists.txt.in b/cmake/lucene_CMakeLists.txt.in new file mode 100644 index 0000000000..91e144dced --- /dev/null +++ b/cmake/lucene_CMakeLists.txt.in @@ -0,0 +1,13 @@ + +# lucene +ExternalProject_Add(lucene + GIT_REPOSITORY https://github.com/taosdata-contrib/LucenePlusPlus.git + GIT_TAG rel_3.0.8 + SOURCE_DIR "${CMAKE_SOURCE_DIR}/deps/lucene" + BINARY_DIR "" + #BUILD_IN_SOURCE TRUE + CONFIGURE_COMMAND "" + BUILD_COMMAND "" + INSTALL_COMMAND "" + TEST_COMMAND "" +) \ No newline at end of file diff --git a/deps/CMakeLists.txt b/deps/CMakeLists.txt index 205b6298e3..57de0125a2 100644 --- a/deps/CMakeLists.txt +++ b/deps/CMakeLists.txt @@ -45,6 +45,7 @@ if(${BUILD_WITH_LEVELDB}) endif(${BUILD_WITH_LEVELDB}) # rocksdb +# To support rocksdb build on ubuntu: sudo apt-get install libgflags-dev if(${BUILD_WITH_ROCKSDB}) option(WITH_TESTS "" OFF) option(WITH_BENCHMARK_TOOLS "" OFF) @@ -55,3 +56,10 @@ if(${BUILD_WITH_ROCKSDB}) PUBLIC $ ) endif(${BUILD_WITH_ROCKSDB}) + +# lucene +# To support build on ubuntu: sudo apt-get install libboost-all-dev +if(${BUILD_WITH_LUCENE}) + option(ENABLE_TEST "Enable the tests" OFF) + add_subdirectory(lucene) +endif(${BUILD_WITH_LUCENE}) From 69a4417a0c5b971b8a29494847d865c66ae8e742 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 14 Oct 2021 16:50:07 +0800 Subject: [PATCH 10/24] add some msg for tq (#8241) --- include/common/taosmsg.h | 6 +++- include/server/vnode/tq/tq.h | 54 ++++++++++++++++++++++++++++-- source/server/vnode/tq/inc/tqInt.h | 7 ++-- source/server/vnode/tq/src/tq.c | 2 +- 4 files changed, 62 insertions(+), 7 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 8f89df40d0..78f91cca64 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -41,6 +41,10 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" ) @@ -113,7 +117,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" ) // message for topic TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TP, "create-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" ) +//TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" ) #ifndef TAOS_MESSAGE_C diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index eb9c57c581..ef6a34ffa3 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -22,6 +22,56 @@ extern "C" { #endif +typedef struct tmqMsgHead { + int32_t headLen; + int32_t msgVer; + int64_t cgId; + int32_t topicLen; + char topic[]; +} tmqMsgHead; + +//TODO: put msgs into common +typedef struct tmqConnectReq { + tmqMsgHead head; + +} tmqConnectReq; + +typedef struct tmqConnectResp { + +} tmqConnectResp; + +typedef struct tmqDisconnectReq { + +} tmqDisconnectReq; + +typedef struct tmqDisconnectResp { + +} tmqDiconnectResp; + +typedef struct tmqConsumeReq { + +} tmqConsumeReq; + +typedef struct tmqConsumeResp { + +} tmqConsumeResp; + +typedef struct tmqSubscribeReq { + +} tmqSubscribeReq; + +typedef struct tmqSubscribeResp { + +} tmqSubscribeResp; + +typedef struct tmqHeartbeatReq { + +} tmqHeartbeatReq; + +typedef struct tmqHeartbeatResp { + +} tmqHeartbeatResp; + typedef struct tqTopicVhandle { //name // @@ -29,7 +79,7 @@ typedef struct tqTopicVhandle { // //callback for mnode // -} tqTopic; +} tqTopicVhandle; typedef struct STQ { //the set for topics @@ -50,7 +100,7 @@ int tqPushMsg(STQ*, void* msg, int64_t version); int tqCommit(STQ*); //void* will be replace by a msg type -int tqHandleMsg(STQ*, void* msg); +int tqHandleConsumeMsg(STQ*, tmqConsumeReq* msg); #ifdef __cplusplus } diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index c42bcfef43..cba9075fe9 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -26,14 +26,15 @@ extern "C" { typedef struct tqBufferItem { int64_t offset; - void *content; + void* executor; + void* content; } tqBufferItem; typedef struct tqGroupHandle { - char* topic; - void* ahandle; + char* topic; //c style, end with '\0' int64_t cgId; + void* ahandle; int64_t consumeOffset; int32_t head; int32_t tail; diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index 2ef2a4b6ea..7733ac29b5 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -77,7 +77,7 @@ int tqCommit(STQ* pTq) { return 0; } -int tqHandleMsg(STQ* pTq, void*msg) { +int tqHandleConsumeMsg(STQ* pTq, tmqConsumeReq* msg) { //parse msg and extract topic and cgId //lookup handle //confirm message and send to consumer From eef8f49af0abc75cd5fe8325ae252b5768a74732 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Oct 2021 17:22:59 +0800 Subject: [PATCH 11/24] refact --- CMakeLists.txt | 6 +----- cmake/cmake.options | 26 ++++++++++++++++++++++++++ 2 files changed, 27 insertions(+), 5 deletions(-) create mode 100644 cmake/cmake.options diff --git a/CMakeLists.txt b/CMakeLists.txt index e1b578e08f..abb39c310a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,6 +10,7 @@ project( # DEPENDENCIES # ============================================================================ set(CMAKE_SUPPORT_DIR "${CMAKE_SOURCE_DIR}/cmake") +include(${CMAKE_SUPPORT_DIR}/cmake.options) function(cat IN_FILE OUT_FILE) file(READ ${IN_FILE} CONTENTS) @@ -20,8 +21,6 @@ set(DEPS_TMP_FILE "${CMAKE_BINARY_DIR}/deps_tmp_CMakeLists.txt.in") configure_file("${CMAKE_SUPPORT_DIR}/deps_CMakeLists.txt.in" ${DEPS_TMP_FILE}) ## googletest -option(BUILD_TEST "If build unit tests using googletest" ON) - if(${BUILD_TEST}) cat("${CMAKE_SUPPORT_DIR}/gtest_CMakeLists.txt.in" ${DEPS_TMP_FILE}) endif(${BUILD_TEST}) @@ -36,20 +35,17 @@ cat("${CMAKE_SUPPORT_DIR}/zlib_CMakeLists.txt.in" ${DEPS_TMP_FILE}) cat("${CMAKE_SUPPORT_DIR}/cjson_CMakeLists.txt.in" ${DEPS_TMP_FILE}) ## leveldb -option(BUILD_WITH_LEVELDB "If build with leveldb" OFF) if(${BUILD_WITH_LEVELDB}) cat("${CMAKE_SUPPORT_DIR}/leveldb_CMakeLists.txt.in" ${DEPS_TMP_FILE}) endif(${BUILD_WITH_LEVELDB}) ## rocksdb -option(BUILD_WITH_ROCKSDB "If build with rocksdb" OFF) if(${BUILD_WITH_ROCKSDB}) cat("${CMAKE_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${DEPS_TMP_FILE}) add_definitions(-DUSE_ROCKSDB) endif(${BUILD_WITH_ROCKSDB}) ## lucene -option(BUILD_WITH_LUCENE "If build with lucene" OFF) if(${BUILD_WITH_LUCENE}) cat("${CMAKE_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${DEPS_TMP_FILE}) endif(${BUILD_WITH_LUCENE}) diff --git a/cmake/cmake.options b/cmake/cmake.options new file mode 100644 index 0000000000..bc3177e5cc --- /dev/null +++ b/cmake/cmake.options @@ -0,0 +1,26 @@ +# ========================================================= +# Deps options +# ========================================================= +option( + BUILD_TEST + "If build unit tests using googletest" + ON +) + +option( + BUILD_WITH_LEVELDB + "If build with leveldb" + OFF +) + +option( + BUILD_WITH_ROCKSDB + "If build with rocksdb" + OFF +) + +option( + BUILD_WITH_LUCENE + "If build with lucene" + OFF +) \ No newline at end of file From ad61d062ffd36fbaddafcde8f04b24088c8d37b5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 14 Oct 2021 23:22:16 +0800 Subject: [PATCH 12/24] add priority queue --- include/util/theap.h | 65 +++++++++++++ source/util/src/theap.c | 206 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 271 insertions(+) create mode 100644 include/util/theap.h create mode 100644 source/util/src/theap.c diff --git a/include/util/theap.h b/include/util/theap.h new file mode 100644 index 0000000000..fd1a39f8dd --- /dev/null +++ b/include/util/theap.h @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef TDENGINE_HEAP_H +#define TDENGINE_HEAP_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "os.h" + +struct HeapNode; + +/* Return non-zero if a < b. */ +typedef int (*HeapCompareFn)(const struct HeapNode* a, const struct HeapNode* b); + +typedef struct HeapNode { + struct HeapNode* left; + struct HeapNode* right; + struct HeapNode* parent; +} HeapNode; + +/* A binary min heap. The usual properties hold: the root is the lowest + * element in the set, the height of the tree is at most log2(nodes) and + * it's always a complete binary tree. + * + */ +typedef struct { + HeapNode* min; + size_t nelts; + HeapCompareFn compFn; +} Heap; + + +Heap* heapCreate(HeapCompareFn fn); + +void heapDestroy(Heap *heap); + +HeapNode* heapMin(const Heap* heap); + +void heapInsert(Heap* heap, HeapNode* node); + +void heapRemove(Heap* heap, struct HeapNode* node); + +void heapDequeue(Heap* heap); + +size_t heapSize(Heap *heap); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_HASH_H diff --git a/source/util/src/theap.c b/source/util/src/theap.c new file mode 100644 index 0000000000..aa822c7d5e --- /dev/null +++ b/source/util/src/theap.c @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "theap.h" + +size_t heapSize(Heap* heap) { + return heap->nelts; +} + +Heap* heapCreate(HeapCompareFn fn) { + Heap *heap = calloc(1, sizeof(Heap)); + if (heap == NULL) { return NULL; } + + heap->min = NULL; + heap->nelts = 0; + heap->compFn = fn; + return heap; +} + +void heapDestroy(Heap *heap) { + free(heap); +} + +HeapNode* heapMin(const Heap* heap) { + return heap->min; +} + +/* Swap parent with child. Child moves closer to the root, parent moves away. */ +static void heapNodeSwap(Heap* heap, HeapNode* parent, HeapNode* child) { + HeapNode* sibling; + HeapNode t; + + t = *parent; + *parent = *child; + *child = t; + + parent->parent = child; + if (child->left == child) { + child->left = parent; + sibling = child->right; + } else { + child->right = parent; + sibling = child->left; + } + if (sibling != NULL) + sibling->parent = child; + + if (parent->left != NULL) + parent->left->parent = parent; + if (parent->right != NULL) + parent->right->parent = parent; + + if (child->parent == NULL) + heap->min = child; + else if (child->parent->left == parent) + child->parent->left = child; + else + child->parent->right = child; +} + +void heapInsert(Heap* heap, HeapNode* newnode) { + HeapNode** parent; + HeapNode** child; + unsigned int path; + unsigned int n; + unsigned int k; + + newnode->left = NULL; + newnode->right = NULL; + newnode->parent = NULL; + + /* Calculate the path from the root to the insertion point. This is a min + * heap so we always insert at the left-most free node of the bottom row. + */ + path = 0; + for (k = 0, n = 1 + heap->nelts; n >= 2; k += 1, n /= 2) + path = (path << 1) | (n & 1); + + /* Now traverse the heap using the path we calculated in the previous step. */ + parent = child = &heap->min; + while (k > 0) { + parent = child; + if (path & 1) + child = &(*child)->right; + else + child = &(*child)->left; + path >>= 1; + k -= 1; + } + + /* Insert the new node. */ + newnode->parent = *parent; + *child = newnode; + heap->nelts += 1; + + /* Walk up the tree and check at each node if the heap property holds. + * It's a min heap so parent < child must be true. + */ + while (newnode->parent != NULL && (heap->compFn)(newnode, newnode->parent)) + heapNodeSwap(heap, newnode->parent, newnode); +} + +void heapRemove(Heap* heap, HeapNode* node) { + HeapNode* smallest; + HeapNode** max; + HeapNode* child; + unsigned int path; + unsigned int k; + unsigned int n; + + if (heap->nelts == 0) + return; + + /* Calculate the path from the min (the root) to the max, the left-most node + * of the bottom row. + */ + path = 0; + for (k = 0, n = heap->nelts; n >= 2; k += 1, n /= 2) + path = (path << 1) | (n & 1); + + /* Now traverse the heap using the path we calculated in the previous step. */ + max = &heap->min; + while (k > 0) { + if (path & 1) + max = &(*max)->right; + else + max = &(*max)->left; + path >>= 1; + k -= 1; + } + + heap->nelts -= 1; + + /* Unlink the max node. */ + child = *max; + *max = NULL; + + if (child == node) { + /* We're removing either the max or the last node in the tree. */ + if (child == heap->min) { + heap->min = NULL; + } + return; + } + + /* Replace the to be deleted node with the max node. */ + child->left = node->left; + child->right = node->right; + child->parent = node->parent; + + if (child->left != NULL) { + child->left->parent = child; + } + + if (child->right != NULL) { + child->right->parent = child; + } + + if (node->parent == NULL) { + heap->min = child; + } else if (node->parent->left == node) { + node->parent->left = child; + } else { + node->parent->right = child; + } + + /* Walk down the subtree and check at each node if the heap property holds. + * It's a min heap so parent < child must be true. If the parent is bigger, + * swap it with the smallest child. + */ + for (;;) { + smallest = child; + if (child->left != NULL && (heap->compFn)(child->left, smallest)) + smallest = child->left; + if (child->right != NULL && (heap->compFn)(child->right, smallest)) + smallest = child->right; + if (smallest == child) + break; + heapNodeSwap(heap, child, smallest); + } + + /* Walk up the subtree and check that each parent is less than the node + * this is required, because `max` node is not guaranteed to be the + * actual maximum in tree + */ + while (child->parent != NULL && (heap->compFn)(child, child->parent)) + heapNodeSwap(heap, child->parent, child); +} + +void heapDequeue(Heap* heap) { + heapRemove(heap, heap->min); +} + + From 36c20ec6176dcb2e99e62f633925a8310caeaef8 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 15 Oct 2021 09:55:43 +0800 Subject: [PATCH 13/24] more progress --- source/server/vnode/meta/inc/metaInt.h | 31 -------------------------- source/server/vnode/meta/inc/metaUid.h | 16 +++++++++++-- source/server/vnode/meta/src/meta.c | 29 +++++++++++++----------- source/server/vnode/meta/src/metaUid.c | 11 +++++---- 4 files changed, 37 insertions(+), 50 deletions(-) delete mode 100644 source/server/vnode/meta/inc/metaInt.h diff --git a/source/server/vnode/meta/inc/metaInt.h b/source/server/vnode/meta/inc/metaInt.h deleted file mode 100644 index 549ac829ab..0000000000 --- a/source/server/vnode/meta/inc/metaInt.h +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_META_INT_H_ -#define _TD_META_INT_H_ - -#ifdef __cplusplus -extern "C" { -#endif - -struct { - tkv_db_t db; -} SMeta; - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_META_INT_H_*/ \ No newline at end of file diff --git a/source/server/vnode/meta/inc/metaUid.h b/source/server/vnode/meta/inc/metaUid.h index b01492cbf7..f0a747bc74 100644 --- a/source/server/vnode/meta/inc/metaUid.h +++ b/source/server/vnode/meta/inc/metaUid.h @@ -22,10 +22,22 @@ extern "C" { #endif -typedef uint64_t tb_uid_t; -tb_uid_t metaGenerateUid(); +/* ------------------------ APIS EXPOSED ------------------------ */ +typedef uint64_t tb_uid_t; +typedef struct STableUidGenerator STableUidGenerator; +// tb_uid_t #define IVLD_TB_UID 0 +tb_uid_t generateUid(STableUidGenerator *); + +// STableUidGenerator +void tableUidGeneratorInit(STableUidGenerator *, tb_uid_t suid); +#define tableUidGeneratorClear(ug) + +/* ------------------------ FOR TEST AND COMPILE ONLY ------------------------ */ +struct STableUidGenerator { + tb_uid_t nextUid; +}; #ifdef __cplusplus } diff --git a/source/server/vnode/meta/src/meta.c b/source/server/vnode/meta/src/meta.c index 88d0ff2f16..57824e9fe6 100644 --- a/source/server/vnode/meta/src/meta.c +++ b/source/server/vnode/meta/src/meta.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#include "tkv.h" #include "thash.h" +#include "tkv.h" #include "tlist.h" #include "tlockfree.h" #include "ttypes.h" @@ -43,13 +43,14 @@ typedef struct STableObj { struct SMeta { pthread_rwlock_t rwLock; - SHashObj *pTableObjHash; // uid --> STableObj - SList * stbList; // super table list - STkvDb * tbnameDb; // tbname --> uid - STkvDb * tagDb; // uid --> tag - STkvDb * schemaDb; - STkvDb * tagIdx; - size_t totalUsed; + STableUidGenerator uidGenerator; + SHashObj * pTableObjHash; // uid --> STableObj + SList * stbList; // super table list + STkvDb * tbnameDb; // tbname --> uid + STkvDb * tagDb; // uid --> tag + STkvDb * schemaDb; + STkvDb * tagIdx; + size_t totalUsed; }; static STable * metaTableNew(tb_uid_t uid, const char *name, int32_t sver); @@ -68,6 +69,7 @@ SMeta *metaOpen(SMetaOpts *options) { pthread_rwlock_init(&(pMeta->rwLock), NULL); + tableUidGeneratorInit(&(pMeta->uidGenerator), IVLD_TB_UID); pMeta->pTableObjHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); pMeta->stbList = tdListNew(sizeof(STableObj *)); @@ -109,9 +111,9 @@ void metaClose(SMeta *pMeta) { } int metaCreateTable(SMeta *pMeta, STableOpts *pTableOpts) { - size_t vallen; - STkvReadOpts *ropt; - STableObj * pTableObj = NULL; + size_t vallen; + STkvReadOpts * ropt; + STableObj * pTableObj = NULL; STkvWriteOpts *wopt; // Check if table already exists @@ -133,7 +135,8 @@ int metaCreateTable(SMeta *pMeta, STableOpts *pTableOpts) { } // Create table object - pTableObj->pTable = metaTableNew(metaGenerateUid(), pTableOpts->name, schemaVersion(pTableOpts->pSchema)); + pTableObj->pTable = + metaTableNew(generateUid(&(pMeta->uidGenerator)), pTableOpts->name, schemaVersion(pTableOpts->pSchema)); if (pTableObj->pTable == NULL) { // TODO } @@ -147,7 +150,7 @@ int metaCreateTable(SMeta *pMeta, STableOpts *pTableOpts) { // Add to tbname db tkvPut(pMeta->tbnameDb, wopt, pTableOpts->name, strlen(pTableOpts->name), (char *)&pTableObj->pTable->uid, - sizeof(tb_uid_t)); + sizeof(tb_uid_t)); // Add to schema db char id[12]; diff --git a/source/server/vnode/meta/src/metaUid.c b/source/server/vnode/meta/src/metaUid.c index 4662969a2e..80afa490f3 100644 --- a/source/server/vnode/meta/src/metaUid.c +++ b/source/server/vnode/meta/src/metaUid.c @@ -15,9 +15,12 @@ #include "metaUid.h" -static tb_uid_t nuid = IVLD_TB_UID; +tb_uid_t generateUid(STableUidGenerator *pGen) { + // Generate a new table UID + return ++(pGen->nextUid); +} -tb_uid_t metaGenerateUid() { - // TODO: need a more complex UID generator - return ++nuid; +void tableUidGeneratorInit(STableUidGenerator *pGen, tb_uid_t suid) { + // Init a generator + pGen->nextUid = suid; } \ No newline at end of file From 25e569e1290ad33d7aa0d9163ce54cef77d78f22 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 15 Oct 2021 10:27:42 +0800 Subject: [PATCH 14/24] add deps test --- .gitignore | 1 + cmake/cmake.options | 6 +++++ deps/CMakeLists.txt | 10 +++++++ deps/test/CMakeLists.txt | 4 +++ deps/test/rocksdb/CMakeLists.txt | 6 +++++ deps/test/rocksdb/main.c | 46 ++++++++++++++++++++++++++++++++ 6 files changed, 73 insertions(+) create mode 100644 deps/test/CMakeLists.txt create mode 100644 deps/test/rocksdb/CMakeLists.txt create mode 100644 deps/test/rocksdb/main.c diff --git a/.gitignore b/.gitignore index 0b98a1b161..83ed62c030 100644 --- a/.gitignore +++ b/.gitignore @@ -99,3 +99,4 @@ TAGS deps/* !deps/CMakeLists.txt +!deps/test diff --git a/cmake/cmake.options b/cmake/cmake.options index bc3177e5cc..74b0d9fdbb 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -23,4 +23,10 @@ option( BUILD_WITH_LUCENE "If build with lucene" OFF +) + +option( + BUILD_DEPENDENCY_TESTS + "If build dependency tests" + OFF ) \ No newline at end of file diff --git a/deps/CMakeLists.txt b/deps/CMakeLists.txt index 57de0125a2..e35417b4c5 100644 --- a/deps/CMakeLists.txt +++ b/deps/CMakeLists.txt @@ -1,3 +1,6 @@ +# ================================================================================================ +# DEPENDENCIES +# ================================================================================================ # googletest if(${BUILD_TEST}) add_subdirectory(googletest) @@ -63,3 +66,10 @@ if(${BUILD_WITH_LUCENE}) option(ENABLE_TEST "Enable the tests" OFF) add_subdirectory(lucene) endif(${BUILD_WITH_LUCENE}) + +# ================================================================================================ +# DEPENDENCY TEST +# ================================================================================================ +if(${BUILD_DEPENDENCY_TESTS}) + add_subdirectory(test) +endif(${BUILD_DEPENDENCY_TESTS}) diff --git a/deps/test/CMakeLists.txt b/deps/test/CMakeLists.txt new file mode 100644 index 0000000000..4547431ca7 --- /dev/null +++ b/deps/test/CMakeLists.txt @@ -0,0 +1,4 @@ +# rocksdb +if(${BUILD_WITH_ROCKSDB}) + add_subdirectory(rocksdb) +endif(${BUILD_WITH_ROCKSDB}) diff --git a/deps/test/rocksdb/CMakeLists.txt b/deps/test/rocksdb/CMakeLists.txt new file mode 100644 index 0000000000..b500e0a661 --- /dev/null +++ b/deps/test/rocksdb/CMakeLists.txt @@ -0,0 +1,6 @@ +add_executable(rocksdbTest "") +target_sources(rocksdbTest + PRIVATE + "${CMAKE_CURRENT_SOURCE_DIR}/main.c" +) +target_link_libraries(rocksdbTest rocksdb) \ No newline at end of file diff --git a/deps/test/rocksdb/main.c b/deps/test/rocksdb/main.c new file mode 100644 index 0000000000..d1cbd373da --- /dev/null +++ b/deps/test/rocksdb/main.c @@ -0,0 +1,46 @@ +#include +#include +#include +#include +#include // sysconf() - get CPU count +#include "rocksdb/c.h" + +// const char DBPath[] = "/tmp/rocksdb_c_simple_example"; +const char DBPath[] = "rocksdb_c_simple_example"; +const char DBBackupPath[] = "/tmp/rocksdb_c_simple_example_backup"; + +int main(int argc, char const *argv[]) { + rocksdb_t * db; + rocksdb_backup_engine_t *be; + rocksdb_options_t * options = rocksdb_options_create(); + rocksdb_options_set_create_if_missing(options, 1); + + // open DB + char *err = NULL; + db = rocksdb_open(options, DBPath, &err); + + // Write + rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create(); + rocksdb_put(db, writeoptions, "key", 3, "value", 5, &err); + + // Read + rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create(); + rocksdb_readoptions_set_snapshot(readoptions, rocksdb_create_snapshot(db)); + size_t vallen = 0; + char * val = rocksdb_get(db, readoptions, "key", 3, &vallen, &err); + printf("val:%s\n", val); + + // Update + // rocksdb_put(db, writeoptions, "key", 3, "eulav", 5, &err); + + // Delete + rocksdb_delete(db, writeoptions, "key", 3, &err); + + // Read again + val = rocksdb_get(db, readoptions, "key", 3, &vallen, &err); + printf("val:%s\n", val); + + rocksdb_close(db); + + return 0; +} \ No newline at end of file From f660db701a85588c3cfa6e08b1e900fa590eea2f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 15 Oct 2021 14:59:35 +0800 Subject: [PATCH 15/24] more --- include/server/vnode/meta/meta.h | 9 +- include/util/tcoding.h | 2 +- source/server/vnode/meta/inc/metaDef.h | 68 ++++++ source/server/vnode/meta/src/meta.c | 223 ------------------ source/server/vnode/meta/src/metaMain.c | 228 +++++++++++++++++++ source/server/vnode/meta/test/CMakeLists.txt | 2 +- source/server/vnode/meta/test/metaTests.cpp | 38 ++-- 7 files changed, 318 insertions(+), 252 deletions(-) create mode 100644 source/server/vnode/meta/inc/metaDef.h delete mode 100644 source/server/vnode/meta/src/meta.c create mode 100644 source/server/vnode/meta/src/metaMain.c diff --git a/include/server/vnode/meta/meta.h b/include/server/vnode/meta/meta.h index 3afe01511b..df6c79d077 100644 --- a/include/server/vnode/meta/meta.h +++ b/include/server/vnode/meta/meta.h @@ -38,7 +38,7 @@ int metaCreate(const char *path); void metaDestroy(const char *path); SMeta *metaOpen(SMetaOpts *); void metaClose(SMeta *); -int metaCreateTable(SMeta *, STableOpts *); +int metaCreateTable(SMeta *, const STableOpts *); int metaDropTable(SMeta *, uint64_t tuid_t); int metaAlterTable(SMeta *, void *); int metaCommit(SMeta *); @@ -59,13 +59,6 @@ void metaQueryOptionsDestroy(SMetaQueryOpts *); // STableOpts void metaTableOptsInit(STableOpts *, int8_t type, const char *name, const STSchema *pSchema); -/* -------------------------------- Hided implementations -------------------------------- */ -struct STableOpts { - int8_t type; - char * name; - STSchema *pSchema; -}; - #ifdef __cplusplus } #endif diff --git a/include/util/tcoding.h b/include/util/tcoding.h index a11768a1b0..a2c91c5dbe 100644 --- a/include/util/tcoding.h +++ b/include/util/tcoding.h @@ -330,7 +330,7 @@ static FORCE_INLINE void *taosDecodeVariantI64(void *buf, int64_t *value) { } // ---- string -static FORCE_INLINE int taosEncodeString(void **buf, char *value) { +static FORCE_INLINE int taosEncodeString(void **buf, const char *value) { int tlen = 0; size_t size = strlen(value); diff --git a/source/server/vnode/meta/inc/metaDef.h b/source/server/vnode/meta/inc/metaDef.h new file mode 100644 index 0000000000..3ff239e01e --- /dev/null +++ b/source/server/vnode/meta/inc/metaDef.h @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_META_DEF_H_ +#define _TD_META_DEF_H_ + +#include "metaUid.h" +#include "tkv.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct SMeta { + STableUidGenerator uidGenerator; + + STkvDb* tableDb; // uid->table obj + STkvDb* tbnameDb; // tbname --> uid + STkvDb* schemaDb; // uid+version --> schema + STkvDb* tagDb; // uid --> tag + STkvDb* tagIdx; // TODO: need to integrate lucene or our own + // STkvCache* metaCache; // TODO: add a global cache here +}; + +/* ------------------------ TEST CODE ------------------------ */ +typedef enum { META_SUPER_TABLE = 0, META_CHILD_TABLE = 1, META_NORMAL_TABLE = 2 } EMetaTableT; +typedef struct SSuperTableOpts { + tb_uid_t uid; + STSchema* pSchema; // (ts timestamp, a int) + STSchema* pTagSchema; // (tag1 binary(10), tag2 int) +} SSuperTableOpts; + +typedef struct SChildTableOpts { + tb_uid_t suid; // super table uid + SKVRow tags; // tag value of the child table +} SChildTableOpts; + +typedef struct SNormalTableOpts { + STSchema* pSchema; +} SNormalTableOpts; + +struct STableOpts { + EMetaTableT type; + char* name; + union { + SSuperTableOpts superOpts; + SChildTableOpts childOpts; + SNormalTableOpts normalOpts; + }; +}; + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_META_DEF_H_*/ \ No newline at end of file diff --git a/source/server/vnode/meta/src/meta.c b/source/server/vnode/meta/src/meta.c deleted file mode 100644 index 57824e9fe6..0000000000 --- a/source/server/vnode/meta/src/meta.c +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "thash.h" -#include "tkv.h" -#include "tlist.h" -#include "tlockfree.h" -#include "ttypes.h" - -#include "meta.h" -#include "metaUid.h" - -/* -------------------- Structures -------------------- */ - -typedef struct STable { - tb_uid_t uid; - char * name; - tb_uid_t suid; - SArray * schema; -} STable; - -typedef struct STableObj { - bool pin; - uint64_t ref; - SRWLatch latch; - uint64_t offset; - SList * ctbList; // child table list - STable * pTable; -} STableObj; - -struct SMeta { - pthread_rwlock_t rwLock; - - STableUidGenerator uidGenerator; - SHashObj * pTableObjHash; // uid --> STableObj - SList * stbList; // super table list - STkvDb * tbnameDb; // tbname --> uid - STkvDb * tagDb; // uid --> tag - STkvDb * schemaDb; - STkvDb * tagIdx; - size_t totalUsed; -}; - -static STable * metaTableNew(tb_uid_t uid, const char *name, int32_t sver); -static STableObj *metaTableObjNew(); - -/* -------------------- Methods -------------------- */ - -SMeta *metaOpen(SMetaOpts *options) { - SMeta *pMeta = NULL; - char * err = NULL; - - pMeta = (SMeta *)calloc(1, sizeof(*pMeta)); - if (pMeta == NULL) { - return NULL; - } - - pthread_rwlock_init(&(pMeta->rwLock), NULL); - - tableUidGeneratorInit(&(pMeta->uidGenerator), IVLD_TB_UID); - pMeta->pTableObjHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - - pMeta->stbList = tdListNew(sizeof(STableObj *)); - - // Options - STkvOpts *dbOptions = tkvOptsCreate(); - tkvOptsSetCreateIfMissing(dbOptions, 1); - - taosMkDir("meta"); - - // Open tbname DB - pMeta->tbnameDb = tkvOpen(dbOptions, "meta/tbname_uid_db"); - - // Open tag DB - pMeta->tagDb = tkvOpen(dbOptions, "meta/uid_tag_db"); - - // Open schema DB - pMeta->schemaDb = tkvOpen(dbOptions, "meta/schema_db"); - - // Open tag index - pMeta->tagIdx = tkvOpen(dbOptions, "meta/tag_idx_db"); - - tkvOptsDestroy(dbOptions); - - return pMeta; -} - -void metaClose(SMeta *pMeta) { - if (pMeta) { - tkvClose(pMeta->tagIdx); - tkvClose(pMeta->schemaDb); - tkvClose(pMeta->tagDb); - tkvClose(pMeta->tbnameDb); - - tdListFree(pMeta->stbList); - taosHashCleanup(pMeta->pTableObjHash); - pthread_rwlock_destroy(&(pMeta->rwLock)); - } -} - -int metaCreateTable(SMeta *pMeta, STableOpts *pTableOpts) { - size_t vallen; - STkvReadOpts * ropt; - STableObj * pTableObj = NULL; - STkvWriteOpts *wopt; - - // Check if table already exists - ropt = tkvReadOptsCreate(); - - char *uidStr = tkvGet(pMeta->tbnameDb, ropt, pTableOpts->name, strlen(pTableOpts->name), &vallen); - if (uidStr != NULL) { - // Has duplicate named table - return -1; - } - - tkvReadOptsDestroy(ropt); - - // Create table obj - pTableObj = metaTableObjNew(); - if (pTableObj == NULL) { - // TODO - return -1; - } - - // Create table object - pTableObj->pTable = - metaTableNew(generateUid(&(pMeta->uidGenerator)), pTableOpts->name, schemaVersion(pTableOpts->pSchema)); - if (pTableObj->pTable == NULL) { - // TODO - } - - pthread_rwlock_rdlock(&pMeta->rwLock); - - taosHashPut(pMeta->pTableObjHash, &(pTableObj->pTable->uid), sizeof(tb_uid_t), &pTableObj, sizeof(pTableObj)); - - wopt = tkvWriteOptsCreate(); - // rocksdb_writeoptions_disable_WAL(wopt, 1); - - // Add to tbname db - tkvPut(pMeta->tbnameDb, wopt, pTableOpts->name, strlen(pTableOpts->name), (char *)&pTableObj->pTable->uid, - sizeof(tb_uid_t)); - - // Add to schema db - char id[12]; - char buf[256]; - void *pBuf = buf; - *(tb_uid_t *)id = pTableObj->pTable->uid; - *(int32_t *)(id + sizeof(tb_uid_t)) = schemaVersion(pTableOpts->pSchema); - int size = tdEncodeSchema(&pBuf, pTableOpts->pSchema); - - tkvPut(pMeta->schemaDb, wopt, id, 12, buf, size); - - tkvWriteOptsDestroy(wopt); - - pthread_rwlock_unlock(&pMeta->rwLock); - - return 0; -} - -void metaDestroy(const char *path) { taosRemoveDir(path); } - -int metaCommit(SMeta *meta) { return 0; } - -void metaTableOptsInit(STableOpts *pTableOpts, int8_t type, const char *name, const STSchema *pSchema) { - pTableOpts->type = type; - pTableOpts->name = strdup(name); - pTableOpts->pSchema = tdDupSchema(pSchema); -} - -/* -------------------- Static Methods -------------------- */ - -static STable *metaTableNew(tb_uid_t uid, const char *name, int32_t sver) { - STable *pTable = NULL; - - pTable = (STable *)malloc(sizeof(*pTable)); - if (pTable == NULL) { - // TODO - return NULL; - } - - pTable->schema = taosArrayInit(0, sizeof(int32_t)); - if (pTable->schema == NULL) { - // TODO - return NULL; - } - - pTable->uid = uid; - pTable->name = strdup(name); - pTable->suid = IVLD_TB_UID; - taosArrayPush(pTable->schema, &sver); - - return pTable; -} - -static STableObj *metaTableObjNew() { - STableObj *pTableObj = NULL; - - pTableObj = (STableObj *)malloc(sizeof(*pTableObj)); - if (pTableObj == NULL) { - return NULL; - } - - pTableObj->pin = true; - pTableObj->ref = 1; - taosInitRWLatch(&(pTableObj->latch)); - pTableObj->offset = UINT64_MAX; - pTableObj->ctbList = NULL; - pTableObj->pTable = NULL; - - return pTableObj; -} \ No newline at end of file diff --git a/source/server/vnode/meta/src/metaMain.c b/source/server/vnode/meta/src/metaMain.c new file mode 100644 index 0000000000..6f529f04c8 --- /dev/null +++ b/source/server/vnode/meta/src/metaMain.c @@ -0,0 +1,228 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "meta.h" +#include "metaDef.h" +#include "tcoding.h" + +static int metaCreateSuperTable(SMeta *pMeta, const char *tbname, const SSuperTableOpts *pSuperTableOpts); +static int metaCreateChildTable(SMeta *pMeta, const char *tbname, const SChildTableOpts *pChildTableOpts); +static int metaCreateNormalTable(SMeta *pMeta, const char *tbname, const SNormalTableOpts *pNormalTableOpts); + +SMeta *metaOpen(SMetaOpts *pMetaOpts) { + SMeta *pMeta = NULL; + + pMeta = (SMeta *)calloc(1, sizeof(*pMeta)); + if (pMeta == NULL) { + return NULL; + } + + // TODO: check if file exists and handle the error + taosMkDir("meta"); + + // Open tableDb + STkvOpts *tableDbOpts = tkvOptsCreate(); + tkvOptsSetCreateIfMissing(tableDbOpts, 1); + pMeta->tableDb = tkvOpen(tableDbOpts, "meta/table_db"); + tkvOptsDestroy(tableDbOpts); + + // Open tbnameDb + STkvOpts *tbnameDbOpts = tkvOptsCreate(); + tkvOptsSetCreateIfMissing(tbnameDbOpts, 1); + pMeta->tbnameDb = tkvOpen(tbnameDbOpts, "meta/tbname_db"); + tkvOptsDestroy(tbnameDbOpts); + + // Open schemaDb + STkvOpts *schemaDbOpts = tkvOptsCreate(); + tkvOptsSetCreateIfMissing(schemaDbOpts, 1); + pMeta->schemaDb = tkvOpen(schemaDbOpts, "meta/schema_db"); + tkvOptsDestroy(schemaDbOpts); + + // Open tagDb + STkvOpts *tagDbOpts = tkvOptsCreate(); + tkvOptsSetCreateIfMissing(tagDbOpts, 1); + pMeta->tagDb = tkvOpen(tagDbOpts, "meta/tag_db"); + tkvOptsDestroy(tagDbOpts); + + // Open tagIdx + STkvOpts *tagIdxDbOpts = tkvOptsCreate(); + tkvOptsSetCreateIfMissing(tagIdxDbOpts, 1); + pMeta->tagIdx = tkvOpen(tagIdxDbOpts, "meta/tag_idx_db"); + tkvOptsDestroy(tagIdxDbOpts); + + // TODO: need to figure out how to persist the START UID + tableUidGeneratorInit(&(pMeta->uidGenerator), IVLD_TB_UID); +} + +void metaClose(SMeta *pMeta) { + if (pMeta) { + tableUidGeneratorClear(&pMeta->uidGenerator); + + tkvClose(pMeta->tagIdx); + tkvClose(pMeta->tagDb); + tkvClose(pMeta->schemaDb); + tkvClose(pMeta->tbnameDb); + tkvClose(pMeta->tableDb); + + free(pMeta); + } +} + +int metaCreateTable(SMeta *pMeta, const STableOpts *pTableOpts) { + size_t vallen; + char * pUid; + + // Check if table already exists + pUid = tkvGet(pMeta->tbnameDb, NULL, pTableOpts->name, strlen(pTableOpts->name), &vallen); + if (pUid) { + free(pUid); + // Table already exists, return error code + return -1; + } + + switch (pTableOpts->type) { + case META_SUPER_TABLE: + return metaCreateSuperTable(pMeta, pTableOpts->name, &(pTableOpts->superOpts)); + case META_CHILD_TABLE: + return metaCreateChildTable(pMeta, pTableOpts->name, &(pTableOpts->childOpts)); + case META_NORMAL_TABLE: + return metaCreateNormalTable(pMeta, pTableOpts->name, &(pTableOpts->normalOpts)); + default: + ASSERT(0); + } + + return 0; +} + +/* ------------------------ STATIC METHODS ------------------------ */ +static int metaCreateSuperTable(SMeta *pMeta, const char *tbname, const SSuperTableOpts *pSuperTableOpts) { + size_t vallen; + size_t keylen; + char * pVal = NULL; + char schemaKey[sizeof(tb_uid_t) * 2]; + char buffer[1024]; /* TODO */ + void * pBuf = NULL; + + pVal = tkvGet(pMeta->tableDb, NULL, (char *)(&(pSuperTableOpts->uid)), sizeof(pSuperTableOpts->uid), &vallen); + if (pVal) { + free(pVal); + // TODO: table with same uid exists, just return error + return -1; + } + + // Put uid -> tbObj + vallen = 0; + pBuf = (void *)buffer; + vallen += taosEncodeString(&pBuf, tbname); // ENCODE TABLE NAME + vallen += taosEncodeFixedI32(&pBuf, 1); // ENCODE SCHEMA, SCHEMA HERE IS AN ARRAY OF VERSIONS + vallen += taosEncodeFixedI32(&pBuf, schemaVersion(pSuperTableOpts->pSchema)); + vallen += tdEncodeSchema(&pBuf, pSuperTableOpts->pTagSchema); // ENCODE TAG SCHEMA + tkvPut(pMeta->tableDb, NULL, (char *)(&(pSuperTableOpts->uid)), sizeof(pSuperTableOpts->uid), buffer, vallen); + + // Put tbname -> uid + tkvPut(pMeta->tbnameDb, NULL, tbname, strlen(tbname), (char *)(&(pSuperTableOpts->uid)), + sizeof(pSuperTableOpts->uid)); + + // Put uid+sversion -> schema + *(tb_uid_t *)schemaKey = pSuperTableOpts->uid; + *(int32_t *)(POINTER_SHIFT(schemaKey, sizeof(tb_uid_t))) = schemaVersion(pSuperTableOpts->pSchema); + keylen = sizeof(tb_uid_t) + sizeof(int32_t); + pBuf = (void *)buffer; + vallen = tdEncodeSchema(&pBuf, pSuperTableOpts->pSchema); + tkvPut(pMeta->schemaDb, NULL, schemaKey, keylen, buffer, vallen); + + return 0; +} + +static int metaCreateChildTable(SMeta *pMeta, const char *tbname, const SChildTableOpts *pChildTableOpts) { + size_t vallen; + char buffer[1024]; /* TODO */ + void * pBuf = NULL; + char * pTable; + tb_uid_t uid; + + // Check if super table exists + pTable = tkvGet(pMeta->tableDb, NULL, (char *)(&(pChildTableOpts->suid)), sizeof(pChildTableOpts->suid), &vallen); + if (pTable == NULL) { + // Super table not exists, just return error + return -1; + } + + // Generate a uid to the new table + uid = generateUid(&pMeta->uidGenerator); + + // Put uid -> tbObj + vallen = 0; + pBuf = (void *)buffer; + vallen += taosEncodeString(&pBuf, tbname); + vallen += taosEncodeFixedU64(pBuf, pChildTableOpts->suid); + tkvPut(pMeta->tableDb, NULL, (char *)(&uid), sizeof(uid), buffer, vallen); + + // Put tbname -> uid + tkvPut(pMeta->tbnameDb, NULL, tbname, strlen(tbname), (char *)(&uid), sizeof(uid)); + + // Put uid-> tags + tkvPut(pMeta->tagDb, NULL, (char *)(&uid), sizeof(uid), (char *)pChildTableOpts->tags, + (size_t)kvRowLen(pChildTableOpts->tags)); + + // TODO: Put tagIdx + + return 0; +} + +static int metaCreateNormalTable(SMeta *pMeta, const char *tbname, const SNormalTableOpts *pNormalTableOpts) { + size_t vallen; + char keyBuf[sizeof(tb_uid_t) + sizeof(int32_t)]; + char buffer[1024]; /* TODO */ + void * pBuf = NULL; + tb_uid_t uid; + + // Generate a uid to the new table + uid = generateUid(&pMeta->uidGenerator); + + // Put uid -> tbObj + vallen = 0; + pBuf = (void *)buffer; + vallen += taosEncodeString(&pBuf, tbname); + vallen += taosEncodeFixedI32(&pBuf, 1); + vallen += taosEncodeFixedI32(&pBuf, schemaVersion(pNormalTableOpts->pSchema)); + tkvPut(pMeta->tableDb, NULL, (char *)(&uid), sizeof(uid), buffer, vallen); + + // Put tbname -> uid + tkvPut(pMeta->tbnameDb, NULL, tbname, strlen(tbname), (char *)(&(uid)), sizeof(uid)); + + // Put uid+sversion -> schema + vallen = 0; + pBuf = (void *)buffer; + vallen += tdEncodeSchema(&pBuf, pNormalTableOpts->pSchema); + tkvPut(pMeta->schemaDb, NULL, keyBuf, sizeof(tb_uid_t) + sizeof(int32_t), buffer, vallen); + + return 0; +} + +#if 0 +/* -------------------- Structures -------------------- */ +static STable * metaTableNew(tb_uid_t uid, const char *name, int32_t sver); + +void metaDestroy(const char *path) { taosRemoveDir(path); } + +int metaCommit(SMeta *meta) { return 0; } + +void metaTableOptsInit(STableOpts *pTableOpts, int8_t type, const char *name, const STSchema *pSchema) { + pTableOpts->type = type; + pTableOpts->name = strdup(name); + pTableOpts->pSchema = tdDupSchema(pSchema); +} +#endif \ No newline at end of file diff --git a/source/server/vnode/meta/test/CMakeLists.txt b/source/server/vnode/meta/test/CMakeLists.txt index bca02c2907..b37ba6abd4 100644 --- a/source/server/vnode/meta/test/CMakeLists.txt +++ b/source/server/vnode/meta/test/CMakeLists.txt @@ -1,7 +1,7 @@ add_executable(metaTest "") target_sources(metaTest PRIVATE - "../src/meta.c" + "../src/metaMain.c" "../src/metaUid.c" "metaTests.cpp" ) diff --git a/source/server/vnode/meta/test/metaTests.cpp b/source/server/vnode/meta/test/metaTests.cpp index 47ca49b70a..6e26243aeb 100644 --- a/source/server/vnode/meta/test/metaTests.cpp +++ b/source/server/vnode/meta/test/metaTests.cpp @@ -9,29 +9,29 @@ TEST(MetaTest, meta_open_test) { SMeta *meta = metaOpen(NULL); std::cout << "Meta is opened!" << std::endl; - // Create tables - STableOpts tbOpts; - char tbname[128]; - STSchema * pSchema; - STSchemaBuilder sb; - tdInitTSchemaBuilder(&sb, 0); - for (size_t i = 0; i < 10; i++) { - tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, i, 8); - } - pSchema = tdGetSchemaFromBuilder(&sb); - tdDestroyTSchemaBuilder(&sb); - for (size_t i = 0; i < 1000000; i++) { - sprintf(tbname, "tb%ld", i); - metaTableOptsInit(&tbOpts, 0, tbname, pSchema); + // // Create tables + // STableOpts tbOpts; + // char tbname[128]; + // STSchema * pSchema; + // STSchemaBuilder sb; + // tdInitTSchemaBuilder(&sb, 0); + // for (size_t i = 0; i < 10; i++) { + // tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, i, 8); + // } + // pSchema = tdGetSchemaFromBuilder(&sb); + // tdDestroyTSchemaBuilder(&sb); + // for (size_t i = 0; i < 1000000; i++) { + // sprintf(tbname, "tb%ld", i); + // metaTableOptsInit(&tbOpts, 0, tbname, pSchema); - metaCreateTable(meta, &tbOpts); - } + // metaCreateTable(meta, &tbOpts); + // } // Close Meta metaClose(meta); std::cout << "Meta is closed!" << std::endl; - // Destroy Meta - metaDestroy("meta"); - std::cout << "Meta is destroyed!" << std::endl; + // // Destroy Meta + // metaDestroy("meta"); + // std::cout << "Meta is destroyed!" << std::endl; } \ No newline at end of file From 7d7c380c3904f5db4b3be1f56cfebfe3f3d0dcee Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 15 Oct 2021 15:13:46 +0800 Subject: [PATCH 16/24] more --- include/server/vnode/meta/meta.h | 30 +++++++++++++++++- source/server/vnode/meta/inc/metaDef.h | 27 ---------------- source/server/vnode/meta/inc/metaUid.h | 3 +- source/server/vnode/meta/test/metaTests.cpp | 34 +++++++++++---------- 4 files changed, 48 insertions(+), 46 deletions(-) diff --git a/include/server/vnode/meta/meta.h b/include/server/vnode/meta/meta.h index df6c79d077..4775b8c2ad 100644 --- a/include/server/vnode/meta/meta.h +++ b/include/server/vnode/meta/meta.h @@ -24,9 +24,10 @@ extern "C" { #endif -typedef uint64_t tuid_t; +/* ------------------------ APIs Exposed ------------------------ */ // Types exported +typedef uint64_t tb_uid_t; typedef struct SMeta SMeta; typedef struct SMetaOpts SMetaOpts; typedef struct SMetaQueryHandle SMetaQueryHandle; @@ -59,6 +60,33 @@ void metaQueryOptionsDestroy(SMetaQueryOpts *); // STableOpts void metaTableOptsInit(STableOpts *, int8_t type, const char *name, const STSchema *pSchema); +/* ------------------------ Impl should hidden ------------------------ */ +typedef enum { META_SUPER_TABLE = 0, META_CHILD_TABLE = 1, META_NORMAL_TABLE = 2 } EMetaTableT; +typedef struct SSuperTableOpts { + tb_uid_t uid; + STSchema *pSchema; // (ts timestamp, a int) + STSchema *pTagSchema; // (tag1 binary(10), tag2 int) +} SSuperTableOpts; + +typedef struct SChildTableOpts { + tb_uid_t suid; // super table uid + SKVRow tags; // tag value of the child table +} SChildTableOpts; + +typedef struct SNormalTableOpts { + STSchema *pSchema; +} SNormalTableOpts; + +struct STableOpts { + EMetaTableT type; + char * name; + union { + SSuperTableOpts superOpts; + SChildTableOpts childOpts; + SNormalTableOpts normalOpts; + }; +}; + #ifdef __cplusplus } #endif diff --git a/source/server/vnode/meta/inc/metaDef.h b/source/server/vnode/meta/inc/metaDef.h index 3ff239e01e..f5d0b7f74c 100644 --- a/source/server/vnode/meta/inc/metaDef.h +++ b/source/server/vnode/meta/inc/metaDef.h @@ -34,33 +34,6 @@ struct SMeta { // STkvCache* metaCache; // TODO: add a global cache here }; -/* ------------------------ TEST CODE ------------------------ */ -typedef enum { META_SUPER_TABLE = 0, META_CHILD_TABLE = 1, META_NORMAL_TABLE = 2 } EMetaTableT; -typedef struct SSuperTableOpts { - tb_uid_t uid; - STSchema* pSchema; // (ts timestamp, a int) - STSchema* pTagSchema; // (tag1 binary(10), tag2 int) -} SSuperTableOpts; - -typedef struct SChildTableOpts { - tb_uid_t suid; // super table uid - SKVRow tags; // tag value of the child table -} SChildTableOpts; - -typedef struct SNormalTableOpts { - STSchema* pSchema; -} SNormalTableOpts; - -struct STableOpts { - EMetaTableT type; - char* name; - union { - SSuperTableOpts superOpts; - SChildTableOpts childOpts; - SNormalTableOpts normalOpts; - }; -}; - #ifdef __cplusplus } #endif diff --git a/source/server/vnode/meta/inc/metaUid.h b/source/server/vnode/meta/inc/metaUid.h index f0a747bc74..37c3fac6ba 100644 --- a/source/server/vnode/meta/inc/metaUid.h +++ b/source/server/vnode/meta/inc/metaUid.h @@ -16,14 +16,13 @@ #ifndef _TD_META_UID_H_ #define _TD_META_UID_H_ -#include "os.h" +#include "meta.h" #ifdef __cplusplus extern "C" { #endif /* ------------------------ APIS EXPOSED ------------------------ */ -typedef uint64_t tb_uid_t; typedef struct STableUidGenerator STableUidGenerator; // tb_uid_t diff --git a/source/server/vnode/meta/test/metaTests.cpp b/source/server/vnode/meta/test/metaTests.cpp index 6e26243aeb..c352b49603 100644 --- a/source/server/vnode/meta/test/metaTests.cpp +++ b/source/server/vnode/meta/test/metaTests.cpp @@ -9,23 +9,25 @@ TEST(MetaTest, meta_open_test) { SMeta *meta = metaOpen(NULL); std::cout << "Meta is opened!" << std::endl; - // // Create tables - // STableOpts tbOpts; - // char tbname[128]; - // STSchema * pSchema; - // STSchemaBuilder sb; - // tdInitTSchemaBuilder(&sb, 0); - // for (size_t i = 0; i < 10; i++) { - // tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, i, 8); - // } - // pSchema = tdGetSchemaFromBuilder(&sb); - // tdDestroyTSchemaBuilder(&sb); - // for (size_t i = 0; i < 1000000; i++) { - // sprintf(tbname, "tb%ld", i); - // metaTableOptsInit(&tbOpts, 0, tbname, pSchema); +#if 0 + // Create tables + STableOpts tbOpts; + char tbname[128]; + STSchema * pSchema; + STSchemaBuilder sb; + tdInitTSchemaBuilder(&sb, 0); + for (size_t i = 0; i < 10; i++) { + tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, i, 8); + } + pSchema = tdGetSchemaFromBuilder(&sb); + tdDestroyTSchemaBuilder(&sb); + for (size_t i = 0; i < 1000000; i++) { + sprintf(tbname, "tb%ld", i); + metaTableOptsInit(&tbOpts, 0, tbname, pSchema); - // metaCreateTable(meta, &tbOpts); - // } + metaCreateTable(meta, &tbOpts); + } +#endif // Close Meta metaClose(meta); From d1b6f2b25714c895f4fe6d556dc2b40c6ccf8806 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 15 Oct 2021 15:42:01 +0800 Subject: [PATCH 17/24] more --- include/server/vnode/meta/meta.h | 10 +++--- source/server/vnode/meta/src/metaMain.c | 20 +++++++++++ source/server/vnode/meta/test/metaTests.cpp | 38 ++++++++++++--------- 3 files changed, 48 insertions(+), 20 deletions(-) diff --git a/include/server/vnode/meta/meta.h b/include/server/vnode/meta/meta.h index 4775b8c2ad..541f1b0316 100644 --- a/include/server/vnode/meta/meta.h +++ b/include/server/vnode/meta/meta.h @@ -58,10 +58,12 @@ SMetaQueryOpts *metaQueryOptionsCreate(); void metaQueryOptionsDestroy(SMetaQueryOpts *); // STableOpts -void metaTableOptsInit(STableOpts *, int8_t type, const char *name, const STSchema *pSchema); +#define META_TABLE_OPTS_DECLARE(name) STableOpts name = {0}; +void metaNormalTableOptsInit(STableOpts *, const char *name, const STSchema *pSchema); +void metaTableOptsDestroy(STableOpts *); /* ------------------------ Impl should hidden ------------------------ */ -typedef enum { META_SUPER_TABLE = 0, META_CHILD_TABLE = 1, META_NORMAL_TABLE = 2 } EMetaTableT; +typedef enum { META_INIT_TABLE = 0, META_SUPER_TABLE = 1, META_CHILD_TABLE = 2, META_NORMAL_TABLE = 3 } EMetaTableT; typedef struct SSuperTableOpts { tb_uid_t uid; STSchema *pSchema; // (ts timestamp, a int) @@ -78,8 +80,8 @@ typedef struct SNormalTableOpts { } SNormalTableOpts; struct STableOpts { - EMetaTableT type; - char * name; + int8_t type; + char * name; union { SSuperTableOpts superOpts; SChildTableOpts childOpts; diff --git a/source/server/vnode/meta/src/metaMain.c b/source/server/vnode/meta/src/metaMain.c index 6f529f04c8..1b12b54004 100644 --- a/source/server/vnode/meta/src/metaMain.c +++ b/source/server/vnode/meta/src/metaMain.c @@ -212,6 +212,26 @@ static int metaCreateNormalTable(SMeta *pMeta, const char *tbname, const SNormal return 0; } +void metaNormalTableOptsInit(STableOpts *pTableOpts, const char *name, const STSchema *pSchema) { + pTableOpts->type = META_NORMAL_TABLE; + pTableOpts->name = strdup(name); + pTableOpts->normalOpts.pSchema = tdDupSchema(pSchema); +} + +void metaTableOptsDestroy(STableOpts *pTableOpts) { + switch (pTableOpts->type) { + case META_NORMAL_TABLE: + tfree(pTableOpts->name); + tdFreeSchema(pTableOpts->normalOpts.pSchema); + break; + default: + break; + } + + // TODO + pTableOpts->type = META_INIT_TABLE; +} + #if 0 /* -------------------- Structures -------------------- */ static STable * metaTableNew(tb_uid_t uid, const char *name, int32_t sver); diff --git a/source/server/vnode/meta/test/metaTests.cpp b/source/server/vnode/meta/test/metaTests.cpp index c352b49603..1fa08f746e 100644 --- a/source/server/vnode/meta/test/metaTests.cpp +++ b/source/server/vnode/meta/test/metaTests.cpp @@ -4,30 +4,36 @@ #include "meta.h" +static STSchema *metaGetSimpleSchema() { + STSchema * pSchema = NULL; + STSchemaBuilder sb; + + tdInitTSchemaBuilder(&sb, 0); + tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, 0, 8); + tdAddColToSchema(&sb, TSDB_DATA_TYPE_INT, 1, 4); + + pSchema = tdGetSchemaFromBuilder(&sb); + tdDestroyTSchemaBuilder(&sb); + + return pSchema; +} + TEST(MetaTest, meta_open_test) { // Open Meta SMeta *meta = metaOpen(NULL); std::cout << "Meta is opened!" << std::endl; -#if 0 - // Create tables - STableOpts tbOpts; - char tbname[128]; - STSchema * pSchema; - STSchemaBuilder sb; - tdInitTSchemaBuilder(&sb, 0); - for (size_t i = 0; i < 10; i++) { - tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, i, 8); - } - pSchema = tdGetSchemaFromBuilder(&sb); - tdDestroyTSchemaBuilder(&sb); - for (size_t i = 0; i < 1000000; i++) { - sprintf(tbname, "tb%ld", i); - metaTableOptsInit(&tbOpts, 0, tbname, pSchema); + // Create 1000000 normal tables + META_TABLE_OPTS_DECLARE(tbOpts) + STSchema *pSchema = metaGetSimpleSchema(); + char tbname[128]; + for (size_t i = 0; i < 1000000; i++) { + sprintf(tbname, "ntb%ld", i); + metaNormalTableOptsInit(&tbOpts, tbname, pSchema); metaCreateTable(meta, &tbOpts); + metaTableOptsDestroy(&tbOpts); } -#endif // Close Meta metaClose(meta); From 83fac24260ea30317bc2e65d2748a102c7ca1f21 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 15 Oct 2021 15:54:07 +0800 Subject: [PATCH 18/24] refact --- source/libs/tkv/inc/{tkvInt.h => tkvDef.h} | 37 ++++++++++++++++++++-- source/libs/tkv/src/tkv.c | 33 +------------------ 2 files changed, 35 insertions(+), 35 deletions(-) rename source/libs/tkv/inc/{tkvInt.h => tkvDef.h} (60%) diff --git a/source/libs/tkv/inc/tkvInt.h b/source/libs/tkv/inc/tkvDef.h similarity index 60% rename from source/libs/tkv/inc/tkvInt.h rename to source/libs/tkv/inc/tkvDef.h index 314adce3f5..7433b1a4dd 100644 --- a/source/libs/tkv/inc/tkvInt.h +++ b/source/libs/tkv/inc/tkvDef.h @@ -13,15 +13,46 @@ * along with this program. If not, see . */ -#ifndef _TD_TKV_INT_H_ -#define _TD_TKV_INT_H_ +#ifndef _TD_TKV_DEF_H_ +#define _TD_TKV_DEF_H_ + +#ifdef USE_ROCKSDB +#include +#endif #ifdef __cplusplus extern "C" { #endif +struct STkvDb { +#ifdef USE_ROCKSDB + rocksdb_t *db; +#endif +}; + +struct STkvOpts { +#ifdef USE_ROCKSDB + rocksdb_options_t *opts; +#endif +}; + +struct STkvCache { + // TODO +}; + +struct STkvReadOpts { +#ifdef USE_ROCKSDB + rocksdb_readoptions_t *ropts; +#endif +}; + +struct STkvWriteOpts { +#ifdef USE_ROCKSDB + rocksdb_writeoptions_t *wopts; +#endif +}; #ifdef __cplusplus } #endif -#endif /*_TD_TKV_INT_H_*/ \ No newline at end of file +#endif /*_TD_TKV_DEF_H_*/ \ No newline at end of file diff --git a/source/libs/tkv/src/tkv.c b/source/libs/tkv/src/tkv.c index 47213f9f02..a0e2adbfad 100644 --- a/source/libs/tkv/src/tkv.c +++ b/source/libs/tkv/src/tkv.c @@ -13,39 +13,8 @@ * along with this program. If not, see . */ -#ifdef USE_ROCKSDB -#include -#endif - #include "tkv.h" - -struct STkvDb { -#ifdef USE_ROCKSDB - rocksdb_t *db; -#endif -}; - -struct STkvOpts { -#ifdef USE_ROCKSDB - rocksdb_options_t *opts; -#endif -}; - -struct STkvCache { - // TODO -}; - -struct STkvReadOpts { -#ifdef USE_ROCKSDB - rocksdb_readoptions_t *ropts; -#endif -}; - -struct STkvWriteOpts { -#ifdef USE_ROCKSDB - rocksdb_writeoptions_t *wopts; -#endif -}; +#include "tkvDef.h" STkvDb *tkvOpen(const STkvOpts *options, const char *path) { STkvDb *pDb = NULL; From c74f6292cc0fc09d68f66a294e62d815627cbd8c Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 15 Oct 2021 16:17:09 +0800 Subject: [PATCH 19/24] more --- source/libs/tkv/src/tkv.c | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/source/libs/tkv/src/tkv.c b/source/libs/tkv/src/tkv.c index a0e2adbfad..5319f6b9da 100644 --- a/source/libs/tkv/src/tkv.c +++ b/source/libs/tkv/src/tkv.c @@ -16,7 +16,14 @@ #include "tkv.h" #include "tkvDef.h" +static pthread_once_t isInit = PTHREAD_ONCE_INIT; +static STkvReadOpts defaultReadOpts; +static STkvWriteOpts defaultWriteOpts; + +static void tkvInit(); + STkvDb *tkvOpen(const STkvOpts *options, const char *path) { + pthread_once(&isInit, tkvInit); STkvDb *pDb = NULL; pDb = (STkvDb *)malloc(sizeof(*pDb)); @@ -46,7 +53,7 @@ void tkvClose(STkvDb *pDb) { void tkvPut(STkvDb *pDb, const STkvWriteOpts *pwopts, const char *key, size_t keylen, const char *val, size_t vallen) { #ifdef USE_ROCKSDB char *err = NULL; - rocksdb_put(pDb->db, pwopts->wopts, key, keylen, val, vallen, &err); + rocksdb_put(pDb->db, pwopts ? pwopts->wopts : defaultWriteOpts.wopts, key, keylen, val, vallen, &err); // TODO: check error #endif } @@ -56,7 +63,7 @@ char *tkvGet(STkvDb *pDb, const STkvReadOpts *propts, const char *key, size_t ke #ifdef USE_ROCKSDB char *err = NULL; - ret = rocksdb_get(pDb->db, propts->ropts, key, keylen, vallen, &err); + ret = rocksdb_get(pDb->db, propts ? propts->ropts : defaultReadOpts.ropts, key, keylen, vallen, &err); // TODD: check error #endif @@ -144,5 +151,19 @@ void tkvWriteOptsDestroy(STkvWriteOpts *pWriteOpts) { #endif free(pWriteOpts); } - // TODO +} + +/* ------------------------ STATIC METHODS ------------------------ */ +static void tkvInit() { +#ifdef USE_ROCKSDB + defaultReadOpts.ropts = rocksdb_readoptions_create(); + defaultWriteOpts.wopts = rocksdb_writeoptions_create(); +#endif +} + +static void tkvClear() { +#ifdef USE_ROCKSDB + rocksdb_readoptions_destroy(defaultReadOpts.ropts); + rocksdb_writeoptions_destroy(defaultWriteOpts.wopts); +#endif } \ No newline at end of file From 841eae909ae539c2490e84f2d5d093e1ace8635e Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 15 Oct 2021 16:21:57 +0800 Subject: [PATCH 20/24] more --- source/server/vnode/meta/test/metaTests.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/server/vnode/meta/test/metaTests.cpp b/source/server/vnode/meta/test/metaTests.cpp index 1fa08f746e..8879f5bb9a 100644 --- a/source/server/vnode/meta/test/metaTests.cpp +++ b/source/server/vnode/meta/test/metaTests.cpp @@ -18,7 +18,7 @@ static STSchema *metaGetSimpleSchema() { return pSchema; } -TEST(MetaTest, meta_open_test) { +TEST(MetaTest, meta_create_1m_normal_tables_test) { // Open Meta SMeta *meta = metaOpen(NULL); std::cout << "Meta is opened!" << std::endl; From d0e645e4f542a3ee05cae5bbce881fcfc49d0c21 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 15 Oct 2021 16:25:09 +0800 Subject: [PATCH 21/24] more --- source/server/vnode/meta/src/metaMain.c | 13 ------------- source/server/vnode/meta/test/metaTests.cpp | 6 +++--- 2 files changed, 3 insertions(+), 16 deletions(-) diff --git a/source/server/vnode/meta/src/metaMain.c b/source/server/vnode/meta/src/metaMain.c index 1b12b54004..401ad6ff53 100644 --- a/source/server/vnode/meta/src/metaMain.c +++ b/source/server/vnode/meta/src/metaMain.c @@ -232,17 +232,4 @@ void metaTableOptsDestroy(STableOpts *pTableOpts) { pTableOpts->type = META_INIT_TABLE; } -#if 0 -/* -------------------- Structures -------------------- */ -static STable * metaTableNew(tb_uid_t uid, const char *name, int32_t sver); - void metaDestroy(const char *path) { taosRemoveDir(path); } - -int metaCommit(SMeta *meta) { return 0; } - -void metaTableOptsInit(STableOpts *pTableOpts, int8_t type, const char *name, const STSchema *pSchema) { - pTableOpts->type = type; - pTableOpts->name = strdup(name); - pTableOpts->pSchema = tdDupSchema(pSchema); -} -#endif \ No newline at end of file diff --git a/source/server/vnode/meta/test/metaTests.cpp b/source/server/vnode/meta/test/metaTests.cpp index 8879f5bb9a..aa645a143b 100644 --- a/source/server/vnode/meta/test/metaTests.cpp +++ b/source/server/vnode/meta/test/metaTests.cpp @@ -39,7 +39,7 @@ TEST(MetaTest, meta_create_1m_normal_tables_test) { metaClose(meta); std::cout << "Meta is closed!" << std::endl; - // // Destroy Meta - // metaDestroy("meta"); - // std::cout << "Meta is destroyed!" << std::endl; + // Destroy Meta + metaDestroy("meta"); + std::cout << "Meta is destroyed!" << std::endl; } \ No newline at end of file From ed3beb248a3e5f67f6d9dcfbf88400a2969c78cd Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 15 Oct 2021 17:07:37 +0800 Subject: [PATCH 22/24] more --- include/server/vnode/meta/meta.h | 7 ++- source/server/vnode/meta/src/metaMain.c | 32 ++++++++-- source/server/vnode/meta/test/metaTests.cpp | 66 +++++++++++++++++++-- 3 files changed, 95 insertions(+), 10 deletions(-) diff --git a/include/server/vnode/meta/meta.h b/include/server/vnode/meta/meta.h index 541f1b0316..ae81f995d7 100644 --- a/include/server/vnode/meta/meta.h +++ b/include/server/vnode/meta/meta.h @@ -58,9 +58,12 @@ SMetaQueryOpts *metaQueryOptionsCreate(); void metaQueryOptionsDestroy(SMetaQueryOpts *); // STableOpts -#define META_TABLE_OPTS_DECLARE(name) STableOpts name = {0}; +#define META_TABLE_OPTS_DECLARE(name) STableOpts name = {0} void metaNormalTableOptsInit(STableOpts *, const char *name, const STSchema *pSchema); -void metaTableOptsDestroy(STableOpts *); +void metaSuperTableOptsInit(STableOpts *, const char *name, tb_uid_t uid, const STSchema *pSchema, + const STSchema *pTagSchema); +void metaChildTableOptsInit(STableOpts *, const char *name, tb_uid_t suid, const SKVRow tags); +void metaTableOptsClear(STableOpts *); /* ------------------------ Impl should hidden ------------------------ */ typedef enum { META_INIT_TABLE = 0, META_SUPER_TABLE = 1, META_CHILD_TABLE = 2, META_NORMAL_TABLE = 3 } EMetaTableT; diff --git a/source/server/vnode/meta/src/metaMain.c b/source/server/vnode/meta/src/metaMain.c index 401ad6ff53..612ef49a04 100644 --- a/source/server/vnode/meta/src/metaMain.c +++ b/source/server/vnode/meta/src/metaMain.c @@ -167,7 +167,7 @@ static int metaCreateChildTable(SMeta *pMeta, const char *tbname, const SChildTa vallen = 0; pBuf = (void *)buffer; vallen += taosEncodeString(&pBuf, tbname); - vallen += taosEncodeFixedU64(pBuf, pChildTableOpts->suid); + vallen += taosEncodeFixedU64(&pBuf, pChildTableOpts->suid); tkvPut(pMeta->tableDb, NULL, (char *)(&uid), sizeof(uid), buffer, vallen); // Put tbname -> uid @@ -218,18 +218,42 @@ void metaNormalTableOptsInit(STableOpts *pTableOpts, const char *name, const STS pTableOpts->normalOpts.pSchema = tdDupSchema(pSchema); } -void metaTableOptsDestroy(STableOpts *pTableOpts) { +void metaSuperTableOptsInit(STableOpts *pTableOpts, const char *name, tb_uid_t uid, const STSchema *pSchema, + const STSchema *pTagSchema) { + pTableOpts->type = META_SUPER_TABLE; + pTableOpts->name = strdup(name); + pTableOpts->superOpts.uid = uid; + pTableOpts->superOpts.pSchema = tdDupSchema(pSchema); + pTableOpts->superOpts.pTagSchema = tdDupSchema(pTagSchema); +} + +void metaChildTableOptsInit(STableOpts *pTableOpts, const char *name, tb_uid_t suid, const SKVRow tags) { + pTableOpts->type = META_CHILD_TABLE; + pTableOpts->name = strdup(name); + pTableOpts->childOpts.suid = suid; + pTableOpts->childOpts.tags = tdKVRowDup(tags); +} + +void metaTableOptsClear(STableOpts *pTableOpts) { switch (pTableOpts->type) { case META_NORMAL_TABLE: tfree(pTableOpts->name); tdFreeSchema(pTableOpts->normalOpts.pSchema); break; + case META_SUPER_TABLE: + tdFreeSchema(pTableOpts->superOpts.pTagSchema); + tdFreeSchema(pTableOpts->superOpts.pSchema); + tfree(pTableOpts->name); + break; + case META_CHILD_TABLE: + kvRowFree(pTableOpts->childOpts.tags); + tfree(pTableOpts->name); + break; default: break; } - // TODO - pTableOpts->type = META_INIT_TABLE; + memset(pTableOpts, 0, sizeof(*pTableOpts)); } void metaDestroy(const char *path) { taosRemoveDir(path); } diff --git a/source/server/vnode/meta/test/metaTests.cpp b/source/server/vnode/meta/test/metaTests.cpp index aa645a143b..727d44f341 100644 --- a/source/server/vnode/meta/test/metaTests.cpp +++ b/source/server/vnode/meta/test/metaTests.cpp @@ -6,7 +6,7 @@ static STSchema *metaGetSimpleSchema() { STSchema * pSchema = NULL; - STSchemaBuilder sb; + STSchemaBuilder sb = {0}; tdInitTSchemaBuilder(&sb, 0); tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, 0, 8); @@ -18,13 +18,31 @@ static STSchema *metaGetSimpleSchema() { return pSchema; } -TEST(MetaTest, meta_create_1m_normal_tables_test) { +static SKVRow metaGetSimpleTags() { + SKVRowBuilder kvrb = {0}; + SKVRow row; + + tdInitKVRowBuilder(&kvrb); + int64_t ts = 1634287978000; + int32_t a = 10; + + tdAddColToKVRow(&kvrb, 0, TSDB_DATA_TYPE_TIMESTAMP, (void *)(&ts)); + tdAddColToKVRow(&kvrb, 0, TSDB_DATA_TYPE_INT, (void *)(&a)); + + row = tdGetKVRowFromBuilder(&kvrb); + + tdDestroyKVRowBuilder(&kvrb); + + return row; +} + +TEST(MetaTest, DISABLED_meta_create_1m_normal_tables_test) { // Open Meta SMeta *meta = metaOpen(NULL); std::cout << "Meta is opened!" << std::endl; // Create 1000000 normal tables - META_TABLE_OPTS_DECLARE(tbOpts) + META_TABLE_OPTS_DECLARE(tbOpts); STSchema *pSchema = metaGetSimpleSchema(); char tbname[128]; @@ -32,9 +50,49 @@ TEST(MetaTest, meta_create_1m_normal_tables_test) { sprintf(tbname, "ntb%ld", i); metaNormalTableOptsInit(&tbOpts, tbname, pSchema); metaCreateTable(meta, &tbOpts); - metaTableOptsDestroy(&tbOpts); + metaTableOptsClear(&tbOpts); } + tdFreeSchema(pSchema); + + // Close Meta + metaClose(meta); + std::cout << "Meta is closed!" << std::endl; + + // Destroy Meta + metaDestroy("meta"); + std::cout << "Meta is destroyed!" << std::endl; +} + +TEST(MetaTest, meta_create_1m_child_tables_test) { + // Open Meta + SMeta *meta = metaOpen(NULL); + std::cout << "Meta is opened!" << std::endl; + + // Create a super tables + tb_uid_t uid = 477529885843758ul; + META_TABLE_OPTS_DECLARE(tbOpts); + STSchema *pSchema = metaGetSimpleSchema(); + STSchema *pTagSchema = metaGetSimpleSchema(); + + metaSuperTableOptsInit(&tbOpts, "st", uid, pSchema, pTagSchema); + metaCreateTable(meta, &tbOpts); + metaTableOptsClear(&tbOpts); + + tdFreeSchema(pSchema); + tdFreeSchema(pTagSchema); + + // Create 1000000 child tables + char name[128]; + SKVRow row = metaGetSimpleTags(); + for (size_t i = 0; i < 1000000; i++) { + sprintf(name, "ctb%ld", i); + metaChildTableOptsInit(&tbOpts, name, uid, row); + metaCreateTable(meta, &tbOpts); + metaTableOptsClear(&tbOpts); + } + kvRowFree(row); + // Close Meta metaClose(meta); std::cout << "Meta is closed!" << std::endl; From 73fa7a7fb79e0369bb58a66c4c35ee4de8f0dcac Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 18 Oct 2021 10:46:53 +0800 Subject: [PATCH 23/24] vnode process mq msg (#8287) add vnodeprocess msg function --- include/common/taosmsg.h | 38 ++++++++++++++++++++++++- include/server/vnode/tq/tq.h | 5 ++-- source/server/dnode/src/dnodeTrans.c | 11 +++++-- source/server/vnode/inc/vnodeReadMsg.h | 2 ++ source/server/vnode/inc/vnodeWriteMsg.h | 8 +++++- source/server/vnode/src/vnodeMain.c | 36 ++++++++++++++--------- source/server/vnode/src/vnodeRead.c | 3 ++ source/server/vnode/src/vnodeWrite.c | 18 ++++++++++-- 8 files changed, 99 insertions(+), 22 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 78f91cca64..66a02f350e 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -42,7 +42,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) @@ -121,7 +123,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" ) #ifndef TAOS_MESSAGE_C - TSDB_MSG_TYPE_MAX // 105 + TSDB_MSG_TYPE_MAX // 147 #endif }; @@ -958,6 +960,40 @@ typedef struct { char reserved2[64]; } SStartupStep; +// mq related +typedef struct { + +} SMqConnectReq; + +typedef struct { + +} SMqConnectRsp; + +typedef struct { + +} SMqDisconnectReq; + +typedef struct { + +} SMqDisconnectRsp; + +typedef struct { + +} SMqAckReq; + +typedef struct { + +} SMqAckRsp; + +typedef struct { + +} SMqResetReq; + +typedef struct { + +} SMqResetRsp; +//mq related end + typedef struct { /* data */ } SSubmitReq; diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index ef6a34ffa3..6e56e8256f 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -26,8 +26,9 @@ typedef struct tmqMsgHead { int32_t headLen; int32_t msgVer; int64_t cgId; - int32_t topicLen; - char topic[]; + int64_t topicId; + int32_t checksum; + int32_t msgType; } tmqMsgHead; //TODO: put msgs into common diff --git a/source/server/dnode/src/dnodeTrans.c b/source/server/dnode/src/dnodeTrans.c index a4409674f1..1739283f34 100644 --- a/source/server/dnode/src/dnodeTrans.c +++ b/source/server/dnode/src/dnodeTrans.c @@ -97,6 +97,9 @@ int32_t dnodeInitServer() { tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; + /*tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessRead;*/ + SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = tsDnodeDnodePort; @@ -308,10 +311,12 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c } int32_t dnodeInitShell() { - tsTrans.shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg; // the following message shall be treated as mnode write tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg; diff --git a/source/server/vnode/inc/vnodeReadMsg.h b/source/server/vnode/inc/vnodeReadMsg.h index a1efb729e1..1efc74d1af 100644 --- a/source/server/vnode/inc/vnodeReadMsg.h +++ b/source/server/vnode/inc/vnodeReadMsg.h @@ -36,6 +36,8 @@ typedef struct SReadMsg { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead); +int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead); +int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead); #ifdef __cplusplus } diff --git a/source/server/vnode/inc/vnodeWriteMsg.h b/source/server/vnode/inc/vnodeWriteMsg.h index 86cdba6946..9dbc4fe490 100644 --- a/source/server/vnode/inc/vnodeWriteMsg.h +++ b/source/server/vnode/inc/vnodeWriteMsg.h @@ -27,9 +27,15 @@ int32_t vnodeProcessDropTableReq(SVnode *pVnode, SDropTableReq *pReq, SDropTable int32_t vnodeProcessAlterTableReq(SVnode *pVnode, SAlterTableReq *pReq, SAlterTableRsp *pRsp); int32_t vnodeProcessDropStableReq(SVnode *pVnode, SDropStableReq *pReq, SDropStableRsp *pRsp); int32_t vnodeProcessUpdateTagValReq(SVnode *pVnode, SUpdateTagValReq *pReq, SUpdateTagValRsp *pRsp); +//mq related +int32_t vnodeProcessMqConnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp); +int32_t vnodeProcessMqDisconnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp); +int32_t vnodeProcessMqAckReq(SVnode* pVnode, SMqAckReq *pReq, SMqAckRsp *pRsp); +int32_t vnodeProcessMqResetReq(SVnode* pVnode, SMqResetReq *pReq, SMqResetRsp *pRsp); +//mq related end #ifdef __cplusplus } #endif -#endif /*_TD_VNODE_WRITE_MSG_H_*/ \ No newline at end of file +#endif /*_TD_VNODE_WRITE_MSG_H_*/ diff --git a/source/server/vnode/src/vnodeMain.c b/source/server/vnode/src/vnodeMain.c index d9c1a88d15..da1c1d7235 100644 --- a/source/server/vnode/src/vnodeMain.c +++ b/source/server/vnode/src/vnodeMain.c @@ -780,20 +780,30 @@ static void vnodeCleanupVnodes() { } static void vnodeInitMsgFp() { - tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg; + //mq related + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg; + //mq related end + tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; + //mq related + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg; + //mq related end } void vnodeProcessMsg(SRpcMsg *pMsg) { diff --git a/source/server/vnode/src/vnodeRead.c b/source/server/vnode/src/vnodeRead.c index 39b6983b7d..0bf907c419 100644 --- a/source/server/vnode/src/vnodeRead.c +++ b/source/server/vnode/src/vnodeRead.c @@ -141,6 +141,9 @@ void vnodeProcessReadMsg(SRpcMsg *pMsg) { static void vnodeInitReadMsgFp() { tsVread.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; tsVread.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; + + tsVread.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessTqQueryMsg; + tsVread.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessConsumeMsg; } static int32_t vnodeProcessReadStart(SVnode *pVnode, SReadMsg *pRead, int32_t qtype) { diff --git a/source/server/vnode/src/vnodeWrite.c b/source/server/vnode/src/vnodeWrite.c index 3c2634a2cf..c103460241 100644 --- a/source/server/vnode/src/vnodeWrite.c +++ b/source/server/vnode/src/vnodeWrite.c @@ -179,6 +179,20 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t case TSDB_MSG_TYPE_UPDATE_TAG_VAL: pWrite->code = vnodeProcessUpdateTagValReq(pVnode, (void*)pHead->cont, NULL); break; + //mq related + case TSDB_MSG_TYPE_MQ_CONNECT: + pWrite->code = vnodeProcessMqConnectReq(pVnode, (void*)pHead->cont, NULL); + break; + case TSDB_MSG_TYPE_MQ_DISCONNECT: + pWrite->code = vnodeProcessMqDisconnectReq(pVnode, (void*)pHead->cont, NULL); + break; + case TSDB_MSG_TYPE_MQ_ACK: + pWrite->code = vnodeProcessMqAckReq(pVnode, (void*)pHead->cont, NULL); + break; + case TSDB_MSG_TYPE_MQ_RESET: + pWrite->code = vnodeProcessMqResetReq(pVnode, (void*)pHead->cont, NULL); + break; + //mq related end default: pWrite->code = TSDB_CODE_VND_MSG_NOT_PROCESSED; break; @@ -186,7 +200,7 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t if (pWrite->code < 0) return false; - // update fync + // update fsync return (pWrite->code == 0 && msgType != TSDB_MSG_TYPE_SUBMIT); } @@ -233,4 +247,4 @@ void vnodeCleanupWrite() { taos_queue vnodeAllocWriteQueue(SVnode *pVnode) { return tWriteWorkerAllocQueue(&tsVwrite.pool, pVnode); } -void vnodeFreeWriteQueue(taos_queue pQueue) { tWriteWorkerFreeQueue(&tsVwrite.pool, pQueue); } \ No newline at end of file +void vnodeFreeWriteQueue(taos_queue pQueue) { tWriteWorkerFreeQueue(&tsVwrite.pool, pQueue); } From 8be1a251df6c6d4eb979296d96936c72ebe33f5e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 18 Oct 2021 10:51:35 +0800 Subject: [PATCH 24/24] handle compile error for tq (#8288) add vnodeprocess msg for tq --- source/server/vnode/src/vnodeReadMsg.c | 9 +++++++++ source/server/vnode/src/vnodeWriteMsg.c | 15 +++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/source/server/vnode/src/vnodeReadMsg.c b/source/server/vnode/src/vnodeReadMsg.c index 8a0f4b2e0f..158e550dcf 100644 --- a/source/server/vnode/src/vnodeReadMsg.c +++ b/source/server/vnode/src/vnodeReadMsg.c @@ -217,6 +217,15 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) { return 0; } +//mq related +int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead){ + return 0; +} +int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead) { + return 0; +} +//mq related end + int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) { #if 0 void * pCont = pRead->pCont; diff --git a/source/server/vnode/src/vnodeWriteMsg.c b/source/server/vnode/src/vnodeWriteMsg.c index 0fe6fa2bc9..2e13d0035d 100644 --- a/source/server/vnode/src/vnodeWriteMsg.c +++ b/source/server/vnode/src/vnodeWriteMsg.c @@ -77,3 +77,18 @@ int32_t vnodeProcessUpdateTagValReq(SVnode *pVnode, SUpdateTagValReq *pReq, SUpd // TODO return 0; } + +//mq related +int32_t vnodeProcessMqConnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp){ + return 0; +} +int32_t vnodeProcessMqDisconnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp) { + return 0; +} +int32_t vnodeProcessMqAckReq(SVnode* pVnode, SMqAckReq *pReq, SMqAckRsp *pRsp) { + return 0; +} +int32_t vnodeProcessMqResetReq(SVnode* pVnode, SMqResetReq *pReq, SMqResetRsp *pRsp) { + return 0; +} +//mq related end