Merge branch '3.0' into feature/3.0_liaohj

This commit is contained in:
Haojun Liao 2021-10-21 11:07:50 +08:00
commit 0671a3ff81
44 changed files with 1480 additions and 362 deletions

3
.gitignore vendored
View File

@ -1,4 +1,6 @@
build/ build/
compile_commands.json
.cache
.ycm_extra_conf.py .ycm_extra_conf.py
.vscode/ .vscode/
.idea/ .idea/
@ -97,3 +99,4 @@ TAGS
deps/* deps/*
!deps/CMakeLists.txt !deps/CMakeLists.txt
!deps/test

View File

@ -10,6 +10,7 @@ project(
# DEPENDENCIES # DEPENDENCIES
# ============================================================================ # ============================================================================
set(CMAKE_SUPPORT_DIR "${CMAKE_SOURCE_DIR}/cmake") set(CMAKE_SUPPORT_DIR "${CMAKE_SOURCE_DIR}/cmake")
include(${CMAKE_SUPPORT_DIR}/cmake.options)
function(cat IN_FILE OUT_FILE) function(cat IN_FILE OUT_FILE)
file(READ ${IN_FILE} CONTENTS) file(READ ${IN_FILE} CONTENTS)
@ -20,8 +21,6 @@ set(DEPS_TMP_FILE "${CMAKE_BINARY_DIR}/deps_tmp_CMakeLists.txt.in")
configure_file("${CMAKE_SUPPORT_DIR}/deps_CMakeLists.txt.in" ${DEPS_TMP_FILE}) configure_file("${CMAKE_SUPPORT_DIR}/deps_CMakeLists.txt.in" ${DEPS_TMP_FILE})
## googletest ## googletest
option(BUILD_TEST "If build unit tests using googletest" ON)
if(${BUILD_TEST}) if(${BUILD_TEST})
cat("${CMAKE_SUPPORT_DIR}/gtest_CMakeLists.txt.in" ${DEPS_TMP_FILE}) cat("${CMAKE_SUPPORT_DIR}/gtest_CMakeLists.txt.in" ${DEPS_TMP_FILE})
endif(${BUILD_TEST}) endif(${BUILD_TEST})
@ -36,17 +35,21 @@ cat("${CMAKE_SUPPORT_DIR}/zlib_CMakeLists.txt.in" ${DEPS_TMP_FILE})
cat("${CMAKE_SUPPORT_DIR}/cjson_CMakeLists.txt.in" ${DEPS_TMP_FILE}) cat("${CMAKE_SUPPORT_DIR}/cjson_CMakeLists.txt.in" ${DEPS_TMP_FILE})
## leveldb ## leveldb
option(BUILD_WITH_LEVELDB "If build with leveldb" OFF)
if(${BUILD_WITH_LEVELDB}) if(${BUILD_WITH_LEVELDB})
cat("${CMAKE_SUPPORT_DIR}/leveldb_CMakeLists.txt.in" ${DEPS_TMP_FILE}) cat("${CMAKE_SUPPORT_DIR}/leveldb_CMakeLists.txt.in" ${DEPS_TMP_FILE})
endif(${BUILD_WITH_LEVELDB}) endif(${BUILD_WITH_LEVELDB})
## rocksdb ## rocksdb
option(BUILD_WITH_ROCKSDB "If build with rocksdb" OFF)
if(${BUILD_WITH_ROCKSDB}) if(${BUILD_WITH_ROCKSDB})
cat("${CMAKE_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${DEPS_TMP_FILE}) cat("${CMAKE_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${DEPS_TMP_FILE})
add_definitions(-DUSE_ROCKSDB)
endif(${BUILD_WITH_ROCKSDB}) endif(${BUILD_WITH_ROCKSDB})
## lucene
if(${BUILD_WITH_LUCENE})
cat("${CMAKE_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${DEPS_TMP_FILE})
endif(${BUILD_WITH_LUCENE})
## download dependencies ## download dependencies
configure_file(${DEPS_TMP_FILE} "${CMAKE_SOURCE_DIR}/deps/deps-download/CMakeLists.txt") configure_file(${DEPS_TMP_FILE} "${CMAKE_SOURCE_DIR}/deps/deps-download/CMakeLists.txt")
execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" . execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .

32
cmake/cmake.options Normal file
View File

@ -0,0 +1,32 @@
# =========================================================
# Deps options
# =========================================================
option(
BUILD_TEST
"If build unit tests using googletest"
ON
)
option(
BUILD_WITH_LEVELDB
"If build with leveldb"
OFF
)
option(
BUILD_WITH_ROCKSDB
"If build with rocksdb"
OFF
)
option(
BUILD_WITH_LUCENE
"If build with lucene"
OFF
)
option(
BUILD_DEPENDENCY_TESTS
"If build dependency tests"
OFF
)

View File

@ -0,0 +1,13 @@
# lucene
ExternalProject_Add(lucene
GIT_REPOSITORY https://github.com/taosdata-contrib/LucenePlusPlus.git
GIT_TAG rel_3.0.8
SOURCE_DIR "${CMAKE_SOURCE_DIR}/deps/lucene"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)

18
deps/CMakeLists.txt vendored
View File

@ -1,3 +1,6 @@
# ================================================================================================
# DEPENDENCIES
# ================================================================================================
# googletest # googletest
if(${BUILD_TEST}) if(${BUILD_TEST})
add_subdirectory(googletest) add_subdirectory(googletest)
@ -45,6 +48,7 @@ option(LEVELDB_BUILD_TESTS "" OFF)
endif(${BUILD_WITH_LEVELDB}) endif(${BUILD_WITH_LEVELDB})
# rocksdb # rocksdb
# To support rocksdb build on ubuntu: sudo apt-get install libgflags-dev
if(${BUILD_WITH_ROCKSDB}) if(${BUILD_WITH_ROCKSDB})
option(WITH_TESTS "" OFF) option(WITH_TESTS "" OFF)
option(WITH_BENCHMARK_TOOLS "" OFF) option(WITH_BENCHMARK_TOOLS "" OFF)
@ -55,3 +59,17 @@ if(${BUILD_WITH_ROCKSDB})
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/rocksdb/include> PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/rocksdb/include>
) )
endif(${BUILD_WITH_ROCKSDB}) endif(${BUILD_WITH_ROCKSDB})
# lucene
# To support build on ubuntu: sudo apt-get install libboost-all-dev
if(${BUILD_WITH_LUCENE})
option(ENABLE_TEST "Enable the tests" OFF)
add_subdirectory(lucene)
endif(${BUILD_WITH_LUCENE})
# ================================================================================================
# DEPENDENCY TEST
# ================================================================================================
if(${BUILD_DEPENDENCY_TESTS})
add_subdirectory(test)
endif(${BUILD_DEPENDENCY_TESTS})

4
deps/test/CMakeLists.txt vendored Normal file
View File

@ -0,0 +1,4 @@
# rocksdb
if(${BUILD_WITH_ROCKSDB})
add_subdirectory(rocksdb)
endif(${BUILD_WITH_ROCKSDB})

6
deps/test/rocksdb/CMakeLists.txt vendored Normal file
View File

@ -0,0 +1,6 @@
add_executable(rocksdbTest "")
target_sources(rocksdbTest
PRIVATE
"${CMAKE_CURRENT_SOURCE_DIR}/main.c"
)
target_link_libraries(rocksdbTest rocksdb)

46
deps/test/rocksdb/main.c vendored Normal file
View File

@ -0,0 +1,46 @@
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h> // sysconf() - get CPU count
#include "rocksdb/c.h"
// const char DBPath[] = "/tmp/rocksdb_c_simple_example";
const char DBPath[] = "rocksdb_c_simple_example";
const char DBBackupPath[] = "/tmp/rocksdb_c_simple_example_backup";
int main(int argc, char const *argv[]) {
rocksdb_t * db;
rocksdb_backup_engine_t *be;
rocksdb_options_t * options = rocksdb_options_create();
rocksdb_options_set_create_if_missing(options, 1);
// open DB
char *err = NULL;
db = rocksdb_open(options, DBPath, &err);
// Write
rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create();
rocksdb_put(db, writeoptions, "key", 3, "value", 5, &err);
// Read
rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create();
rocksdb_readoptions_set_snapshot(readoptions, rocksdb_create_snapshot(db));
size_t vallen = 0;
char * val = rocksdb_get(db, readoptions, "key", 3, &vallen, &err);
printf("val:%s\n", val);
// Update
// rocksdb_put(db, writeoptions, "key", 3, "eulav", 5, &err);
// Delete
rocksdb_delete(db, writeoptions, "key", 3, &err);
// Read again
val = rocksdb_get(db, readoptions, "key", 3, &vallen, &err);
printf("val:%s\n", val);
rocksdb_close(db);
return 0;
}

View File

@ -41,6 +41,12 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" )
@ -113,11 +119,11 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
// message for topic // message for topic
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TP, "create-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TP, "create-tp" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" ) //TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" )
#ifndef TAOS_MESSAGE_C #ifndef TAOS_MESSAGE_C
TSDB_MSG_TYPE_MAX // 105 TSDB_MSG_TYPE_MAX // 147
#endif #endif
}; };
@ -954,6 +960,40 @@ typedef struct {
char reserved2[64]; char reserved2[64];
} SStartupStep; } SStartupStep;
// mq related
typedef struct {
} SMqConnectReq;
typedef struct {
} SMqConnectRsp;
typedef struct {
} SMqDisconnectReq;
typedef struct {
} SMqDisconnectRsp;
typedef struct {
} SMqAckReq;
typedef struct {
} SMqAckRsp;
typedef struct {
} SMqResetReq;
typedef struct {
} SMqResetRsp;
//mq related end
typedef struct { typedef struct {
/* data */ /* data */
} SSubmitReq; } SSubmitReq;

View File

@ -32,13 +32,15 @@ typedef struct STkvWriteOpts STkvWriteOpts;
// DB operations // DB operations
STkvDb *tkvOpen(const STkvOpts *options, const char *path); STkvDb *tkvOpen(const STkvOpts *options, const char *path);
void tkvClose(STkvDb *db); void tkvClose(STkvDb *db);
void tkvPut(STkvDb *db, STkvWriteOpts *, char *key, size_t keylen, char *val, size_t vallen); void tkvPut(STkvDb *db, const STkvWriteOpts *, const char *key, size_t keylen, const char *val, size_t vallen);
char * tkvGet(STkvDb *db, STkvReadOpts *, char *key, size_t keylen, size_t *vallen); char * tkvGet(STkvDb *db, const STkvReadOpts *, const char *key, size_t keylen, size_t *vallen);
void tkvCommit(STkvDb *db);
// DB options // DB options
STkvOpts *tkvOptionsCreate(); STkvOpts *tkvOptsCreate();
void tkvOptionsDestroy(STkvOpts *); void tkvOptsDestroy(STkvOpts *);
void tkvOptionsSetCache(STkvOpts *, STkvCache *); void tkvOptionsSetCache(STkvOpts *, STkvCache *);
void tkvOptsSetCreateIfMissing(STkvOpts *, unsigned char);
// DB cache // DB cache
typedef enum { TKV_LRU_CACHE = 0, TKV_LFU_CACHE = 1 } ETkvCacheType; typedef enum { TKV_LRU_CACHE = 0, TKV_LFU_CACHE = 1 } ETkvCacheType;

View File

@ -24,9 +24,10 @@
extern "C" { extern "C" {
#endif #endif
typedef uint64_t tuid_t; /* ------------------------ APIs Exposed ------------------------ */
// Types exported // Types exported
typedef uint64_t tb_uid_t;
typedef struct SMeta SMeta; typedef struct SMeta SMeta;
typedef struct SMetaOpts SMetaOpts; typedef struct SMetaOpts SMetaOpts;
typedef struct SMetaQueryHandle SMetaQueryHandle; typedef struct SMetaQueryHandle SMetaQueryHandle;
@ -38,7 +39,7 @@ int metaCreate(const char *path);
void metaDestroy(const char *path); void metaDestroy(const char *path);
SMeta *metaOpen(SMetaOpts *); SMeta *metaOpen(SMetaOpts *);
void metaClose(SMeta *); void metaClose(SMeta *);
int metaCreateTable(SMeta *, STableOpts *); int metaCreateTable(SMeta *, const STableOpts *);
int metaDropTable(SMeta *, uint64_t tuid_t); int metaDropTable(SMeta *, uint64_t tuid_t);
int metaAlterTable(SMeta *, void *); int metaAlterTable(SMeta *, void *);
int metaCommit(SMeta *); int metaCommit(SMeta *);
@ -57,13 +58,38 @@ SMetaQueryOpts *metaQueryOptionsCreate();
void metaQueryOptionsDestroy(SMetaQueryOpts *); void metaQueryOptionsDestroy(SMetaQueryOpts *);
// STableOpts // STableOpts
void metaTableOptsInit(STableOpts *, int8_t type, const char *name, const STSchema *pSchema); #define META_TABLE_OPTS_DECLARE(name) STableOpts name = {0}
void metaNormalTableOptsInit(STableOpts *, const char *name, const STSchema *pSchema);
void metaSuperTableOptsInit(STableOpts *, const char *name, tb_uid_t uid, const STSchema *pSchema,
const STSchema *pTagSchema);
void metaChildTableOptsInit(STableOpts *, const char *name, tb_uid_t suid, const SKVRow tags);
void metaTableOptsClear(STableOpts *);
/* ------------------------ Impl should hidden ------------------------ */
typedef enum { META_INIT_TABLE = 0, META_SUPER_TABLE = 1, META_CHILD_TABLE = 2, META_NORMAL_TABLE = 3 } EMetaTableT;
typedef struct SSuperTableOpts {
tb_uid_t uid;
STSchema *pSchema; // (ts timestamp, a int)
STSchema *pTagSchema; // (tag1 binary(10), tag2 int)
} SSuperTableOpts;
typedef struct SChildTableOpts {
tb_uid_t suid; // super table uid
SKVRow tags; // tag value of the child table
} SChildTableOpts;
typedef struct SNormalTableOpts {
STSchema *pSchema;
} SNormalTableOpts;
/* -------------------------------- Hided implementations -------------------------------- */
struct STableOpts { struct STableOpts {
int8_t type; int8_t type;
char * name; char * name;
STSchema *pSchema; union {
SSuperTableOpts superOpts;
SChildTableOpts childOpts;
SNormalTableOpts normalOpts;
};
}; };
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -22,8 +22,77 @@
extern "C" { extern "C" {
#endif #endif
typedef struct STQ STQ; typedef struct tmqMsgHead {
int32_t headLen;
int32_t msgVer;
int64_t cgId;
int64_t topicId;
int32_t checksum;
int32_t msgType;
} tmqMsgHead;
//TODO: put msgs into common
typedef struct tmqConnectReq {
tmqMsgHead head;
} tmqConnectReq;
typedef struct tmqConnectResp {
} tmqConnectResp;
typedef struct tmqDisconnectReq {
} tmqDisconnectReq;
typedef struct tmqDisconnectResp {
} tmqDiconnectResp;
typedef struct tmqConsumeReq {
} tmqConsumeReq;
typedef struct tmqConsumeResp {
} tmqConsumeResp;
typedef struct tmqSubscribeReq {
} tmqSubscribeReq;
typedef struct tmqSubscribeResp {
} tmqSubscribeResp;
typedef struct tmqHeartbeatReq {
} tmqHeartbeatReq;
typedef struct tmqHeartbeatResp {
} tmqHeartbeatResp;
typedef struct tqTopicVhandle {
//name
//
//executor for filter
//
//callback for mnode
//
} tqTopicVhandle;
typedef struct STQ {
//the set for topics
//key=topicName: str
//value=tqTopicVhandle
//a map
//key=<topic: str, cgId: int64_t>
//value=consumeOffset: int64_t
} STQ;
//init in each vnode
STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); STQ* tqInit(void* ref_func(void*), void* unref_func(void*));
void tqCleanUp(STQ*); void tqCleanUp(STQ*);
@ -32,7 +101,7 @@ int tqPushMsg(STQ*, void* msg, int64_t version);
int tqCommit(STQ*); int tqCommit(STQ*);
//void* will be replace by a msg type //void* will be replace by a msg type
int tqHandleMsg(STQ*, void* msg); int tqHandleConsumeMsg(STQ*, tmqConsumeReq* msg);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -22,27 +22,28 @@
extern "C" { extern "C" {
#endif #endif
#define AMALLOC_APIS \
void *(*malloc)(void *, size_t size); \
void *(*calloc)(void *, size_t nmemb, size_t size); \
void *(*realloc)(void *, size_t size); \
void (*free)(void *ptr);
// Interfaces to implement // Interfaces to implement
typedef struct { typedef struct {
void *(*malloc)(void *, size_t size); AMALLOC_APIS
void *(*calloc)(void *, size_t nmemb, size_t size);
void (*free)(void *ptr, size_t size); // Do we need to set size in the allocated memory?
void *(*realloc)(void *ptr, size_t size);
} SMemAllocatorIf; } SMemAllocatorIf;
typedef struct { typedef struct {
void *impl; void *impl;
SMemAllocatorIf interface; AMALLOC_APIS
} SMemAllocator; } SMemAllocator;
#define amalloc(allocator, size) \ #define amalloc(allocator, size) ((allocator) ? (*((allocator)->malloc))((allocator)->impl, (size)) : malloc(size))
((allocator) ? (*((allocator)->interface.malloc))((allocator)->impl, (size)) : malloc(size))
#define acalloc(allocator, nmemb, size) \ #define acalloc(allocator, nmemb, size) \
((allocator) ? (*((allocator)->interface.calloc))((allocator)->impl, (nmemb), (size)) : calloc((nmemb), (size))) ((allocator) ? (*((allocator)->calloc))((allocator)->impl, (nmemb), (size)) : calloc((nmemb), (size)))
#define arealloc(allocator, ptr, size) \ #define arealloc(allocator, ptr, size) \
((allocator) ? (*((allocator)->interface.realloc))((allocator)->impl, (ptr), (size)) : realloc((ptr), (size))) ((allocator) ? (*((allocator)->realloc))((allocator)->impl, (ptr), (size)) : realloc((ptr), (size)))
#define afree(allocator, ptr, size) \ #define afree(allocator, ptr, size) ((allocator) ? (*((allocator)->free))((allocator)->impl, (ptr), (size)) : free(ptr))
((allocator) ? (*((allocator)->interface.free))((allocator)->impl, (ptr), (size)) : free(ptr))
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -330,7 +330,7 @@ static FORCE_INLINE void *taosDecodeVariantI64(void *buf, int64_t *value) {
} }
// ---- string // ---- string
static FORCE_INLINE int taosEncodeString(void **buf, char *value) { static FORCE_INLINE int taosEncodeString(void **buf, const char *value) {
int tlen = 0; int tlen = 0;
size_t size = strlen(value); size_t size = strlen(value);

65
include/util/theap.h Normal file
View File

@ -0,0 +1,65 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HEAP_H
#define TDENGINE_HEAP_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
struct HeapNode;
/* Return non-zero if a < b. */
typedef int (*HeapCompareFn)(const struct HeapNode* a, const struct HeapNode* b);
typedef struct HeapNode {
struct HeapNode* left;
struct HeapNode* right;
struct HeapNode* parent;
} HeapNode;
/* A binary min heap. The usual properties hold: the root is the lowest
* element in the set, the height of the tree is at most log2(nodes) and
* it's always a complete binary tree.
*
*/
typedef struct {
HeapNode* min;
size_t nelts;
HeapCompareFn compFn;
} Heap;
Heap* heapCreate(HeapCompareFn fn);
void heapDestroy(Heap *heap);
HeapNode* heapMin(const Heap* heap);
void heapInsert(Heap* heap, HeapNode* node);
void heapRemove(Heap* heap, struct HeapNode* node);
void heapDequeue(Heap* heap);
size_t heapSize(Heap *heap);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_HASH_H

View File

@ -1,5 +1,5 @@
aux_source_directory(src TKV_SRC) aux_source_directory(src TKV_SRC)
add_library(tkv ${TKV_SRC}) add_library(tkv STATIC ${TKV_SRC})
target_include_directories( target_include_directories(
tkv tkv
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/tkv" PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/tkv"
@ -9,3 +9,9 @@ target_link_libraries(
tkv tkv
PUBLIC os PUBLIC os
) )
if(${BUILD_WITH_ROCKSDB})
target_link_libraries(
tkv
PUBLIC rocksdb
)
endif(${BUILD_WITH_ROCKSDB})

View File

@ -0,0 +1,58 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TKV_DEF_H_
#define _TD_TKV_DEF_H_
#ifdef USE_ROCKSDB
#include <rocksdb/c.h>
#endif
#ifdef __cplusplus
extern "C" {
#endif
struct STkvDb {
#ifdef USE_ROCKSDB
rocksdb_t *db;
#endif
};
struct STkvOpts {
#ifdef USE_ROCKSDB
rocksdb_options_t *opts;
#endif
};
struct STkvCache {
// TODO
};
struct STkvReadOpts {
#ifdef USE_ROCKSDB
rocksdb_readoptions_t *ropts;
#endif
};
struct STkvWriteOpts {
#ifdef USE_ROCKSDB
rocksdb_writeoptions_t *wopts;
#endif
};
#ifdef __cplusplus
}
#endif
#endif /*_TD_TKV_DEF_H_*/

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_TKV_INT_H_ #ifndef _TD_TVK_ROCKSDB_H_
#define _TD_TKV_INT_H_ #define _TD_TVK_ROCKSDB_H_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -24,4 +24,4 @@ extern "C" {
} }
#endif #endif
#endif /*_TD_TKV_INT_H_*/ #endif /*_TD_TVK_ROCKSDB_H_*/

View File

@ -14,68 +14,156 @@
*/ */
#include "tkv.h" #include "tkv.h"
#include "tkvDef.h"
struct STkvDb { static pthread_once_t isInit = PTHREAD_ONCE_INIT;
// TODO static STkvReadOpts defaultReadOpts;
}; static STkvWriteOpts defaultWriteOpts;
struct STkvOpts {
// TODO static void tkvInit();
};
struct STkvCache {
// TODO
};
struct STkvReadOpts {
// TODO
};
struct STkvWriteOpts {
// TODO
};
STkvDb *tkvOpen(const STkvOpts *options, const char *path) { STkvDb *tkvOpen(const STkvOpts *options, const char *path) {
// TODO pthread_once(&isInit, tkvInit);
STkvDb *pDb = NULL;
pDb = (STkvDb *)malloc(sizeof(*pDb));
if (pDb == NULL) {
return NULL; return NULL;
} }
void tkvClose(STkvDb *db) { #ifdef USE_ROCKSDB
// TODO char *err = NULL;
pDb->db = rocksdb_open(options->opts, path, &err);
// TODO: check err
#endif
return pDb;
} }
void tkvPut(STkvDb *db, STkvWriteOpts *pwopts, char *key, size_t keylen, char *val, size_t vallen) { void tkvClose(STkvDb *pDb) {
// TODO if (pDb) {
#ifdef USE_ROCKSDB
rocksdb_close(pDb->db);
#endif
free(pDb);
}
} }
char *tkvGet(STkvDb *db, STkvReadOpts *propts, char *key, size_t keylen, size_t *vallen) { void tkvPut(STkvDb *pDb, const STkvWriteOpts *pwopts, const char *key, size_t keylen, const char *val, size_t vallen) {
// TODO #ifdef USE_ROCKSDB
char *err = NULL;
rocksdb_put(pDb->db, pwopts ? pwopts->wopts : defaultWriteOpts.wopts, key, keylen, val, vallen, &err);
// TODO: check error
#endif
}
char *tkvGet(STkvDb *pDb, const STkvReadOpts *propts, const char *key, size_t keylen, size_t *vallen) {
char *ret = NULL;
#ifdef USE_ROCKSDB
char *err = NULL;
ret = rocksdb_get(pDb->db, propts ? propts->ropts : defaultReadOpts.ropts, key, keylen, vallen, &err);
// TODD: check error
#endif
return ret;
}
STkvOpts *tkvOptsCreate() {
STkvOpts *pOpts = NULL;
pOpts = (STkvOpts *)malloc(sizeof(*pOpts));
if (pOpts == NULL) {
return NULL; return NULL;
} }
STkvOpts *tkvOptionsCreate() { #ifdef USE_ROCKSDB
// TODO pOpts->opts = rocksdb_options_create();
return NULL; // TODO: check error
#endif
return pOpts;
} }
void tkvOptionsDestroy(STkvOpts *popts) { void tkvOptsDestroy(STkvOpts *pOpts) {
// TODO if (pOpts) {
#ifdef USE_ROCKSDB
rocksdb_options_destroy(pOpts->opts);
#endif
free(pOpts);
}
} }
void tkvOptionsSetCache(STkvOpts *popts, STkvCache *pCache) { void tkvOptionsSetCache(STkvOpts *popts, STkvCache *pCache) {
// TODO // TODO
} }
void tkvOptsSetCreateIfMissing(STkvOpts *pOpts, unsigned char c) {
#ifdef USE_ROCKSDB
rocksdb_options_set_create_if_missing(pOpts->opts, c);
#endif
}
STkvReadOpts *tkvReadOptsCreate() { STkvReadOpts *tkvReadOptsCreate() {
// TODO STkvReadOpts *pReadOpts = NULL;
pReadOpts = (STkvReadOpts *)malloc(sizeof(*pReadOpts));
if (pReadOpts == NULL) {
return NULL; return NULL;
} }
void tkvReadOptsDestroy(STkvReadOpts *propts) { #ifdef USE_ROCKSDB
// TODO pReadOpts->ropts = rocksdb_readoptions_create();
#endif
return pReadOpts;
}
void tkvReadOptsDestroy(STkvReadOpts *pReadOpts) {
if (pReadOpts) {
#ifdef USE_ROCKSDB
rocksdb_readoptions_destroy(pReadOpts->ropts);
#endif
free(pReadOpts);
}
} }
STkvWriteOpts *tkvWriteOptsCreate() { STkvWriteOpts *tkvWriteOptsCreate() {
// TODO STkvWriteOpts *pWriteOpts = NULL;
pWriteOpts = (STkvWriteOpts *)malloc(sizeof(*pWriteOpts));
if (pWriteOpts == NULL) {
return NULL; return NULL;
} }
void tkvWriteOptsDestroy(STkvWriteOpts *pwopts) { #ifdef USE_ROCKSDB
// TODO pWriteOpts->wopts = rocksdb_writeoptions_create();
#endif
return pWriteOpts;
}
void tkvWriteOptsDestroy(STkvWriteOpts *pWriteOpts) {
if (pWriteOpts) {
#ifdef USE_ROCKSDB
rocksdb_writeoptions_destroy(pWriteOpts->wopts);
#endif
free(pWriteOpts);
}
}
/* ------------------------ STATIC METHODS ------------------------ */
static void tkvInit() {
#ifdef USE_ROCKSDB
defaultReadOpts.ropts = rocksdb_readoptions_create();
defaultWriteOpts.wopts = rocksdb_writeoptions_create();
#endif
}
static void tkvClear() {
#ifdef USE_ROCKSDB
rocksdb_readoptions_destroy(defaultReadOpts.ropts);
rocksdb_writeoptions_destroy(defaultWriteOpts.wopts);
#endif
} }

View File

@ -0,0 +1,14 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -97,6 +97,9 @@ int32_t dnodeInitServer() {
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg;
/*tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessRead;*/
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = tsDnodeDnodePort; rpcInit.localPort = tsDnodeDnodePort;
@ -312,6 +315,8 @@ int32_t dnodeInitShell() {
tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg;
// the following message shall be treated as mnode write // the following message shall be treated as mnode write
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg;

View File

@ -36,6 +36,8 @@ typedef struct SReadMsg {
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead); int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead);
int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead);
int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -27,6 +27,12 @@ int32_t vnodeProcessDropTableReq(SVnode *pVnode, SDropTableReq *pReq, SDropTable
int32_t vnodeProcessAlterTableReq(SVnode *pVnode, SAlterTableReq *pReq, SAlterTableRsp *pRsp); int32_t vnodeProcessAlterTableReq(SVnode *pVnode, SAlterTableReq *pReq, SAlterTableRsp *pRsp);
int32_t vnodeProcessDropStableReq(SVnode *pVnode, SDropStableReq *pReq, SDropStableRsp *pRsp); int32_t vnodeProcessDropStableReq(SVnode *pVnode, SDropStableReq *pReq, SDropStableRsp *pRsp);
int32_t vnodeProcessUpdateTagValReq(SVnode *pVnode, SUpdateTagValReq *pReq, SUpdateTagValRsp *pRsp); int32_t vnodeProcessUpdateTagValReq(SVnode *pVnode, SUpdateTagValReq *pReq, SUpdateTagValRsp *pRsp);
//mq related
int32_t vnodeProcessMqConnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp);
int32_t vnodeProcessMqDisconnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp);
int32_t vnodeProcessMqAckReq(SVnode* pVnode, SMqAckReq *pReq, SMqAckRsp *pRsp);
int32_t vnodeProcessMqResetReq(SVnode* pVnode, SMqResetReq *pReq, SMqResetRsp *pRsp);
//mq related end
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -0,0 +1,41 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_META_DEF_H_
#define _TD_META_DEF_H_
#include "metaUid.h"
#include "tkv.h"
#ifdef __cplusplus
extern "C" {
#endif
struct SMeta {
STableUidGenerator uidGenerator;
STkvDb* tableDb; // uid->table obj
STkvDb* tbnameDb; // tbname --> uid
STkvDb* schemaDb; // uid+version --> schema
STkvDb* tagDb; // uid --> tag
STkvDb* tagIdx; // TODO: need to integrate lucene or our own
// STkvCache* metaCache; // TODO: add a global cache here
};
#ifdef __cplusplus
}
#endif
#endif /*_TD_META_DEF_H_*/

View File

@ -16,16 +16,27 @@
#ifndef _TD_META_UID_H_ #ifndef _TD_META_UID_H_
#define _TD_META_UID_H_ #define _TD_META_UID_H_
#include "os.h" #include "meta.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef uint64_t tb_uid_t; /* ------------------------ APIS EXPOSED ------------------------ */
tb_uid_t metaGenerateUid(); typedef struct STableUidGenerator STableUidGenerator;
// tb_uid_t
#define IVLD_TB_UID 0 #define IVLD_TB_UID 0
tb_uid_t generateUid(STableUidGenerator *);
// STableUidGenerator
void tableUidGeneratorInit(STableUidGenerator *, tb_uid_t suid);
#define tableUidGeneratorClear(ug)
/* ------------------------ FOR TEST AND COMPILE ONLY ------------------------ */
struct STableUidGenerator {
tb_uid_t nextUid;
};
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -1,219 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tkv.h"
#include "thash.h"
#include "tlist.h"
#include "tlockfree.h"
#include "ttypes.h"
#include "meta.h"
#include "metaUid.h"
/* -------------------- Structures -------------------- */
typedef struct STable {
tb_uid_t uid;
char * name;
tb_uid_t suid;
SArray * schema;
} STable;
typedef struct STableObj {
bool pin;
uint64_t ref;
SRWLatch latch;
uint64_t offset;
SList * ctbList; // child table list
STable * pTable;
} STableObj;
struct SMeta {
pthread_rwlock_t rwLock;
SHashObj *pTableObjHash; // uid --> STableObj
SList * stbList; // super table list
STkvDb * tbnameDb; // tbname --> uid
STkvDb * tagDb; // uid --> tag
STkvDb * schemaDb;
STkvDb * tagIdx;
size_t totalUsed;
};
static STable * metaTableNew(tb_uid_t uid, const char *name, int32_t sver);
static STableObj *metaTableObjNew();
/* -------------------- Methods -------------------- */
SMeta *metaOpen(SMetaOpts *options) {
SMeta *pMeta = NULL;
char * err = NULL;
pMeta = (SMeta *)calloc(1, sizeof(*pMeta));
if (pMeta == NULL) {
return NULL;
}
pthread_rwlock_init(&(pMeta->rwLock), NULL);
pMeta->pTableObjHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
pMeta->stbList = tdListNew(sizeof(STableObj *));
// Options
STkvOpts *dbOptions = tkvOptionsCreate();
taosMkDir("meta");
// Open tbname DB
pMeta->tbnameDb = tkvOpen(dbOptions, "meta/tbname_uid_db");
// Open tag DB
pMeta->tagDb = tkvOpen(dbOptions, "meta/uid_tag_db");
// Open schema DB
pMeta->schemaDb = tkvOpen(dbOptions, "meta/schema_db");
// Open tag index
pMeta->tagIdx = tkvOpen(dbOptions, "meta/tag_idx_db");
tkvOptionsDestroy(dbOptions);
return pMeta;
}
void metaClose(SMeta *pMeta) {
if (pMeta) {
tkvClose(pMeta->tagIdx);
tkvClose(pMeta->schemaDb);
tkvClose(pMeta->tagDb);
tkvClose(pMeta->tbnameDb);
tdListFree(pMeta->stbList);
taosHashCleanup(pMeta->pTableObjHash);
pthread_rwlock_destroy(&(pMeta->rwLock));
}
}
int metaCreateTable(SMeta *pMeta, STableOpts *pTableOpts) {
size_t vallen;
STkvReadOpts *ropt;
STableObj * pTableObj = NULL;
STkvWriteOpts *wopt;
// Check if table already exists
ropt = tkvReadOptsCreate();
char *uidStr = tkvGet(pMeta->tbnameDb, ropt, pTableOpts->name, strlen(pTableOpts->name), &vallen);
if (uidStr != NULL) {
// Has duplicate named table
return -1;
}
tkvReadOptsDestroy(ropt);
// Create table obj
pTableObj = metaTableObjNew();
if (pTableObj == NULL) {
// TODO
return -1;
}
// Create table object
pTableObj->pTable = metaTableNew(metaGenerateUid(), pTableOpts->name, schemaVersion(pTableOpts->pSchema));
if (pTableObj->pTable == NULL) {
// TODO
}
pthread_rwlock_rdlock(&pMeta->rwLock);
taosHashPut(pMeta->pTableObjHash, &(pTableObj->pTable->uid), sizeof(tb_uid_t), &pTableObj, sizeof(pTableObj));
wopt = tkvWriteOptsCreate();
// rocksdb_writeoptions_disable_WAL(wopt, 1);
// Add to tbname db
tkvPut(pMeta->tbnameDb, wopt, pTableOpts->name, strlen(pTableOpts->name), (char *)&pTableObj->pTable->uid,
sizeof(tb_uid_t));
// Add to schema db
char id[12];
char buf[256];
void *pBuf = buf;
*(tb_uid_t *)id = pTableObj->pTable->uid;
*(int32_t *)(id + sizeof(tb_uid_t)) = schemaVersion(pTableOpts->pSchema);
int size = tdEncodeSchema(&pBuf, pTableOpts->pSchema);
tkvPut(pMeta->schemaDb, wopt, id, 12, buf, size);
tkvWriteOptsDestroy(wopt);
pthread_rwlock_unlock(&pMeta->rwLock);
return 0;
}
void metaDestroy(const char *path) { taosRemoveDir(path); }
int metaCommit(SMeta *meta) { return 0; }
void metaTableOptsInit(STableOpts *pTableOpts, int8_t type, const char *name, const STSchema *pSchema) {
pTableOpts->type = type;
pTableOpts->name = strdup(name);
pTableOpts->pSchema = tdDupSchema(pSchema);
}
/* -------------------- Static Methods -------------------- */
static STable *metaTableNew(tb_uid_t uid, const char *name, int32_t sver) {
STable *pTable = NULL;
pTable = (STable *)malloc(sizeof(*pTable));
if (pTable == NULL) {
// TODO
return NULL;
}
pTable->schema = taosArrayInit(0, sizeof(int32_t));
if (pTable->schema == NULL) {
// TODO
return NULL;
}
pTable->uid = uid;
pTable->name = strdup(name);
pTable->suid = IVLD_TB_UID;
taosArrayPush(pTable->schema, &sver);
return pTable;
}
static STableObj *metaTableObjNew() {
STableObj *pTableObj = NULL;
pTableObj = (STableObj *)malloc(sizeof(*pTableObj));
if (pTableObj == NULL) {
return NULL;
}
pTableObj->pin = true;
pTableObj->ref = 1;
taosInitRWLatch(&(pTableObj->latch));
pTableObj->offset = UINT64_MAX;
pTableObj->ctbList = NULL;
pTableObj->pTable = NULL;
return pTableObj;
}

View File

@ -0,0 +1,259 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
#include "metaDef.h"
#include "tcoding.h"
static int metaCreateSuperTable(SMeta *pMeta, const char *tbname, const SSuperTableOpts *pSuperTableOpts);
static int metaCreateChildTable(SMeta *pMeta, const char *tbname, const SChildTableOpts *pChildTableOpts);
static int metaCreateNormalTable(SMeta *pMeta, const char *tbname, const SNormalTableOpts *pNormalTableOpts);
SMeta *metaOpen(SMetaOpts *pMetaOpts) {
SMeta *pMeta = NULL;
pMeta = (SMeta *)calloc(1, sizeof(*pMeta));
if (pMeta == NULL) {
return NULL;
}
// TODO: check if file exists and handle the error
taosMkDir("meta");
// Open tableDb
STkvOpts *tableDbOpts = tkvOptsCreate();
tkvOptsSetCreateIfMissing(tableDbOpts, 1);
pMeta->tableDb = tkvOpen(tableDbOpts, "meta/table_db");
tkvOptsDestroy(tableDbOpts);
// Open tbnameDb
STkvOpts *tbnameDbOpts = tkvOptsCreate();
tkvOptsSetCreateIfMissing(tbnameDbOpts, 1);
pMeta->tbnameDb = tkvOpen(tbnameDbOpts, "meta/tbname_db");
tkvOptsDestroy(tbnameDbOpts);
// Open schemaDb
STkvOpts *schemaDbOpts = tkvOptsCreate();
tkvOptsSetCreateIfMissing(schemaDbOpts, 1);
pMeta->schemaDb = tkvOpen(schemaDbOpts, "meta/schema_db");
tkvOptsDestroy(schemaDbOpts);
// Open tagDb
STkvOpts *tagDbOpts = tkvOptsCreate();
tkvOptsSetCreateIfMissing(tagDbOpts, 1);
pMeta->tagDb = tkvOpen(tagDbOpts, "meta/tag_db");
tkvOptsDestroy(tagDbOpts);
// Open tagIdx
STkvOpts *tagIdxDbOpts = tkvOptsCreate();
tkvOptsSetCreateIfMissing(tagIdxDbOpts, 1);
pMeta->tagIdx = tkvOpen(tagIdxDbOpts, "meta/tag_idx_db");
tkvOptsDestroy(tagIdxDbOpts);
// TODO: need to figure out how to persist the START UID
tableUidGeneratorInit(&(pMeta->uidGenerator), IVLD_TB_UID);
}
void metaClose(SMeta *pMeta) {
if (pMeta) {
tableUidGeneratorClear(&pMeta->uidGenerator);
tkvClose(pMeta->tagIdx);
tkvClose(pMeta->tagDb);
tkvClose(pMeta->schemaDb);
tkvClose(pMeta->tbnameDb);
tkvClose(pMeta->tableDb);
free(pMeta);
}
}
int metaCreateTable(SMeta *pMeta, const STableOpts *pTableOpts) {
size_t vallen;
char * pUid;
// Check if table already exists
pUid = tkvGet(pMeta->tbnameDb, NULL, pTableOpts->name, strlen(pTableOpts->name), &vallen);
if (pUid) {
free(pUid);
// Table already exists, return error code
return -1;
}
switch (pTableOpts->type) {
case META_SUPER_TABLE:
return metaCreateSuperTable(pMeta, pTableOpts->name, &(pTableOpts->superOpts));
case META_CHILD_TABLE:
return metaCreateChildTable(pMeta, pTableOpts->name, &(pTableOpts->childOpts));
case META_NORMAL_TABLE:
return metaCreateNormalTable(pMeta, pTableOpts->name, &(pTableOpts->normalOpts));
default:
ASSERT(0);
}
return 0;
}
/* ------------------------ STATIC METHODS ------------------------ */
static int metaCreateSuperTable(SMeta *pMeta, const char *tbname, const SSuperTableOpts *pSuperTableOpts) {
size_t vallen;
size_t keylen;
char * pVal = NULL;
char schemaKey[sizeof(tb_uid_t) * 2];
char buffer[1024]; /* TODO */
void * pBuf = NULL;
pVal = tkvGet(pMeta->tableDb, NULL, (char *)(&(pSuperTableOpts->uid)), sizeof(pSuperTableOpts->uid), &vallen);
if (pVal) {
free(pVal);
// TODO: table with same uid exists, just return error
return -1;
}
// Put uid -> tbObj
vallen = 0;
pBuf = (void *)buffer;
vallen += taosEncodeString(&pBuf, tbname); // ENCODE TABLE NAME
vallen += taosEncodeFixedI32(&pBuf, 1); // ENCODE SCHEMA, SCHEMA HERE IS AN ARRAY OF VERSIONS
vallen += taosEncodeFixedI32(&pBuf, schemaVersion(pSuperTableOpts->pSchema));
vallen += tdEncodeSchema(&pBuf, pSuperTableOpts->pTagSchema); // ENCODE TAG SCHEMA
tkvPut(pMeta->tableDb, NULL, (char *)(&(pSuperTableOpts->uid)), sizeof(pSuperTableOpts->uid), buffer, vallen);
// Put tbname -> uid
tkvPut(pMeta->tbnameDb, NULL, tbname, strlen(tbname), (char *)(&(pSuperTableOpts->uid)),
sizeof(pSuperTableOpts->uid));
// Put uid+sversion -> schema
*(tb_uid_t *)schemaKey = pSuperTableOpts->uid;
*(int32_t *)(POINTER_SHIFT(schemaKey, sizeof(tb_uid_t))) = schemaVersion(pSuperTableOpts->pSchema);
keylen = sizeof(tb_uid_t) + sizeof(int32_t);
pBuf = (void *)buffer;
vallen = tdEncodeSchema(&pBuf, pSuperTableOpts->pSchema);
tkvPut(pMeta->schemaDb, NULL, schemaKey, keylen, buffer, vallen);
return 0;
}
static int metaCreateChildTable(SMeta *pMeta, const char *tbname, const SChildTableOpts *pChildTableOpts) {
size_t vallen;
char buffer[1024]; /* TODO */
void * pBuf = NULL;
char * pTable;
tb_uid_t uid;
// Check if super table exists
pTable = tkvGet(pMeta->tableDb, NULL, (char *)(&(pChildTableOpts->suid)), sizeof(pChildTableOpts->suid), &vallen);
if (pTable == NULL) {
// Super table not exists, just return error
return -1;
}
// Generate a uid to the new table
uid = generateUid(&pMeta->uidGenerator);
// Put uid -> tbObj
vallen = 0;
pBuf = (void *)buffer;
vallen += taosEncodeString(&pBuf, tbname);
vallen += taosEncodeFixedU64(&pBuf, pChildTableOpts->suid);
tkvPut(pMeta->tableDb, NULL, (char *)(&uid), sizeof(uid), buffer, vallen);
// Put tbname -> uid
tkvPut(pMeta->tbnameDb, NULL, tbname, strlen(tbname), (char *)(&uid), sizeof(uid));
// Put uid-> tags
tkvPut(pMeta->tagDb, NULL, (char *)(&uid), sizeof(uid), (char *)pChildTableOpts->tags,
(size_t)kvRowLen(pChildTableOpts->tags));
// TODO: Put tagIdx
return 0;
}
static int metaCreateNormalTable(SMeta *pMeta, const char *tbname, const SNormalTableOpts *pNormalTableOpts) {
size_t vallen;
char keyBuf[sizeof(tb_uid_t) + sizeof(int32_t)];
char buffer[1024]; /* TODO */
void * pBuf = NULL;
tb_uid_t uid;
// Generate a uid to the new table
uid = generateUid(&pMeta->uidGenerator);
// Put uid -> tbObj
vallen = 0;
pBuf = (void *)buffer;
vallen += taosEncodeString(&pBuf, tbname);
vallen += taosEncodeFixedI32(&pBuf, 1);
vallen += taosEncodeFixedI32(&pBuf, schemaVersion(pNormalTableOpts->pSchema));
tkvPut(pMeta->tableDb, NULL, (char *)(&uid), sizeof(uid), buffer, vallen);
// Put tbname -> uid
tkvPut(pMeta->tbnameDb, NULL, tbname, strlen(tbname), (char *)(&(uid)), sizeof(uid));
// Put uid+sversion -> schema
vallen = 0;
pBuf = (void *)buffer;
vallen += tdEncodeSchema(&pBuf, pNormalTableOpts->pSchema);
tkvPut(pMeta->schemaDb, NULL, keyBuf, sizeof(tb_uid_t) + sizeof(int32_t), buffer, vallen);
return 0;
}
void metaNormalTableOptsInit(STableOpts *pTableOpts, const char *name, const STSchema *pSchema) {
pTableOpts->type = META_NORMAL_TABLE;
pTableOpts->name = strdup(name);
pTableOpts->normalOpts.pSchema = tdDupSchema(pSchema);
}
void metaSuperTableOptsInit(STableOpts *pTableOpts, const char *name, tb_uid_t uid, const STSchema *pSchema,
const STSchema *pTagSchema) {
pTableOpts->type = META_SUPER_TABLE;
pTableOpts->name = strdup(name);
pTableOpts->superOpts.uid = uid;
pTableOpts->superOpts.pSchema = tdDupSchema(pSchema);
pTableOpts->superOpts.pTagSchema = tdDupSchema(pTagSchema);
}
void metaChildTableOptsInit(STableOpts *pTableOpts, const char *name, tb_uid_t suid, const SKVRow tags) {
pTableOpts->type = META_CHILD_TABLE;
pTableOpts->name = strdup(name);
pTableOpts->childOpts.suid = suid;
pTableOpts->childOpts.tags = tdKVRowDup(tags);
}
void metaTableOptsClear(STableOpts *pTableOpts) {
switch (pTableOpts->type) {
case META_NORMAL_TABLE:
tfree(pTableOpts->name);
tdFreeSchema(pTableOpts->normalOpts.pSchema);
break;
case META_SUPER_TABLE:
tdFreeSchema(pTableOpts->superOpts.pTagSchema);
tdFreeSchema(pTableOpts->superOpts.pSchema);
tfree(pTableOpts->name);
break;
case META_CHILD_TABLE:
kvRowFree(pTableOpts->childOpts.tags);
tfree(pTableOpts->name);
break;
default:
break;
}
memset(pTableOpts, 0, sizeof(*pTableOpts));
}
void metaDestroy(const char *path) { taosRemoveDir(path); }

View File

@ -15,9 +15,12 @@
#include "metaUid.h" #include "metaUid.h"
static tb_uid_t nuid = IVLD_TB_UID; tb_uid_t generateUid(STableUidGenerator *pGen) {
// Generate a new table UID
tb_uid_t metaGenerateUid() { return ++(pGen->nextUid);
// TODO: need a more complex UID generator }
return ++nuid;
void tableUidGeneratorInit(STableUidGenerator *pGen, tb_uid_t suid) {
// Init a generator
pGen->nextUid = suid;
} }

View File

@ -1,7 +1,7 @@
add_executable(metaTest "") add_executable(metaTest "")
target_sources(metaTest target_sources(metaTest
PRIVATE PRIVATE
"../src/meta.c" "../src/metaMain.c"
"../src/metaUid.c" "../src/metaUid.c"
"metaTests.cpp" "metaTests.cpp"
) )

View File

@ -4,29 +4,95 @@
#include "meta.h" #include "meta.h"
TEST(MetaTest, meta_open_test) { static STSchema *metaGetSimpleSchema() {
STSchema * pSchema = NULL;
STSchemaBuilder sb = {0};
tdInitTSchemaBuilder(&sb, 0);
tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, 0, 8);
tdAddColToSchema(&sb, TSDB_DATA_TYPE_INT, 1, 4);
pSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
return pSchema;
}
static SKVRow metaGetSimpleTags() {
SKVRowBuilder kvrb = {0};
SKVRow row;
tdInitKVRowBuilder(&kvrb);
int64_t ts = 1634287978000;
int32_t a = 10;
tdAddColToKVRow(&kvrb, 0, TSDB_DATA_TYPE_TIMESTAMP, (void *)(&ts));
tdAddColToKVRow(&kvrb, 0, TSDB_DATA_TYPE_INT, (void *)(&a));
row = tdGetKVRowFromBuilder(&kvrb);
tdDestroyKVRowBuilder(&kvrb);
return row;
}
TEST(MetaTest, DISABLED_meta_create_1m_normal_tables_test) {
// Open Meta // Open Meta
SMeta *meta = metaOpen(NULL); SMeta *meta = metaOpen(NULL);
std::cout << "Meta is opened!" << std::endl; std::cout << "Meta is opened!" << std::endl;
// Create tables // Create 1000000 normal tables
STableOpts tbOpts; META_TABLE_OPTS_DECLARE(tbOpts);
STSchema *pSchema = metaGetSimpleSchema();
char tbname[128]; char tbname[128];
STSchema * pSchema;
STSchemaBuilder sb;
tdInitTSchemaBuilder(&sb, 0);
for (size_t i = 0; i < 10; i++) {
tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, i, 8);
}
pSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
for (size_t i = 0; i < 1000000; i++) {
sprintf(tbname, "tb%ld", i);
metaTableOptsInit(&tbOpts, 0, tbname, pSchema);
for (size_t i = 0; i < 1000000; i++) {
sprintf(tbname, "ntb%ld", i);
metaNormalTableOptsInit(&tbOpts, tbname, pSchema);
metaCreateTable(meta, &tbOpts); metaCreateTable(meta, &tbOpts);
metaTableOptsClear(&tbOpts);
} }
tdFreeSchema(pSchema);
// Close Meta
metaClose(meta);
std::cout << "Meta is closed!" << std::endl;
// Destroy Meta
metaDestroy("meta");
std::cout << "Meta is destroyed!" << std::endl;
}
TEST(MetaTest, meta_create_1m_child_tables_test) {
// Open Meta
SMeta *meta = metaOpen(NULL);
std::cout << "Meta is opened!" << std::endl;
// Create a super tables
tb_uid_t uid = 477529885843758ul;
META_TABLE_OPTS_DECLARE(tbOpts);
STSchema *pSchema = metaGetSimpleSchema();
STSchema *pTagSchema = metaGetSimpleSchema();
metaSuperTableOptsInit(&tbOpts, "st", uid, pSchema, pTagSchema);
metaCreateTable(meta, &tbOpts);
metaTableOptsClear(&tbOpts);
tdFreeSchema(pSchema);
tdFreeSchema(pTagSchema);
// Create 1000000 child tables
char name[128];
SKVRow row = metaGetSimpleTags();
for (size_t i = 0; i < 1000000; i++) {
sprintf(name, "ctb%ld", i);
metaChildTableOptsInit(&tbOpts, name, uid, row);
metaCreateTable(meta, &tbOpts);
metaTableOptsClear(&tbOpts);
}
kvRowFree(row);
// Close Meta // Close Meta
metaClose(meta); metaClose(meta);
std::cout << "Meta is closed!" << std::endl; std::cout << "Meta is closed!" << std::endl;

View File

@ -792,8 +792,18 @@ static void vnodeInitMsgFp() {
tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg;
//mq related
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg;
//mq related end
tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg;
//mq related
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg;
//mq related end
} }
void vnodeProcessMsg(SRpcMsg *pMsg) { void vnodeProcessMsg(SRpcMsg *pMsg) {

View File

@ -141,6 +141,9 @@ void vnodeProcessReadMsg(SRpcMsg *pMsg) {
static void vnodeInitReadMsgFp() { static void vnodeInitReadMsgFp() {
tsVread.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; tsVread.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
tsVread.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; tsVread.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
tsVread.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessTqQueryMsg;
tsVread.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessConsumeMsg;
} }
static int32_t vnodeProcessReadStart(SVnode *pVnode, SReadMsg *pRead, int32_t qtype) { static int32_t vnodeProcessReadStart(SVnode *pVnode, SReadMsg *pRead, int32_t qtype) {

View File

@ -217,6 +217,15 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) {
return 0; return 0;
} }
//mq related
int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead){
return 0;
}
int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead) {
return 0;
}
//mq related end
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) { int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) {
#if 0 #if 0
void * pCont = pRead->pCont; void * pCont = pRead->pCont;

View File

@ -179,6 +179,20 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t
case TSDB_MSG_TYPE_UPDATE_TAG_VAL: case TSDB_MSG_TYPE_UPDATE_TAG_VAL:
pWrite->code = vnodeProcessUpdateTagValReq(pVnode, (void*)pHead->cont, NULL); pWrite->code = vnodeProcessUpdateTagValReq(pVnode, (void*)pHead->cont, NULL);
break; break;
//mq related
case TSDB_MSG_TYPE_MQ_CONNECT:
pWrite->code = vnodeProcessMqConnectReq(pVnode, (void*)pHead->cont, NULL);
break;
case TSDB_MSG_TYPE_MQ_DISCONNECT:
pWrite->code = vnodeProcessMqDisconnectReq(pVnode, (void*)pHead->cont, NULL);
break;
case TSDB_MSG_TYPE_MQ_ACK:
pWrite->code = vnodeProcessMqAckReq(pVnode, (void*)pHead->cont, NULL);
break;
case TSDB_MSG_TYPE_MQ_RESET:
pWrite->code = vnodeProcessMqResetReq(pVnode, (void*)pHead->cont, NULL);
break;
//mq related end
default: default:
pWrite->code = TSDB_CODE_VND_MSG_NOT_PROCESSED; pWrite->code = TSDB_CODE_VND_MSG_NOT_PROCESSED;
break; break;
@ -186,7 +200,7 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t
if (pWrite->code < 0) return false; if (pWrite->code < 0) return false;
// update fync // update fsync
return (pWrite->code == 0 && msgType != TSDB_MSG_TYPE_SUBMIT); return (pWrite->code == 0 && msgType != TSDB_MSG_TYPE_SUBMIT);
} }

View File

@ -77,3 +77,18 @@ int32_t vnodeProcessUpdateTagValReq(SVnode *pVnode, SUpdateTagValReq *pReq, SUpd
// TODO // TODO
return 0; return 0;
} }
//mq related
int32_t vnodeProcessMqConnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp){
return 0;
}
int32_t vnodeProcessMqDisconnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp) {
return 0;
}
int32_t vnodeProcessMqAckReq(SVnode* pVnode, SMqAckReq *pReq, SMqAckRsp *pRsp) {
return 0;
}
int32_t vnodeProcessMqResetReq(SVnode* pVnode, SMqResetReq *pReq, SMqResetRsp *pRsp) {
return 0;
}
//mq related end

View File

@ -18,23 +18,39 @@
#include "tq.h" #include "tq.h"
#define TQ_BUFFER_SIZE 8
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
//implement the array index typedef struct tqBufferItem {
//implement the ring buffer int64_t offset;
void* executor;
void* content;
} tqBufferItem;
typedef struct tqGroupHandle {
char* topic; //c style, end with '\0'
int64_t cgId;
void* ahandle;
int64_t consumeOffset;
int32_t head;
int32_t tail;
tqBufferItem buffer[TQ_BUFFER_SIZE];
} tqGroupHandle;
//create persistent storage for meta info such as consuming offset //create persistent storage for meta info such as consuming offset
//return value > 0: cgId //return value > 0: cgId
//return value <= 0: error code //return value <= 0: error code
int tqCreateGroup(STQ*); int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqGroupHandle** handle);
//create ring buffer in memory and load consuming offset //create ring buffer in memory and load consuming offset
int tqOpenGroup(STQ*, int cgId); int tqOpenTCGroup(STQ*, const char* topic, int cgId);
//destroy ring buffer and persist consuming offset //destroy ring buffer and persist consuming offset
int tqCloseGroup(STQ*, int cgId); int tqCloseTCGroup(STQ*, const char* topic, int cgId);
//delete persistent storage for meta info //delete persistent storage for meta info
int tqDropGroup(STQ*, int cgId); int tqDropTCGroup(STQ*, const char* topic, int cgId);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -22,12 +22,65 @@
// //
//handle management message //handle management message
static tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) {
//look in memory
//
//not found, try to restore from disk
//
//still not found
return NULL;
}
static int tqCommitTCGroup(tqGroupHandle* handle) {
//persist into disk
return 0;
}
int tqCreateTCGroup(STQ *pTq, const char* topic, int cgId, tqGroupHandle** handle) {
return 0;
}
int tqOpenTGroup(STQ* pTq, const char* topic, int cgId) {
int code;
tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);
if(handle == NULL) {
code = tqCreateTCGroup(pTq, topic, cgId, &handle);
if(code != 0) {
return code;
}
}
ASSERT(handle != NULL);
//put into STQ
return 0;
}
int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) {
tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);
return tqCommitTCGroup(handle);
}
int tqDropTCGroup(STQ* pTq, const char* topic, int cgId) {
//delete from disk
return 0;
}
int tqPushMsg(STQ* pTq , void* p, int64_t version) { int tqPushMsg(STQ* pTq , void* p, int64_t version) {
//add reference //add reference
// //judge and launch new query
return 0; return 0;
} }
int tqCommit(STQ* pTq) { int tqCommit(STQ* pTq) {
//do nothing
return 0;
}
int tqHandleConsumeMsg(STQ* pTq, tmqConsumeReq* msg) {
//parse msg and extract topic and cgId
//lookup handle
//confirm message and send to consumer
//judge and launch new query
return 0; return 0;
} }

View File

@ -8,5 +8,7 @@ target_include_directories(
target_link_libraries( target_link_libraries(
tsdb tsdb
PUBLIC os PUBLIC os
PRIVATE common PUBLIC util
PUBLIC common
PUBLIC tkv
) )

View File

@ -0,0 +1,46 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_MEMTABLE_H_
#define _TD_TSDB_MEMTABLE_H_
#include "tdef.h"
#include "thash.h"
#include "amalloc.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct STsdbMemTable STsdbMemTable;
STsdbMemTable *tsdbMemTableCreate(SMemAllocator *);
void tsdbMemTableDestroy(STsdbMemTable *);
int tsdbMemTableWriteBatch(STsdbMemTable *pTsdbMemTable, void *batch);
/* --------------------- For compile and test only --------------------- */
struct STsdbMemTable {
TSKEY minKey;
TSKEY maxKey;
SHashObj * tData; // uid --> SSkipList
SMemAllocator *ma;
T_REF_DECLARE()
};
#ifdef __cplusplus
}
#endif
#endif /*_TD_TSDB_MEMTABLE_H_*/

View File

@ -0,0 +1,34 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_WRITE_BATCH_H_
#define _TD_TSDB_WRITE_BATCH_H_
#ifdef __cplusplus
extern "C" {
#endif
typedef struct STsdbWriteBatch STsdbWriteBatch;
/* ------------------------- ------------------------- */
struct STsdbWriteBatch {
// TODO
};
#ifdef __cplusplus
}
#endif
#endif /*_TD_TSDB_WRITE_BATCH_H_*/

View File

@ -14,6 +14,16 @@
*/ */
#include "tsdb.h" #include "tsdb.h"
#include "tkv.h"
#include "tsdbMemTable.h"
/* -------------- -------------- */
struct STsdb {
STkvDb *tsdb; // original time-series data
STkvDb *lrowdb; // last row cache
STkvDb *lastdb; // last cache
STkvDb *fivemindb;
};
int tsdbInsert(STsdb *tsdb, SSubmitReq *pReq, SSubmitRsp *pRsp) { return 0; } int tsdbInsert(STsdb *tsdb, SSubmitReq *pReq, SSubmitRsp *pRsp) { return 0; }
int tsdbCommit(STsdb *pTsdb) { return 0; } int tsdbCommit(STsdb *pTsdb) { return 0; }

View File

@ -0,0 +1,49 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbMemTable.h"
STsdbMemTable *tsdbMemTableCreate(SMemAllocator *ma) {
STsdbMemTable *pTsdbMemTable = NULL;
pTsdbMemTable = (STsdbMemTable *)malloc(sizeof(*pTsdbMemTable));
if (pTsdbMemTable == NULL) {
return NULL;
}
// TODO
pTsdbMemTable->minKey = TSKEY_INITIAL_VAL;
pTsdbMemTable->maxKey = TSKEY_INITIAL_VAL;
pTsdbMemTable->ma = ma;
pTsdbMemTable->tData = taosHashInit(1024, taosIntHash_64, true /* TODO */, HASH_NO_LOCK);
if (pTsdbMemTable->tData == NULL) {
// TODO
}
return pTsdbMemTable;
}
void tsdbMemTableDestroy(STsdbMemTable *pTsdbMemTable) {
if (pTsdbMemTable) {
// TODO
free(pTsdbMemTable);
}
}
int tsdbMemTableWriteBatch(STsdbMemTable *pTsdbMemTable, void *batch) {
// TODO
return 0;
}

View File

@ -12,20 +12,3 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_META_INT_H_
#define _TD_META_INT_H_
#ifdef __cplusplus
extern "C" {
#endif
struct {
tkv_db_t db;
} SMeta;
#ifdef __cplusplus
}
#endif
#endif /*_TD_META_INT_H_*/

206
source/util/src/theap.c Normal file
View File

@ -0,0 +1,206 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "theap.h"
size_t heapSize(Heap* heap) {
return heap->nelts;
}
Heap* heapCreate(HeapCompareFn fn) {
Heap *heap = calloc(1, sizeof(Heap));
if (heap == NULL) { return NULL; }
heap->min = NULL;
heap->nelts = 0;
heap->compFn = fn;
return heap;
}
void heapDestroy(Heap *heap) {
free(heap);
}
HeapNode* heapMin(const Heap* heap) {
return heap->min;
}
/* Swap parent with child. Child moves closer to the root, parent moves away. */
static void heapNodeSwap(Heap* heap, HeapNode* parent, HeapNode* child) {
HeapNode* sibling;
HeapNode t;
t = *parent;
*parent = *child;
*child = t;
parent->parent = child;
if (child->left == child) {
child->left = parent;
sibling = child->right;
} else {
child->right = parent;
sibling = child->left;
}
if (sibling != NULL)
sibling->parent = child;
if (parent->left != NULL)
parent->left->parent = parent;
if (parent->right != NULL)
parent->right->parent = parent;
if (child->parent == NULL)
heap->min = child;
else if (child->parent->left == parent)
child->parent->left = child;
else
child->parent->right = child;
}
void heapInsert(Heap* heap, HeapNode* newnode) {
HeapNode** parent;
HeapNode** child;
unsigned int path;
unsigned int n;
unsigned int k;
newnode->left = NULL;
newnode->right = NULL;
newnode->parent = NULL;
/* Calculate the path from the root to the insertion point. This is a min
* heap so we always insert at the left-most free node of the bottom row.
*/
path = 0;
for (k = 0, n = 1 + heap->nelts; n >= 2; k += 1, n /= 2)
path = (path << 1) | (n & 1);
/* Now traverse the heap using the path we calculated in the previous step. */
parent = child = &heap->min;
while (k > 0) {
parent = child;
if (path & 1)
child = &(*child)->right;
else
child = &(*child)->left;
path >>= 1;
k -= 1;
}
/* Insert the new node. */
newnode->parent = *parent;
*child = newnode;
heap->nelts += 1;
/* Walk up the tree and check at each node if the heap property holds.
* It's a min heap so parent < child must be true.
*/
while (newnode->parent != NULL && (heap->compFn)(newnode, newnode->parent))
heapNodeSwap(heap, newnode->parent, newnode);
}
void heapRemove(Heap* heap, HeapNode* node) {
HeapNode* smallest;
HeapNode** max;
HeapNode* child;
unsigned int path;
unsigned int k;
unsigned int n;
if (heap->nelts == 0)
return;
/* Calculate the path from the min (the root) to the max, the left-most node
* of the bottom row.
*/
path = 0;
for (k = 0, n = heap->nelts; n >= 2; k += 1, n /= 2)
path = (path << 1) | (n & 1);
/* Now traverse the heap using the path we calculated in the previous step. */
max = &heap->min;
while (k > 0) {
if (path & 1)
max = &(*max)->right;
else
max = &(*max)->left;
path >>= 1;
k -= 1;
}
heap->nelts -= 1;
/* Unlink the max node. */
child = *max;
*max = NULL;
if (child == node) {
/* We're removing either the max or the last node in the tree. */
if (child == heap->min) {
heap->min = NULL;
}
return;
}
/* Replace the to be deleted node with the max node. */
child->left = node->left;
child->right = node->right;
child->parent = node->parent;
if (child->left != NULL) {
child->left->parent = child;
}
if (child->right != NULL) {
child->right->parent = child;
}
if (node->parent == NULL) {
heap->min = child;
} else if (node->parent->left == node) {
node->parent->left = child;
} else {
node->parent->right = child;
}
/* Walk down the subtree and check at each node if the heap property holds.
* It's a min heap so parent < child must be true. If the parent is bigger,
* swap it with the smallest child.
*/
for (;;) {
smallest = child;
if (child->left != NULL && (heap->compFn)(child->left, smallest))
smallest = child->left;
if (child->right != NULL && (heap->compFn)(child->right, smallest))
smallest = child->right;
if (smallest == child)
break;
heapNodeSwap(heap, child, smallest);
}
/* Walk up the subtree and check that each parent is less than the node
* this is required, because `max` node is not guaranteed to be the
* actual maximum in tree
*/
while (child->parent != NULL && (heap->compFn)(child, child->parent))
heapNodeSwap(heap, child->parent, child);
}
void heapDequeue(Heap* heap) {
heapRemove(heap, heap->min);
}