diff --git a/CMakeLists.txt b/CMakeLists.txt index 46f0517fdc..a9450220a7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -45,6 +45,16 @@ if(${BUILD_WITH_ROCKSDB}) add_definitions(-DUSE_ROCKSDB) endif(${BUILD_WITH_ROCKSDB}) +## bdb +if(${BUILD_WITH_BDB}) + cat("${CMAKE_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${DEPS_TMP_FILE}) +endif(${BUILD_WITH_DBD}) + +## sqlite +if(${BUILD_WITH_SQLITE}) + cat("${CMAKE_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${DEPS_TMP_FILE}) +endif(${BUILD_WITH_SQLITE}) + ## lucene if(${BUILD_WITH_LUCENE}) cat("${CMAKE_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${DEPS_TMP_FILE}) diff --git a/cmake/bdb_CMakeLists.txt.in b/cmake/bdb_CMakeLists.txt.in new file mode 100644 index 0000000000..ecb7b91d09 --- /dev/null +++ b/cmake/bdb_CMakeLists.txt.in @@ -0,0 +1,13 @@ + +# bdb +ExternalProject_Add(bdb + GIT_REPOSITORY https://github.com/berkeleydb/libdb.git + GIT_TAG v5.3.28 + SOURCE_DIR "${CMAKE_SOURCE_DIR}/deps/bdb" + BINARY_DIR "${CMAKE_SOURCE_DIR}/deps/bdb" + #BUILD_IN_SOURCE TRUE + CONFIGURE_COMMAND "./dist/configure" + BUILD_COMMAND "$(MAKE)" + INSTALL_COMMAND "" + TEST_COMMAND "" +) \ No newline at end of file diff --git a/cmake/cmake.options b/cmake/cmake.options index 97fd1781e0..4c082fe79a 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -19,6 +19,18 @@ option( ON ) +option( + BUILD_WITH_SQLITE + "If build with sqlite" + ON +) + +option( + BUILD_WITH_BDB + "If build with BerkleyDB" + ON +) + option( BUILD_WITH_LUCENE "If build with lucene" @@ -34,7 +46,7 @@ option( option( BUILD_DEPENDENCY_TESTS "If build dependency tests" - OFF + ON ) option( diff --git a/cmake/sqlite_CMakeLists.txt.in b/cmake/sqlite_CMakeLists.txt.in new file mode 100644 index 0000000000..6fd981aeff --- /dev/null +++ b/cmake/sqlite_CMakeLists.txt.in @@ -0,0 +1,13 @@ + +# sqlite +ExternalProject_Add(sqlite + GIT_REPOSITORY https://github.com/sqlite/sqlite.git + GIT_TAG version-3.36.0 + SOURCE_DIR "${CMAKE_SOURCE_DIR}/deps/sqlite" + BINARY_DIR "${CMAKE_SOURCE_DIR}/deps/sqlite" + #BUILD_IN_SOURCE TRUE + CONFIGURE_COMMAND "./configure" + BUILD_COMMAND "$(MAKE)" + INSTALL_COMMAND "" + TEST_COMMAND "" +) \ No newline at end of file diff --git a/deps/CMakeLists.txt b/deps/CMakeLists.txt index e10ea9fa01..afd606c403 100644 --- a/deps/CMakeLists.txt +++ b/deps/CMakeLists.txt @@ -80,6 +80,33 @@ if(${BUILD_WITH_NURAFT}) add_subdirectory(nuraft) endif(${BUILD_WITH_NURAFT}) +# BDB +if(${BUILD_WITH_BDB}) + add_library(bdb STATIC IMPORTED) + set_target_properties(bdb PROPERTIES + IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/bdb/libdb.a" + INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/bdb" + ) + target_link_libraries(bdb + INTERFACE pthread + ) +endif(${BUILD_WITH_BDB}) + +# SQLite +if(${BUILD_WITH_SQLITE}) + add_library(sqlite STATIC IMPORTED) + set_target_properties(sqlite PROPERTIES + IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/sqlite/.libs/libsqlite3.a" + INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/sqlite" + ) + target_link_libraries(sqlite + INTERFACE m + INTERFACE pthread + INTERFACE dl + ) +endif(${BUILD_WITH_SQLITE}) + + # ================================================================================================ # DEPENDENCY TEST diff --git a/deps/test/CMakeLists.txt b/deps/test/CMakeLists.txt index e571146b86..0a333f604c 100644 --- a/deps/test/CMakeLists.txt +++ b/deps/test/CMakeLists.txt @@ -6,3 +6,13 @@ endif(${BUILD_WITH_ROCKSDB}) if(${BUILD_WITH_LUCENE}) add_subdirectory(lucene) endif(${BUILD_WITH_LUCENE}) + +if(${BUILD_WITH_BDB}) + add_subdirectory(bdb) +endif(${BUILD_WITH_BDB}) + +if(${BUILD_WITH_SQLITE}) + add_subdirectory(sqlite) +endif(${BUILD_WITH_SQLITE}) + +add_subdirectory(tdev) diff --git a/deps/test/bdb/CMakeLists.txt b/deps/test/bdb/CMakeLists.txt new file mode 100644 index 0000000000..62da0d4ac8 --- /dev/null +++ b/deps/test/bdb/CMakeLists.txt @@ -0,0 +1,7 @@ +add_executable(bdbTest "") +target_sources( + bdbTest PRIVATE + "bdbTest.c" +) + +target_link_libraries(bdbTest bdb) \ No newline at end of file diff --git a/deps/test/bdb/bdbTest.c b/deps/test/bdb/bdbTest.c new file mode 100644 index 0000000000..ab80b33fd0 --- /dev/null +++ b/deps/test/bdb/bdbTest.c @@ -0,0 +1,28 @@ +#include +#include + +#include "db.h" + +// refer: https://docs.oracle.com/cd/E17076_05/html/gsg/C/BerkeleyDB-Core-C-GSG.pdf + +int main(int argc, char const *argv[]) { + DB * db; + int ret; + uint32_t flags; + + ret = db_create(&db, NULL, 0); + if (ret != 0) { + exit(1); + } + + flags = DB_CREATE; + + ret = db->open(db, NULL, "test.db", NULL, DB_BTREE, flags, 0); + if (ret != 0) { + exit(1); + } + + db->close(db, 0); + + return 0; +} diff --git a/deps/test/sqlite/CMakeLists.txt b/deps/test/sqlite/CMakeLists.txt new file mode 100644 index 0000000000..b679dd82d9 --- /dev/null +++ b/deps/test/sqlite/CMakeLists.txt @@ -0,0 +1,6 @@ +add_executable(sqliteTest "") +target_sources( + sqliteTest PRIVATE + "sqliteTest.c" +) +target_link_libraries(sqliteTest sqlite) \ No newline at end of file diff --git a/deps/test/sqlite/sqliteTest.c b/deps/test/sqlite/sqliteTest.c new file mode 100644 index 0000000000..8766d94288 --- /dev/null +++ b/deps/test/sqlite/sqliteTest.c @@ -0,0 +1,84 @@ +#include +#include + +#include "sqlite3.h" + +static void count_table(sqlite3 *db) { + int rc; + char * sql = "select * from t;"; + sqlite3_stmt *stmt = NULL; + int nrows = 0; + + rc = sqlite3_prepare_v2(db, sql, -1, &stmt, NULL); + while (SQLITE_ROW == sqlite3_step(stmt)) { + nrows++; + } + + printf("Number of rows: %d\n", nrows); +} + +int main(int argc, char const *argv[]) { + sqlite3 *db; + char * err_msg = 0; + + int rc = sqlite3_open("test.db", &db); + + if (rc != SQLITE_OK) { + fprintf(stderr, "Cannot open database: %s\n", sqlite3_errmsg(db)); + sqlite3_close(db); + + return 1; + } + + char *sql = + "DROP TABLE IF EXISTS t;" + "CREATE TABLE t(id BIGINT);"; + + rc = sqlite3_exec(db, sql, 0, 0, &err_msg); + + if (rc != SQLITE_OK) { + fprintf(stderr, "SQL error: %s\n", err_msg); + + sqlite3_free(err_msg); + sqlite3_close(db); + + return 1; + } + + { + // Write a lot of data + int nrows = 1000; + int batch = 100; + char tsql[1024]; + int v = 0; + + // sqlite3_exec(db, "PRAGMA journal_mode=WAL;", 0, 0, &err_msg); + sqlite3_exec(db, "PRAGMA read_uncommitted=true;", 0, 0, &err_msg); + + for (int k = 0; k < nrows / batch; k++) { + sqlite3_exec(db, "begin;", 0, 0, &err_msg); + + for (int i = 0; i < batch; i++) { + v++; + sprintf(tsql, "insert into t values (%d)", v); + rc = sqlite3_exec(db, tsql, 0, 0, &err_msg); + + if (rc != SQLITE_OK) { + fprintf(stderr, "SQL error: %s\n", err_msg); + + sqlite3_free(err_msg); + sqlite3_close(db); + + return 1; + } + } + + count_table(db); + sqlite3_exec(db, "commit;", 0, 0, &err_msg); + } + } + + sqlite3_close(db); + + return 0; +} diff --git a/deps/test/tdev/CMakeLists.txt b/deps/test/tdev/CMakeLists.txt new file mode 100644 index 0000000000..d173e8d24a --- /dev/null +++ b/deps/test/tdev/CMakeLists.txt @@ -0,0 +1,4 @@ +aux_source_directory(src TDEV_SRC) +add_executable(tdev ${TDEV_SRC}) + +target_include_directories(tdev PUBLIC inc) \ No newline at end of file diff --git a/deps/test/tdev/src/main.c b/deps/test/tdev/src/main.c new file mode 100644 index 0000000000..687b175a62 --- /dev/null +++ b/deps/test/tdev/src/main.c @@ -0,0 +1,87 @@ +#include +#include +#include +#include + +#define POINTER_SHIFT(ptr, s) ((void *)(((char *)ptr) + (s))) +#define POINTER_DISTANCE(pa, pb) ((char *)(pb) - (char *)(pa)) + +#define tPutA(buf, val) \ + ({ \ + memcpy(buf, &val, sizeof(val)); \ + POINTER_SHIFT(buf, sizeof(val)); \ + }) + +#define tPutB(buf, val) \ + ({ \ + ((uint8_t *)buf)[3] = ((val) >> 24) & 0xff; \ + ((uint8_t *)buf)[2] = ((val) >> 16) & 0xff; \ + ((uint8_t *)buf)[1] = ((val) >> 8) & 0xff; \ + ((uint8_t *)buf)[0] = (val)&0xff; \ + POINTER_SHIFT(buf, sizeof(val)); \ + }) + +#define tPutC(buf, val) \ + ({ \ + ((uint64_t *)buf)[0] = (val); \ + POINTER_SHIFT(buf, sizeof(val)); \ + }) + +typedef enum { A, B, C } T; + +static void func(T t) { + uint64_t val = 198; + char buf[1024]; + void * pBuf = buf; + + switch (t) { + case A: + for (size_t i = 0; i < 10 * 1024l * 1024l * 1024l; i++) { + pBuf = tPutA(pBuf, val); + if (POINTER_DISTANCE(buf, pBuf) == 1024) { + pBuf = buf; + } + } + break; + case B: + for (size_t i = 0; i < 10 * 1024l * 1024l * 1024l; i++) { + pBuf = tPutB(pBuf, val); + if (POINTER_DISTANCE(buf, pBuf) == 1024) { + pBuf = buf; + } + } + break; + case C: + for (size_t i = 0; i < 10 * 1024l * 1024l * 1024l; i++) { + pBuf = tPutC(pBuf, val); + if (POINTER_DISTANCE(buf, pBuf) == 1024) { + pBuf = buf; + } + } + break; + + default: + break; + } +} + +static uint64_t now() { + struct timeval tv; + gettimeofday(&tv, NULL); + + return tv.tv_sec * 1000000 + tv.tv_usec; +} + +int main(int argc, char const *argv[]) { + uint64_t t1 = now(); + func(A); + uint64_t t2 = now(); + printf("A: %ld\n", t2 - t1); + func(B); + uint64_t t3 = now(); + printf("B: %ld\n", t3 - t2); + func(C); + uint64_t t4 = now(); + printf("C: %ld\n", t4 - t3); + return 0; +} diff --git a/include/common/trow.h b/include/common/trow.h index e699031d7a..4092500d5f 100644 --- a/include/common/trow.h +++ b/include/common/trow.h @@ -17,53 +17,108 @@ #define _TD_COMMON_ROW_H_ #include "os.h" +#include "tbuffer.h" +#include "tdataformat.h" +#include "tdef.h" +#include "tschema.h" #ifdef __cplusplus extern "C" { #endif -// types -typedef void * SRow; -typedef struct SRowBatch SRowBatch; -typedef struct SRowBuilder SRowBuilder; -typedef struct SRowBatchIter SRowBatchIter; -typedef struct SRowBatchBuilder SRowBatchBuilder; +#define TD_UNDECIDED_ROW 0 +#define TD_OR_ROW 1 +#define TD_KV_ROW 2 -// SRow -#define ROW_HEADER_SIZE (sizeof(uint8_t) + 2 * sizeof(uint16_t) + sizeof(uint64_t)) -#define rowType(r) (*(uint8_t *)(r)) // row type -#define rowLen(r) (*(uint16_t *)POINTER_SHIFT(r, sizeof(uint8_t))) // row length -#define rowSVer(r) \ - (*(uint16_t *)POINTER_SHIFT(r, sizeof(uint8_t) + sizeof(uint16_t))) // row schema version, only for SDataRow -#define rowNCols(r) rowSVer(r) // only for SKVRow -#define rowVer(r) (*(uint64_t)POINTER_SHIFT(r, sizeof(uint8_t) + 2 * sizeof(uint16_t))) // row version -#define rowCopy(dest, r) memcpy((dest), r, rowLen(r)) +typedef struct { + // TODO +} SOrRow; -static FORCE_INLINE SRow rowDup(SRow row) { - SRow r = malloc(rowLen(row)); - if (r == NULL) { - return NULL; - } +typedef struct { + col_id_t cid; + uint32_t offset; +} SKvRowIdx; - rowCopy(r, row); +typedef struct { + uint16_t ncols; + SKvRowIdx cidx[]; +} SKvRow; - return r; -} +typedef struct { + union { + /// union field for encode and decode + uint32_t info; + struct { + /// row type + uint32_t type : 2; + /// row schema version + uint32_t sver : 16; + /// is delete row + uint32_t del : 1; + /// reserved for back compatibility + uint32_t reserve : 13; + }; + }; + /// row total length + uint32_t len; + /// row version + uint64_t ver; + /// timestamp + TSKEY ts; + /// the inline data, maybe a tuple or a k-v tuple + char data[]; +} STSRow; -// SRowBatch +typedef struct { + uint32_t nRows; + char rows[]; +} STSRowBatch; -// SRowBuilder -SRowBuilder *rowBuilderCreate(); -void rowBuilderDestroy(SRowBuilder *); +typedef enum { + /// ordinary row builder + TD_OR_ROW_BUILDER = 0, + /// kv row builder + TD_KV_ROW_BUILDER, + /// self-determined row builder + TD_SD_ROW_BUILDER +} ERowBbuilderT; -// SRowBatchIter -SRowBatchIter *rowBatchIterCreate(SRowBatch *); -void rowBatchIterDestroy(SRowBatchIter *); -const SRow rowBatchIterNext(SRowBatchIter *); +typedef struct { + /// row builder type + ERowBbuilderT type; + /// buffer writer + SBufferWriter bw; + /// target row + STSRow *pRow; +} STSRowBuilder; -// SRowBatchBuilder -SRowBatchBuilder *rowBatchBuilderCreate(); -void rowBatchBuilderDestroy(SRowBatchBuilder *); +typedef struct { + STSchema *pSchema; + STSRow * pRow; +} STSRowReader; + +typedef struct { + uint32_t it; + STSRowBatch *pRowBatch; +} STSRowBatchIter; + +// STSRowBuilder +#define trbInit(rt, allocator, endian, target, size) \ + { .type = (rt), .bw = tbufInitWriter(allocator, endian), .pRow = (target) } +void trbSetRowInfo(STSRowBuilder *pRB, bool del, uint16_t sver); +void trbSetRowVersion(STSRowBuilder *pRB, uint64_t ver); +void trbSetRowTS(STSRowBuilder *pRB, TSKEY ts); +int trbWriteCol(STSRowBuilder *pRB, void *pData, col_id_t cid); + +// STSRowReader +#define tRowReaderInit(schema, row) \ + { .schema = (schema), .row = (row) } +int tRowReaderRead(STSRowReader *pRowReader, col_id_t cid, void *target, uint64_t size); + +// STSRowBatchIter +#define tRowBatchIterInit(pRB) \ + { .it = 0, .pRowBatch = (pRB) } +const STSRow *tRowBatchIterNext(STSRowBatchIter *pRowBatchIter); #ifdef __cplusplus } diff --git a/include/common/tschema.h b/include/common/tschema.h index 5e9057520b..4063758043 100644 --- a/include/common/tschema.h +++ b/include/common/tschema.h @@ -16,10 +16,65 @@ #ifndef _TD_COMMON_SCHEMA_H_ #define _TD_COMMON_SCHEMA_H_ +#include "os.h" +#include "tarray.h" + #ifdef __cplusplus extern "C" { #endif +typedef uint16_t col_id_t; + +#if 0 +typedef struct STColumn { + /// column name + char *cname; + union { + /// for encode purpose + uint64_t info; + struct { + uint64_t sma : 1; + /// column data type + uint64_t type : 7; + /// column id + uint64_t cid : 16; + /// max bytes of the column + uint64_t bytes : 32; + /// reserved + uint64_t reserve : 8; + }; + }; + /// comment about the column + char *comment; +} STColumn; + +typedef struct STSchema { + /// schema version + uint16_t sver; + /// number of columns + uint16_t ncols; + /// sma attributes + struct { + bool sma; + SArray *smaArray; + }; + /// column info + STColumn cols[]; +} STSchema; + +typedef struct { + uint64_t size; + STSchema *pSchema; +} STShemaBuilder; + +#define tSchemaBuilderInit(target, capacity) \ + { .size = (capacity), .pSchema = (target) } +void tSchemaBuilderSetSver(STShemaBuilder *pSchemaBuilder, uint16_t sver); +void tSchemaBuilderSetSMA(bool sma, SArray *smaArray); +int tSchemaBuilderPutColumn(char *cname, bool sma, uint8_t type, col_id_t cid, uint32_t bytes, char *comment); + +#endif + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/impl/inc/vnodeFileSystem.h b/include/common/type/type.h similarity index 84% rename from source/dnode/vnode/impl/inc/vnodeFileSystem.h rename to include/common/type/type.h index 2a885c9c34..3cbea6edbb 100644 --- a/source/dnode/vnode/impl/inc/vnodeFileSystem.h +++ b/include/common/type/type.h @@ -13,18 +13,15 @@ * along with this program. If not, see . */ -#ifndef _TD_VNODE_FILE_SYSTEM_H_ -#define _TD_VNODE_FILE_SYSTEM_H_ +#ifndef _TD_TYPE_H_ +#define _TD_TYPE_H_ #ifdef __cplusplus extern "C" { #endif -typedef struct { -} SVnodeFS; - #ifdef __cplusplus } #endif -#endif /*_TD_VNODE_FILE_SYSTEM_H_*/ \ No newline at end of file +#endif /*_TD_TYPE_H_*/ \ No newline at end of file diff --git a/include/dnode/vnode/meta/impl/metaImpl.h b/include/dnode/vnode/meta/impl/metaImpl.h deleted file mode 100644 index 90ced02f30..0000000000 --- a/include/dnode/vnode/meta/impl/metaImpl.h +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_META_IMPL_H_ -#define _TD_META_IMPL_H_ - -#include "os.h" - -#include "taosmsg.h" - -#ifdef __cplusplus -extern "C" { -#endif -typedef uint64_t tb_uid_t; - -/* ------------------------ SMetaOptions ------------------------ */ -struct SMetaOptions { - size_t lruCacheSize; // LRU cache size -}; - -/* ------------------------ STbOptions ------------------------ */ -#define META_NORMAL_TABLE ((uint8_t)1) -#define META_SUPER_TABLE ((uint8_t)2) -#define META_CHILD_TABLE ((uint8_t)3) - -typedef struct { -} SSMAOptions; - -// super table options -typedef struct { - tb_uid_t uid; - STSchema* pSchema; - STSchema* pTagSchema; -} SSTbOptions; - -// child table options -typedef struct { - tb_uid_t suid; - SKVRow tags; -} SCTbOptions; - -// normal table options -typedef struct { - STSchema* pSchame; -} SNTbOptions; - -struct STbOptions { - uint8_t type; - char* name; - uint32_t ttl; // time to live in (SECONDS) - SSMAOptions bsma; // Block-wise sma - union { - SSTbOptions stbOptions; - SNTbOptions ntbOptions; - SCTbOptions ctbOptions; - }; -}; - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_META_IMPL_H_*/ \ No newline at end of file diff --git a/include/dnode/vnode/meta/meta.h b/include/dnode/vnode/meta/meta.h index 421f96ef5f..1de1f9fa2c 100644 --- a/include/dnode/vnode/meta/meta.h +++ b/include/dnode/vnode/meta/meta.h @@ -16,36 +16,96 @@ #ifndef _TD_META_H_ #define _TD_META_H_ -#include "impl/metaImpl.h" +#include "os.h" +#include "trow.h" #ifdef __cplusplus extern "C" { #endif // Types exported -typedef struct SMeta SMeta; -typedef struct SMetaOptions SMetaOptions; -typedef struct STbOptions STbOptions; +typedef uint64_t tb_uid_t; +typedef struct SMeta SMeta; + +#define META_SUPER_TABLE 0 +#define META_CHILD_TABLE 1 +#define META_NORMAL_TABLE 2 + +typedef struct SMetaCfg { + /// LRU cache size + uint64_t lruSize; +} SMetaCfg; + +typedef struct STbCfg { + /// name of the table + char *name; + /// time to live of the table + uint32_t ttl; + /// keep time of this table + uint32_t keep; + /// type of table + uint8_t type; + union { + /// super table configurations + struct { + /// super table UID + tb_uid_t suid; + /// row schema + STSchema *pSchema; + /// tag schema + STSchema *pTagSchema; + } stbCfg; + + /// normal table configuration + struct { + /// row schema + STSchema *pSchema; + } ntbCfg; + /// child table configuration + struct { + /// super table UID + tb_uid_t suid; + SKVRow pTag; + } ctbCfg; + }; +} STbCfg; // SMeta operations -SMeta *metaOpen(const char *path, const SMetaOptions *pOptions); +SMeta *metaOpen(const char *path, const SMetaCfg *pOptions); void metaClose(SMeta *pMeta); void metaRemove(const char *path); -int metaCreateTable(SMeta *pMeta, const STbOptions *pTbOptions); +int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg); int metaDropTable(SMeta *pMeta, tb_uid_t uid); int metaCommit(SMeta *pMeta); // Options -void metaOptionsInit(SMetaOptions *pOptions); -void metaOptionsClear(SMetaOptions *pOptions); +void metaOptionsInit(SMetaCfg *pOptions); +void metaOptionsClear(SMetaCfg *pOptions); -// STableOpts -#define META_TABLE_OPTS_DECLARE(name) STableOpts name = {0} -void metaNormalTableOptsInit(STbOptions *pTbOptions, const char *name, const STSchema *pSchema); -void metaSuperTableOptsInit(STbOptions *pTbOptions, const char *name, tb_uid_t uid, const STSchema *pSchema, - const STSchema *pTagSchema); -void metaChildTableOptsInit(STbOptions *pTbOptions, const char *name, tb_uid_t suid, const SKVRow tags); -void metaTableOptsClear(STbOptions *pTbOptions); +// STbCfg +#define META_INIT_STB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA, PTAGSCHEMA) \ + { \ + .name = (NAME), .ttl = (TTL), .keep = (KEEP), .type = META_SUPER_TABLE, .stbCfg = { \ + .suid = (SUID), \ + .pSchema = (PSCHEMA), \ + .pTagSchema = (PTAGSCHEMA) \ + } \ + } + +#define META_INIT_CTB_CFG(NAME, TTL, KEEP, SUID, PTAG) \ + { \ + .name = (NAME), .ttl = (TTL), .keep = (KEEP), .type = META_CHILD_TABLE, .ctbCfg = {.suid = (SUID), .pTag = PTAG } \ + } + +#define META_INIT_NTB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA) \ + { \ + .name = (NAME), .ttl = (TTL), .keep = (KEEP), .type = META_NORMAL_TABLE, .ntbCfg = {.pSchema = (PSCHEMA) } \ + } + +#define META_CLEAR_TB_CFG(pTbCfg) + +int metaEncodeTbCfg(void **pBuf, STbCfg *pTbCfg); +void *metaDecodeTbCfg(void *pBuf, STbCfg **pTbCfg); #ifdef __cplusplus } diff --git a/include/dnode/vnode/tq/tq.h b/include/dnode/vnode/tq/tq.h index 2785b6de96..85533f65bc 100644 --- a/include/dnode/vnode/tq/tq.h +++ b/include/dnode/vnode/tq/tq.h @@ -161,9 +161,9 @@ typedef struct TqLogReader { int64_t (*logGetLastVer)(void* logHandle); } TqLogReader; -typedef struct TqConfig { +typedef struct STqCfg { // TODO -} TqConfig; +} STqCfg; typedef struct TqMemRef { SMemAllocatorFactory *pAlloctorFactory; @@ -256,14 +256,14 @@ typedef struct STQ { // the collection of group handle // the handle of kvstore char* path; - TqConfig* tqConfig; + STqCfg* tqConfig; TqLogReader* tqLogReader; TqMemRef tqMemRef; TqMetaStore* tqMeta; } STQ; // open in each vnode -STQ* tqOpen(const char* path, TqConfig* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac); +STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac); void tqDestroy(STQ*); // void* will be replace by a msg type diff --git a/include/dnode/vnode/tsdb/tsdb.h b/include/dnode/vnode/tsdb/tsdb.h index e92205378a..26efce5d43 100644 --- a/include/dnode/vnode/tsdb/tsdb.h +++ b/include/dnode/vnode/tsdb/tsdb.h @@ -22,21 +22,25 @@ extern "C" { // TYPES EXPOSED typedef struct STsdb STsdb; -typedef struct STsdbOptions STsdbOptions; +typedef struct STsdbCfg STsdbCfg; typedef struct STsdbMemAllocator STsdbMemAllocator; // STsdb -STsdb *tsdbOpen(const char *path, const STsdbOptions *); +STsdb *tsdbOpen(const char *path, const STsdbCfg *); void tsdbClose(STsdb *); void tsdbRemove(const char *path); +int tsdbInsertData(STsdb *pTsdb, void *pData, int len); -// STsdbOptions -int tsdbOptionsInit(STsdbOptions *); -void tsdbOptionsClear(STsdbOptions *); +// STsdbCfg +int tsdbOptionsInit(STsdbCfg *); +void tsdbOptionsClear(STsdbCfg *); /* ------------------------ STRUCT DEFINITIONS ------------------------ */ -struct STsdbOptions { +struct STsdbCfg { uint64_t lruCacheSize; + uint32_t keep0; + uint32_t keep1; + uint32_t keep2; /* TODO */ }; diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 1edd93f509..30531ad738 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -23,24 +23,67 @@ #include "tarray.h" #include "tq.h" #include "tsdb.h" +#include "wal.h" #ifdef __cplusplus extern "C" { #endif /* ------------------------ TYPES EXPOSED ------------------------ */ -typedef struct SVnode SVnode; -typedef struct SVnodeOptions SVnodeOptions; +typedef struct SVnode SVnode; +typedef struct SVnodeCfg { + /** vnode buffer pool options */ + struct { + /** write buffer size */ + uint64_t wsize; + /** use heap allocator or arena allocator */ + bool isHeapAllocator; + }; + + /** time to live of tables in this vnode */ + uint32_t ttl; + + /** data to keep in this vnode */ + uint32_t keep; + + /** if TS data is eventually consistency */ + bool isWeak; + + /** TSDB config */ + STsdbCfg tsdbCfg; + + /** META config */ + SMetaCfg metaCfg; + + /** TQ config */ + STqCfg tqCfg; + + /** WAL config */ + SWalCfg walCfg; +} SVnodeCfg; /* ------------------------ SVnode ------------------------ */ +/** + * @brief Initialize the vnode module + * + * @return int 0 for success and -1 for failure + */ +int vnodeInit(); + +/** + * @brief clear a vnode + * + */ +void vnodeClear(); + /** * @brief Open a VNODE. * * @param path path of the vnode - * @param pVnodeOptions options of the vnode + * @param pVnodeCfg options of the vnode * @return SVnode* The vnode object */ -SVnode *vnodeOpen(const char *path, const SVnodeOptions *pVnodeOptions); +SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg); /** * @brief Close a VNODE @@ -85,61 +128,55 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); */ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); -/* ------------------------ SVnodeOptions ------------------------ */ +/* ------------------------ SVnodeCfg ------------------------ */ /** * @brief Initialize VNODE options. * * @param pOptions The options object to be initialized. It should not be NULL. */ -void vnodeOptionsInit(SVnodeOptions *pOptions); +void vnodeOptionsInit(SVnodeCfg *pOptions); /** * @brief Clear VNODE options. * * @param pOptions Options to clear. */ -void vnodeOptionsClear(SVnodeOptions *pOptions); +void vnodeOptionsClear(SVnodeCfg *pOptions); -/* ------------------------ STRUCT DEFINITIONS ------------------------ */ -struct SVnodeOptions { - /** - * @brief write buffer size in BYTES - * - */ - uint64_t wsize; +/* ------------------------ REQUESTS ------------------------ */ +typedef STbCfg SVCreateTableReq; +typedef struct { + tb_uid_t uid; +} SVDropTableReq; - /** - * @brief time to live of tables in this vnode - * in SECONDS - * - */ - uint32_t ttl; +typedef struct { + // TODO +} SVSubmitReq; - /** - * @brief if time-series requests eventual consistency - * - */ - bool isWeak; +typedef struct { + uint64_t ver; + union { + SVCreateTableReq ctReq; + SVDropTableReq dtReq; + }; +} SVnodeReq; - /** - * @brief if the allocator is heap allcator or arena allocator - * - */ - bool isHeapAllocator; +typedef struct { + int err; + char info[]; +} SVnodeRsp; - /** - * @brief TSDB options - * - */ - STsdbOptions tsdbOptions; +#define VNODE_INIT_CREATE_STB_REQ(NAME, TTL, KEEP, SUID, PSCHEMA, PTAGSCHEMA) \ + { .ver = 0, .ctReq = META_INIT_STB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA, PTAGSCHEMA) } - /** - * @brief META options - * - */ - SMetaOptions metaOptions; - // STqOptions tqOptions; // TODO -}; +#define VNODE_INIT_CREATE_CTB_REQ(NAME, TTL, KEEP, SUID, PTAG) \ + { .ver = 0, .ctReq = META_INIT_CTB_CFG(NAME, TTL, KEEP, SUID, PTAG) } + +#define VNODE_INIT_CREATE_NTB_REQ(NAME, TTL, KEEP, SUID, PSCHEMA) \ + { .ver = 0, .ctReq = META_INIT_NTB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA) } + +int vnodeBuildReq(void **buf, const SVnodeReq *pReq, uint8_t type); +void *vnodeParseReq(void *buf, SVnodeReq *pReq, uint8_t type); /* ------------------------ FOR COMPILE ------------------------ */ @@ -148,27 +185,27 @@ struct SVnodeOptions { #include "taosmsg.h" #include "trpc.h" -typedef struct { - char db[TSDB_FULL_DB_NAME_LEN]; - int32_t cacheBlockSize; // MB - int32_t totalBlocks; - int32_t daysPerFile; - int32_t daysToKeep0; - int32_t daysToKeep1; - int32_t daysToKeep2; - int32_t minRowsPerFileBlock; - int32_t maxRowsPerFileBlock; - int8_t precision; // time resolution - int8_t compression; - int8_t cacheLastRow; - int8_t update; - int8_t quorum; - int8_t replica; - int8_t selfIndex; - int8_t walLevel; - int32_t fsyncPeriod; // millisecond - SReplica replicas[TSDB_MAX_REPLICA]; -} SVnodeCfg; +// typedef struct { +// char db[TSDB_FULL_DB_NAME_LEN]; +// int32_t cacheBlockSize; // MB +// int32_t totalBlocks; +// int32_t daysPerFile; +// int32_t daysToKeep0; +// int32_t daysToKeep1; +// int32_t daysToKeep2; +// int32_t minRowsPerFileBlock; +// int32_t maxRowsPerFileBlock; +// int8_t precision; // time resolution +// int8_t compression; +// int8_t cacheLastRow; +// int8_t update; +// int8_t quorum; +// int8_t replica; +// int8_t selfIndex; +// int8_t walLevel; +// int32_t fsyncPeriod; // millisecond +// SReplica replicas[TSDB_MAX_REPLICA]; +// } SVnodeCfg; typedef enum { VN_MSG_TYPE_WRITE = 1, diff --git a/include/util/mallocator.h b/include/util/mallocator.h index fd66811f38..ffe242017e 100644 --- a/include/util/mallocator.h +++ b/include/util/mallocator.h @@ -22,10 +22,10 @@ extern "C" { #endif -typedef struct SMemAllocator SMemAllocator; +typedef struct SMemAllocator SMemAllocator; +typedef struct SMemAllocatorFactory SMemAllocatorFactory; struct SMemAllocator { - char name[16]; void *impl; void *(*malloc)(SMemAllocator *, uint64_t size); void *(*calloc)(SMemAllocator *, uint64_t nmemb, uint64_t size); @@ -34,11 +34,11 @@ struct SMemAllocator { uint64_t (*usage)(SMemAllocator *); }; -typedef struct { +struct SMemAllocatorFactory { void *impl; - SMemAllocator *(*create)(); - void (*destroy)(SMemAllocator *); -} SMemAllocatorFactory; + SMemAllocator *(*create)(SMemAllocatorFactory *); + void (*destroy)(SMemAllocatorFactory *, SMemAllocator *); +}; #ifdef __cplusplus } diff --git a/source/common/src/trow.c b/source/common/src/trow.c index cf1b0eceff..f383cd04dc 100644 --- a/source/common/src/trow.c +++ b/source/common/src/trow.c @@ -15,75 +15,21 @@ #include "trow.h" -/* ------------ Structures ---------- */ -struct SRowBatch { - int32_t compress : 1; // if batch row is compressed - int32_t nrows : 31; // number of rows - int32_t tlen; // total length (including `nrows` and `tlen`) - char rows[]; -}; - -struct SRowBuilder { +#if 0 +void trbSetRowInfo(SRowBuilder *pRB, bool del, uint16_t sver) { // TODO -}; +} -struct SRowBatchIter { - int32_t counter; // row counter - SRowBatch *rb; // row batch to iter - SRow nrow; // next row -}; - -struct SRowBatchBuilder { +void trbSetRowVersion(SRowBuilder *pRB, uint64_t ver) { // TODO -}; +} -/* ------------ Methods ---------- */ - -// SRowBuilder -SRowBuilder *rowBuilderCreate() { - SRowBuilder *pRowBuilder = NULL; +void trbSetRowTS(SRowBuilder *pRB, TSKEY ts) { // TODO - - return pRowBuilder; } -void rowBuilderDestroy(SRowBuilder *pRowBuilder) { - if (pRowBuilder) { - free(pRowBuilder); - } +int trbWriteCol(SRowBuilder *pRB, void *pData, col_id_t cid) { + // TODO + return 0; } - -// SRowBatchIter -SRowBatchIter *rowBatchIterCreate(SRowBatch *pRowBatch) { - SRowBatchIter *pRowBatchIter = (SRowBatchIter *)malloc(sizeof(*pRowBatchIter)); - if (pRowBatchIter == NULL) { - return NULL; - } - - pRowBatchIter->counter = 0; - pRowBatchIter->rb = pRowBatch; - pRowBatchIter->nrow = pRowBatch->rows; - - return pRowBatchIter; -}; - -void rowBatchIterDestroy(SRowBatchIter *pRowBatchIter) { - if (pRowBatchIter) { - free(pRowBatchIter); - } -} - -const SRow rowBatchIterNext(SRowBatchIter *pRowBatchIter) { - SRow r = NULL; - if (pRowBatchIter->counter < pRowBatchIter->rb->nrows) { - r = pRowBatchIter->nrow; - pRowBatchIter->counter += 1; - pRowBatchIter->nrow = (SRow)POINTER_SHIFT(r, rowLen(r)); - } - - return r; -} - -// SRowBatchBuilder -SRowBatchBuilder *rowBatchBuilderCreate(); -void rowBatchBuilderDestroy(SRowBatchBuilder *); \ No newline at end of file +#endif \ No newline at end of file diff --git a/source/common/test/trowTest.cpp b/source/common/test/trowTest.cpp new file mode 100644 index 0000000000..d7f0783d4a --- /dev/null +++ b/source/common/test/trowTest.cpp @@ -0,0 +1,23 @@ +#include + +#include "trow.h" + +TEST(td_row_test, build_row_to_target) { +#if 0 + char dst[1024]; + SRow* pRow = (SRow*)dst; + int ncols = 10; + col_id_t cid; + void* pData; + SRowBuilder rb = trbInit(TD_OR_ROW_BUILDER, NULL, 0, pRow, 1024); + + trbSetRowInfo(&rb, false, 0); + trbSetRowTS(&rb, 1637550210000); + for (int c = 0; c < ncols; c++) { + cid = c; + if (trbWriteCol(&rb, pData, cid) < 0) { + // TODO + } + } +#endif +} \ No newline at end of file diff --git a/source/common/test/tschemaTest.cpp b/source/common/test/tschemaTest.cpp new file mode 100644 index 0000000000..acced6e09e --- /dev/null +++ b/source/common/test/tschemaTest.cpp @@ -0,0 +1,6 @@ +#include +#include "tschema.h" + +TEST(td_schema_test, build_schema_test) { + +} \ No newline at end of file diff --git a/source/common/type/type.c b/source/common/type/type.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/type.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeBigint.c b/source/common/type/typeBigint.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeBigint.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeBinary.c b/source/common/type/typeBinary.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeBinary.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeBlob.c b/source/common/type/typeBlob.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeBlob.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeBool.c b/source/common/type/typeBool.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeBool.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeDecimal.c b/source/common/type/typeDecimal.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeDecimal.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeDouble.c b/source/common/type/typeDouble.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeDouble.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeFloat.c b/source/common/type/typeFloat.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeFloat.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeInt.c b/source/common/type/typeInt.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeInt.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeJson.c b/source/common/type/typeJson.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeJson.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeLongblob.c b/source/common/type/typeLongblob.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeLongblob.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeNchar.c b/source/common/type/typeNchar.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeNchar.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeNull.c b/source/common/type/typeNull.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeNull.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeSmallint.c b/source/common/type/typeSmallint.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeSmallint.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeTimestamp.c b/source/common/type/typeTimestamp.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeTimestamp.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeTinyint.c b/source/common/type/typeTinyint.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeTinyint.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeUBigint.c b/source/common/type/typeUBigint.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeUBigint.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeUSmallint.c b/source/common/type/typeUSmallint.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeUSmallint.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeUTinyint.c b/source/common/type/typeUTinyint.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeUTinyint.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeUint.c b/source/common/type/typeUint.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeUint.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/common/type/typeVarchar.c b/source/common/type/typeVarchar.c new file mode 100644 index 0000000000..6dea4a4e57 --- /dev/null +++ b/source/common/type/typeVarchar.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ \ No newline at end of file diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 34d37d2320..a6eb916aef 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -38,7 +38,7 @@ typedef struct { int32_t threadIndex; pthread_t *pThreadId; SVnodeObj *pVnodes; - SDnode *pDnode; + SDnode * pDnode; } SVnodeThread; static int32_t dndInitVnodeReadWorker(SDnode *pDnode); @@ -73,7 +73,7 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEp void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg); -static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId); +static SVnodeObj * dndAcquireVnode(SDnode *pDnode, int32_t vgId); static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode); static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl); static void dndDropVnodeWrapper(SDnode *pDnode, SVnodeObj *pVnode); @@ -95,7 +95,7 @@ static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SVnodeObj *pVnode = NULL; + SVnodeObj * pVnode = NULL; int32_t refCount = 0; taosRLockLatch(&pMgmt->latch); @@ -128,7 +128,7 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) { static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj)); + SVnodeObj * pVnode = calloc(1, sizeof(SVnodeObj)); if (pVnode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -208,7 +208,7 @@ static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) { void *pIter = taosHashIterate(pMgmt->hash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; - SVnodeObj *pVnode = *ppVnode; + SVnodeObj * pVnode = *ppVnode; if (pVnode) { num++; if (num < size) { @@ -230,9 +230,9 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_ int32_t code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR; int32_t len = 0; int32_t maxLen = 30000; - char *content = calloc(1, maxLen + 1); - cJSON *root = NULL; - FILE *fp = NULL; + char * content = calloc(1, maxLen + 1); + cJSON * root = NULL; + FILE * fp = NULL; char file[PATH_MAX + 20] = {0}; SVnodeObj *pVnodes = NULL; @@ -277,7 +277,7 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_ } for (int32_t i = 0; i < vnodesNum; ++i) { - cJSON *vnode = cJSON_GetArrayItem(vnodes, i); + cJSON * vnode = cJSON_GetArrayItem(vnodes, i); SVnodeObj *pVnode = &pVnodes[i]; cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId"); @@ -321,7 +321,7 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) { int32_t len = 0; int32_t maxLen = 30000; - char *content = calloc(1, maxLen + 1); + char * content = calloc(1, maxLen + 1); int32_t numOfVnodes = 0; SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes); @@ -403,8 +403,8 @@ static int32_t dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode) { static void *dnodeOpenVnodeFunc(void *param) { SVnodeThread *pThread = param; - SDnode *pDnode = pThread->pDnode; - SVnodesMgmt *pMgmt = &pDnode->vmgmt; + SDnode * pDnode = pThread->pDnode; + SVnodesMgmt * pMgmt = &pDnode->vmgmt; dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); setThreadName("open-vnodes"); @@ -527,6 +527,7 @@ static int32_t dndParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg SCreateVnodeMsg *pCreate = rpcMsg->pCont; *vgId = htonl(pCreate->vgId); +#if 0 tstrncpy(pCfg->db, pCreate->db, TSDB_FULL_DB_NAME_LEN); pCfg->cacheBlockSize = htonl(pCreate->cacheBlockSize); pCfg->totalBlocks = htonl(pCreate->totalBlocks); @@ -549,6 +550,7 @@ static int32_t dndParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg pCfg->replicas[i].port = htons(pCreate->replicas[i].port); tstrncpy(pCfg->replicas[i].fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN); } +#endif return 0; } @@ -738,7 +740,7 @@ static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) { static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { SVnodeMsg *pMsg = vnodeInitMsg(numOfMsgs); - SRpcMsg *pRpcMsg = NULL; + SRpcMsg * pRpcMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pRpcMsg); @@ -1011,7 +1013,7 @@ static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode) { } static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; + SVnodesMgmt * pMgmt = &pDnode->vmgmt; SMWorkerPool *pPool = &pMgmt->writePool; pPool->name = "vnode-write"; pPool->max = tsNumOfCores; @@ -1119,12 +1121,12 @@ void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) { pLoads->num = taosHashGetSize(pMgmt->hash); int32_t v = 0; - void *pIter = taosHashIterate(pMgmt->hash, NULL); + void * pIter = taosHashIterate(pMgmt->hash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; if (ppVnode == NULL || *ppVnode == NULL) continue; - SVnodeObj *pVnode = *ppVnode; + SVnodeObj * pVnode = *ppVnode; SVnodeLoad *pLoad = &pLoads->data[v++]; vnodeGetLoad(pVnode->pImpl, pLoad); diff --git a/source/dnode/vnode/impl/inc/vnodeBufferPool.h b/source/dnode/vnode/impl/inc/vnodeBufferPool.h index 3033862779..d64dc93847 100644 --- a/source/dnode/vnode/impl/inc/vnodeBufferPool.h +++ b/source/dnode/vnode/impl/inc/vnodeBufferPool.h @@ -25,8 +25,9 @@ extern "C" { typedef struct SVBufPool SVBufPool; -int vnodeOpenBufPool(SVnode *pVnode); -void vnodeCloseBufPool(SVnode *pVnode); +int vnodeOpenBufPool(SVnode *pVnode); +void vnodeCloseBufPool(SVnode *pVnode); +void *vnodeMalloc(SVnode *pVnode, uint64_t size); #ifdef __cplusplus } diff --git a/source/dnode/vnode/impl/inc/vnodeOptions.h b/source/dnode/vnode/impl/inc/vnodeCfg.h similarity index 73% rename from source/dnode/vnode/impl/inc/vnodeOptions.h rename to source/dnode/vnode/impl/inc/vnodeCfg.h index edb4be2a77..c4245b4023 100644 --- a/source/dnode/vnode/impl/inc/vnodeOptions.h +++ b/source/dnode/vnode/impl/inc/vnodeCfg.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_VNODE_OPTIONS_H_ -#define _TD_VNODE_OPTIONS_H_ +#ifndef _TD_VNODE_CFG_H_ +#define _TD_VNODE_CFG_H_ #include "vnode.h" @@ -22,13 +22,13 @@ extern "C" { #endif -extern const SVnodeOptions defaultVnodeOptions; +extern const SVnodeCfg defaultVnodeOptions; -int vnodeValidateOptions(const SVnodeOptions *); -void vnodeOptionsCopy(SVnodeOptions *pDest, const SVnodeOptions *pSrc); +int vnodeValidateOptions(const SVnodeCfg *); +void vnodeOptionsCopy(SVnodeCfg *pDest, const SVnodeCfg *pSrc); #ifdef __cplusplus } #endif -#endif /*_TD_VNODE_OPTIONS_H_*/ \ No newline at end of file +#endif /*_TD_VNODE_CFG_H_*/ \ No newline at end of file diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h index 9cf9210cf3..c92de433c3 100644 --- a/source/dnode/vnode/impl/inc/vnodeDef.h +++ b/source/dnode/vnode/impl/inc/vnodeDef.h @@ -20,12 +20,14 @@ #include "sync.h" #include "tlockfree.h" #include "wal.h" +#include "tcoding.h" #include "vnode.h" #include "vnodeBufferPool.h" +#include "vnodeCfg.h" #include "vnodeCommit.h" -#include "vnodeFileSystem.h" -#include "vnodeOptions.h" +#include "vnodeFS.h" +#include "vnodeRequest.h" #include "vnodeStateMgr.h" #include "vnodeSync.h" @@ -34,16 +36,16 @@ extern "C" { #endif struct SVnode { - char* path; - SVnodeOptions options; - SVState state; - SVBufPool* pBufPool; - SMeta* pMeta; - STsdb* pTsdb; - STQ* pTq; - SWal* pWal; - SVnodeSync* pSync; - SVnodeFS* pFs; + char* path; + SVnodeCfg config; + SVState state; + SVBufPool* pBufPool; + SMeta* pMeta; + STsdb* pTsdb; + STQ* pTq; + SWal* pWal; + SVnodeSync* pSync; + SVnodeFS* pFs; }; #ifdef __cplusplus diff --git a/source/dnode/vnode/impl/inc/vnodeFS.h b/source/dnode/vnode/impl/inc/vnodeFS.h new file mode 100644 index 0000000000..dbec985695 --- /dev/null +++ b/source/dnode/vnode/impl/inc/vnodeFS.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_VNODE_FS_H_ +#define _TD_VNODE_FS_H_ + +#include "vnode.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { +} SDir; + +typedef struct { +} SFile; + +typedef struct SFS { + void *pImpl; + int (*startEdit)(struct SFS *); + int (*endEdit)(struct SFS *); +} SFS; + +typedef struct { +} SVnodeFS; + +int vnodeOpenFS(SVnode *pVnode); +void vnodeCloseFS(SVnode *pVnode); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_VNODE_FS_H_*/ \ No newline at end of file diff --git a/source/dnode/vnode/impl/inc/vnodeRequest.h b/source/dnode/vnode/impl/inc/vnodeRequest.h index 788918f105..d70fc84cab 100644 --- a/source/dnode/vnode/impl/inc/vnodeRequest.h +++ b/source/dnode/vnode/impl/inc/vnodeRequest.h @@ -16,23 +16,15 @@ #ifndef _TD_VNODE_REQUEST_H_ #define _TD_VNODE_REQUEST_H_ +#include "vnode.h" + #ifdef __cplusplus extern "C" { #endif -typedef struct SVnodeReq SVnodeReq; -typedef struct SVnodeRsp SVnodeRsp; - -typedef enum { -} EVReqT; - -struct SVnodeReq { - /* TODO */ -}; - -struct SVnodeRsp { - /* TODO */ -}; +// SVDropTableReq +int vnodeBuildDropTableReq(void **buf, const SVDropTableReq *pReq); +void *vnodeParseDropTableReq(void *buf, SVDropTableReq *pReq); #ifdef __cplusplus } diff --git a/source/dnode/vnode/impl/inc/vnodeStateMgr.h b/source/dnode/vnode/impl/inc/vnodeStateMgr.h index a32f682846..788426e25e 100644 --- a/source/dnode/vnode/impl/inc/vnodeStateMgr.h +++ b/source/dnode/vnode/impl/inc/vnodeStateMgr.h @@ -21,6 +21,9 @@ extern "C" { #endif typedef struct { + uint64_t processed; + uint64_t committed; + uint64_t applied; } SVState; #ifdef __cplusplus diff --git a/source/dnode/vnode/impl/src/vnodeBufferPool.c b/source/dnode/vnode/impl/src/vnodeBufferPool.c index eba71b5ba3..00203ed9b6 100644 --- a/source/dnode/vnode/impl/src/vnodeBufferPool.c +++ b/source/dnode/vnode/impl/src/vnodeBufferPool.c @@ -19,9 +19,12 @@ #define VNODE_BUF_POOL_SHARDS 3 struct SVBufPool { + // buffer pool impl SList free; SList incycle; SListNode *inuse; + // MAF for submodules + SMemAllocatorFactory maf; }; typedef enum { @@ -49,6 +52,11 @@ typedef struct { SVArenaNode node; } SVArenaAllocator; +typedef struct { + SVnode * pVnode; + SListNode *pNode; +} SVMAWrapper; + typedef struct { T_REF_DECLARE() uint64_t capacity; @@ -59,8 +67,11 @@ typedef struct { }; } SVMemAllocator; -static SListNode *vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type); -static void vBufPoolFreeNode(SListNode *pNode); +static SListNode * vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type); +static void vBufPoolFreeNode(SListNode *pNode); +static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf); +static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma); +static void * vBufPoolMalloc(SVMemAllocator *pvma, uint64_t size); int vnodeOpenBufPool(SVnode *pVnode) { uint64_t capacity; @@ -74,8 +85,8 @@ int vnodeOpenBufPool(SVnode *pVnode) { tdListInit(&(pVnode->pBufPool->free), 0); tdListInit(&(pVnode->pBufPool->incycle), 0); - capacity = pVnode->options.wsize / VNODE_BUF_POOL_SHARDS; - if (pVnode->options.isHeapAllocator) { + capacity = pVnode->config.wsize / VNODE_BUF_POOL_SHARDS; + if (pVnode->config.isHeapAllocator) { type = E_V_HEAP_ALLOCATOR; } @@ -89,6 +100,10 @@ int vnodeOpenBufPool(SVnode *pVnode) { tdListAppendNode(&(pVnode->pBufPool->free), pNode); } + pVnode->pBufPool->maf.impl = pVnode; + pVnode->pBufPool->maf.create = vBufPoolCreateMA; + pVnode->pBufPool->maf.destroy = vBufPoolDestroyMA; + return 0; } @@ -115,6 +130,24 @@ void vnodeCloseBufPool(SVnode *pVnode) { } } +void *vnodeMalloc(SVnode *pVnode, uint64_t size) { + void *ptr; + + if (pVnode->pBufPool->inuse == NULL) { + SListNode *pNode; + while ((pNode = tdListPopHead(&(pVnode->pBufPool->free))) == NULL) { + // todo + // tsem_wait(); + ASSERT(0); + } + + pVnode->pBufPool->inuse = pNode; + } + + SVMemAllocator *pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data); + return vBufPoolMalloc(pvma, size); +} + /* ------------------------ STATIC METHODS ------------------------ */ static void vArenaAllocatorInit(SVArenaAllocator *pvaa, uint64_t capacity, uint64_t ssize, uint64_t lsize) { /* TODO */ pvaa->ssize = ssize; @@ -185,4 +218,103 @@ static void vBufPoolFreeNode(SListNode *pNode) { } free(pNode); +} + +static void *vBufPoolMalloc(SVMemAllocator *pvma, uint64_t size) { + void *ptr = NULL; + + if (pvma->type == E_V_ARENA_ALLOCATOR) { + SVArenaAllocator *pvaa = &(pvma->vaa); + + if (POINTER_DISTANCE(pvaa->inuse->ptr, pvaa->inuse->data) + size > pvaa->inuse->size) { + SVArenaNode *pNode = (SVArenaNode *)malloc(sizeof(*pNode) + MAX(size, pvaa->ssize)); + if (pNode == NULL) { + // TODO: handle error + return NULL; + } + + pNode->prev = pvaa->inuse; + pNode->size = MAX(size, pvaa->ssize); + pNode->ptr = pNode->data; + + pvaa->inuse = pNode; + } + + ptr = pvaa->inuse->ptr; + pvaa->inuse->ptr = POINTER_SHIFT(ptr, size); + } else if (pvma->type == E_V_HEAP_ALLOCATOR) { + /* TODO */ + } + + return ptr; +} + +static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf) { + SVnode * pVnode; + SMemAllocator * pma; + SVMemAllocator *pvma; + SVMAWrapper * pvmaw; + + pVnode = (SVnode *)(pmaf->impl); + pma = (SMemAllocator *)calloc(1, sizeof(*pma) + sizeof(SVMAWrapper)); + if (pma == NULL) { + // TODO: handle error + return NULL; + } + pvmaw = (SVMAWrapper *)POINTER_SHIFT(pma, sizeof(*pma)); + + // No allocator used currently + if (pVnode->pBufPool->inuse == NULL) { + while (listNEles(&(pVnode->pBufPool->free)) == 0) { + // TODO: wait until all released ro kill query + // tsem_wait(); + ASSERT(0); + } + + pVnode->pBufPool->inuse = tdListPopHead(&(pVnode->pBufPool->free)); + pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data); + T_REF_INIT_VAL(pvma, 1); + } else { + pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data); + } + + T_REF_INC(pvma); + + pvmaw->pVnode = pVnode; + pvmaw->pNode = pVnode->pBufPool->inuse; + + pma->impl = pvmaw; + pma->malloc = NULL; + pma->calloc = NULL; /* TODO */ + pma->realloc = NULL; /* TODO */ + pma->free = NULL; /* TODO */ + pma->usage = NULL; /* TODO */ + + return pma; +} + +static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma) { /* TODO */ + SVnode * pVnode = (SVnode *)(pmaf->impl); + SListNode * pNode = ((SVMAWrapper *)(pma->impl))->pNode; + SVMemAllocator *pvma = (SVMemAllocator *)(pNode->data); + + if (T_REF_DEC(pvma) == 0) { + if (pvma->type == E_V_ARENA_ALLOCATOR) { + SVArenaAllocator *pvaa = &(pvma->vaa); + while (pvaa->inuse != &(pvaa->node)) { + SVArenaNode *pNode = pvaa->inuse; + pvaa->inuse = pNode->prev; + /* code */ + } + + pvaa->inuse->ptr = pvaa->inuse->data; + } else if (pvma->type == E_V_HEAP_ALLOCATOR) { + } else { + ASSERT(0); + } + + // Move node from incycle to free + tdListAppendNode(&(pVnode->pBufPool->free), tdListPopNode(&(pVnode->pBufPool->incycle), pNode)); + // tsem_post(); todo: sem_post + } } \ No newline at end of file diff --git a/source/dnode/vnode/impl/src/vnodeOptions.c b/source/dnode/vnode/impl/src/vnodeCfg.c similarity index 65% rename from source/dnode/vnode/impl/src/vnodeOptions.c rename to source/dnode/vnode/impl/src/vnodeCfg.c index 5f519416b9..01facba888 100644 --- a/source/dnode/vnode/impl/src/vnodeOptions.c +++ b/source/dnode/vnode/impl/src/vnodeCfg.c @@ -15,20 +15,20 @@ #include "vnodeDef.h" -const SVnodeOptions defaultVnodeOptions = {0}; /* TODO */ +const SVnodeCfg defaultVnodeOptions = {0}; /* TODO */ -void vnodeOptionsInit(SVnodeOptions *pVnodeOptions) { /* TODO */ +void vnodeOptionsInit(SVnodeCfg *pVnodeOptions) { /* TODO */ vnodeOptionsCopy(pVnodeOptions, &defaultVnodeOptions); } -void vnodeOptionsClear(SVnodeOptions *pVnodeOptions) { /* TODO */ +void vnodeOptionsClear(SVnodeCfg *pVnodeOptions) { /* TODO */ } -int vnodeValidateOptions(const SVnodeOptions *pVnodeOptions) { +int vnodeValidateOptions(const SVnodeCfg *pVnodeOptions) { // TODO return 0; } -void vnodeOptionsCopy(SVnodeOptions *pDest, const SVnodeOptions *pSrc) { - memcpy((void *)pDest, (void *)pSrc, sizeof(SVnodeOptions)); +void vnodeOptionsCopy(SVnodeCfg *pDest, const SVnodeCfg *pSrc) { + memcpy((void *)pDest, (void *)pSrc, sizeof(SVnodeCfg)); } \ No newline at end of file diff --git a/source/dnode/vnode/impl/src/vnodeFileSystem.c b/source/dnode/vnode/impl/src/vnodeFS.c similarity index 100% rename from source/dnode/vnode/impl/src/vnodeFileSystem.c rename to source/dnode/vnode/impl/src/vnodeFS.c diff --git a/source/dnode/vnode/impl/src/vnodeMain.c b/source/dnode/vnode/impl/src/vnodeMain.c index 2746b729a6..ab33b58858 100644 --- a/source/dnode/vnode/impl/src/vnodeMain.c +++ b/source/dnode/vnode/impl/src/vnodeMain.c @@ -15,27 +15,40 @@ #include "vnodeDef.h" -static SVnode *vnodeNew(const char *path, const SVnodeOptions *pVnodeOptions); +static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg); static void vnodeFree(SVnode *pVnode); static int vnodeOpenImpl(SVnode *pVnode); static void vnodeCloseImpl(SVnode *pVnode); -SVnode *vnodeOpen(const char *path, const SVnodeOptions *pVnodeOptions) { +int vnodeInit() { + // TODO + if (walInit() < 0) { + return -1; + } + + return 0; +} + +void vnodeClear() { + walCleanUp(); +} + +SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { SVnode *pVnode = NULL; // Set default options - if (pVnodeOptions == NULL) { - pVnodeOptions = &defaultVnodeOptions; + if (pVnodeCfg == NULL) { + pVnodeCfg = &defaultVnodeOptions; } // Validate options - if (vnodeValidateOptions(pVnodeOptions) < 0) { + if (vnodeValidateOptions(pVnodeCfg) < 0) { // TODO return NULL; } // Create the handle - pVnode = vnodeNew(path, pVnodeOptions); + pVnode = vnodeNew(path, pVnodeCfg); if (pVnode == NULL) { // TODO: handle error return NULL; @@ -62,7 +75,7 @@ void vnodeClose(SVnode *pVnode) { void vnodeDestroy(const char *path) { taosRemoveDir(path); } /* ------------------------ STATIC METHODS ------------------------ */ -static SVnode *vnodeNew(const char *path, const SVnodeOptions *pVnodeOptions) { +static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { SVnode *pVnode = NULL; pVnode = (SVnode *)calloc(1, sizeof(*pVnode)); @@ -72,7 +85,7 @@ static SVnode *vnodeNew(const char *path, const SVnodeOptions *pVnodeOptions) { } pVnode->path = strdup(path); - vnodeOptionsCopy(&(pVnode->options), pVnodeOptions); + vnodeOptionsCopy(&(pVnode->config), pVnodeCfg); return pVnode; } @@ -94,7 +107,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { // Open meta sprintf(dir, "%s/meta", pVnode->path); - pVnode->pMeta = metaOpen(dir, &(pVnode->options.metaOptions)); + pVnode->pMeta = metaOpen(dir, &(pVnode->config.metaCfg)); if (pVnode->pMeta == NULL) { // TODO: handle error return -1; @@ -102,23 +115,23 @@ static int vnodeOpenImpl(SVnode *pVnode) { // Open tsdb sprintf(dir, "%s/tsdb", pVnode->path); - pVnode->pTsdb = tsdbOpen(dir, &(pVnode->options.tsdbOptions)); + pVnode->pTsdb = tsdbOpen(dir, &(pVnode->config.tsdbCfg)); if (pVnode->pTsdb == NULL) { // TODO: handle error return -1; } // TODO: Open TQ - sprintf(dir, "%s/wal", pVnode->path); - // pVnode->pTq = tqOpen(dir, NULL /* TODO */); - // if (pVnode->pTq == NULL) { - // // TODO: handle error - // return -1; - // } + sprintf(dir, "%s/tq", pVnode->path); + pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), NULL, NULL); + if (pVnode->pTq == NULL) { + // TODO: handle error + return -1; + } // Open WAL sprintf(dir, "%s/wal", pVnode->path); - pVnode->pWal = walOpen(dir, NULL /* TODO */); + pVnode->pWal = walOpen(dir, &(pVnode->config.walCfg)); if (pVnode->pWal == NULL) { // TODO: handle error return -1; diff --git a/source/dnode/vnode/impl/src/vnodeRequest.c b/source/dnode/vnode/impl/src/vnodeRequest.c index 6dea4a4e57..be5f5c890c 100644 --- a/source/dnode/vnode/impl/src/vnodeRequest.c +++ b/source/dnode/vnode/impl/src/vnodeRequest.c @@ -11,4 +11,105 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - */ \ No newline at end of file + */ + +#include "vnodeDef.h" + +static int vnodeBuildCreateTableReq(void **buf, const SVCreateTableReq *pReq); +static void *vnodeParseCreateTableReq(void *buf, SVCreateTableReq *pReq); + +int vnodeBuildReq(void **buf, const SVnodeReq *pReq, uint8_t type) { + int tsize = 0; + + tsize += taosEncodeFixedU64(buf, pReq->ver); + switch (type) { + case TSDB_MSG_TYPE_CREATE_TABLE: + tsize += vnodeBuildCreateTableReq(buf, &(pReq->ctReq)); + /* code */ + break; + + default: + break; + } + /* TODO */ + return tsize; +} + +void *vnodeParseReq(void *buf, SVnodeReq *pReq, uint8_t type) { + buf = taosDecodeFixedU64(buf, &(pReq->ver)); + + switch (type) { + case TSDB_MSG_TYPE_CREATE_TABLE: + buf = vnodeParseCreateTableReq(buf, &(pReq->ctReq)); + break; + + default: + break; + } + + // TODO + return buf; +} + +static int vnodeBuildCreateTableReq(void **buf, const SVCreateTableReq *pReq) { + int tsize = 0; + + tsize += taosEncodeString(buf, pReq->name); + tsize += taosEncodeFixedU32(buf, pReq->ttl); + tsize += taosEncodeFixedU32(buf, pReq->keep); + tsize += taosEncodeFixedU8(buf, pReq->type); + + switch (pReq->type) { + case META_SUPER_TABLE: + tsize += taosEncodeFixedU64(buf, pReq->stbCfg.suid); + tsize += tdEncodeSchema(buf, pReq->stbCfg.pSchema); + tsize += tdEncodeSchema(buf, pReq->stbCfg.pTagSchema); + break; + case META_CHILD_TABLE: + tsize += taosEncodeFixedU64(buf, pReq->ctbCfg.suid); + tsize += tdEncodeKVRow(buf, pReq->ctbCfg.pTag); + break; + case META_NORMAL_TABLE: + tsize += tdEncodeSchema(buf, pReq->ntbCfg.pSchema); + break; + default: + break; + } + + return tsize; +} + +static void *vnodeParseCreateTableReq(void *buf, SVCreateTableReq *pReq) { + buf = taosDecodeString(buf, &(pReq->name)); + buf = taosDecodeFixedU32(buf, &(pReq->ttl)); + buf = taosDecodeFixedU32(buf, &(pReq->keep)); + buf = taosDecodeFixedU8(buf, &(pReq->type)); + + switch (pReq->type) { + case META_SUPER_TABLE: + buf = taosDecodeFixedU64(buf, &(pReq->stbCfg.suid)); + buf = tdDecodeSchema(buf, &(pReq->stbCfg.pSchema)); + buf = tdDecodeSchema(buf, &(pReq->stbCfg.pTagSchema)); + break; + case META_CHILD_TABLE: + buf = taosDecodeFixedU64(buf, &(pReq->ctbCfg.suid)); + buf = tdDecodeKVRow(buf, &(pReq->ctbCfg.pTag)); + break; + case META_NORMAL_TABLE: + buf = tdDecodeSchema(buf, &(pReq->ntbCfg.pSchema)); + break; + default: + break; + } + + return buf; +} + +int vnodeBuildDropTableReq(void **buf, const SVDropTableReq *pReq) { + // TODO + return 0; +} + +void *vnodeParseDropTableReq(void *buf, SVDropTableReq *pReq) { + // TODO +} \ No newline at end of file diff --git a/source/dnode/vnode/impl/src/vnodeWrite.c b/source/dnode/vnode/impl/src/vnodeWrite.c index 810ac570bc..cdeb894932 100644 --- a/source/dnode/vnode/impl/src/vnodeWrite.c +++ b/source/dnode/vnode/impl/src/vnodeWrite.c @@ -16,51 +16,84 @@ #include "vnodeDef.h" int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { - /* TODO */ - return 0; -} + SRpcMsg * pMsg; + SVnodeReq *pVnodeReq; -int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { -#if 0 - int reqType; /* TODO */ - size_t reqSize; /* TODO */ - uint64_t reqVersion = 0; /* TODO */ - int code = 0; + for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { + pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); - // Copy the request to vnode buffer - void *pReq = mMalloc(pVnode->inuse, reqSize); - if (pReq == NULL) { - // TODO: handle error - } + // ser request version + void * pBuf = pMsg->pCont; + uint64_t ver = pVnode->state.processed++; + taosEncodeFixedU64(&pBuf, ver); - memcpy(pReq, pMsg, reqSize); - - // Push the request to TQ so consumers can consume - tqPushMsg(pVnode->pTq, pReq, 0); - - // Process the request - switch (reqType) { - case TSDB_MSG_TYPE_CREATE_TABLE: - code = metaCreateTable(pVnode->pMeta, NULL /* TODO */); - break; - case TSDB_MSG_TYPE_DROP_TABLE: - code = metaDropTable(pVnode->pMeta, 0 /* TODO */); - break; - case TSDB_MSG_TYPE_SUBMIT: - /* TODO */ - break; - default: - break; - } - - if (vnodeShouldCommit(pVnode)) { - if (vnodeAsyncCommit(pVnode) < 0) { + if (walWrite(pVnode->pWal, ver, pMsg->msgType, pMsg->pCont, pMsg->contLen) < 0) { // TODO: handle error } } - return code; -#endif + walFsync(pVnode->pWal, false); + + // Apply each request now + for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { + pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); + SVnodeReq vReq; + + // Apply the request + { + void *ptr = vnodeMalloc(pVnode, pMsg->contLen); + if (ptr == NULL) { + // TODO: handle error + } + + // TODO: copy here need to be extended + memcpy(ptr, pMsg->pCont, pMsg->contLen); + + // todo: change the interface here + uint64_t ver; + taosDecodeFixedU64(pMsg->pCont, &ver); + if (tqPushMsg(pVnode->pTq, ptr, ver) < 0) { + // TODO: handle error + } + + vnodeParseReq(pMsg->pCont, &vReq, pMsg->msgType); + + switch (pMsg->msgType) { + case TSDB_MSG_TYPE_CREATE_TABLE: + if (metaCreateTable(pVnode->pMeta, &(vReq.ctReq)) < 0) { + // TODO: handle error + } + + // TODO: maybe need to clear the requst struct + break; + case TSDB_MSG_TYPE_DROP_TABLE: + if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) { + // TODO: handle error + } + break; + case TSDB_MSG_TYPE_SUBMIT: + /* code */ + break; + default: + break; + } + + pVnode->state.applied = ver; + } + + // Check if it needs to commit + if (vnodeShouldCommit(pVnode)) { + if (vnodeAsyncCommit(pVnode) < 0) { + // TODO: handle error + } + } + } + + return 0; +} + +int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + // TODO return 0; } diff --git a/source/dnode/vnode/impl/test/vnodeApiTests.cpp b/source/dnode/vnode/impl/test/vnodeApiTests.cpp index 65aa0f506c..493fe4448b 100644 --- a/source/dnode/vnode/impl/test/vnodeApiTests.cpp +++ b/source/dnode/vnode/impl/test/vnodeApiTests.cpp @@ -3,11 +3,173 @@ #include "vnode.h" +static STSchema *createBasicSchema() { + STSchemaBuilder sb; + STSchema * pSchema = NULL; + + tdInitTSchemaBuilder(&sb, 0); + + tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, 0, 0); + for (int i = 1; i < 10; i++) { + tdAddColToSchema(&sb, TSDB_DATA_TYPE_INT, i, 0); + } + + pSchema = tdGetSchemaFromBuilder(&sb); + + tdDestroyTSchemaBuilder(&sb); + + return pSchema; +} + +static STSchema *createBasicTagSchema() { + STSchemaBuilder sb; + STSchema * pSchema = NULL; + + tdInitTSchemaBuilder(&sb, 0); + + tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, 0, 0); + for (int i = 10; i < 12; i++) { + tdAddColToSchema(&sb, TSDB_DATA_TYPE_BINARY, i, 20); + } + + pSchema = tdGetSchemaFromBuilder(&sb); + + tdDestroyTSchemaBuilder(&sb); + + return pSchema; +} + +static SKVRow createBasicTag() { + SKVRowBuilder rb; + SKVRow pTag; + + tdInitKVRowBuilder(&rb); + + for (int i = 10; i < 12; i++) { + void *pVal = malloc(sizeof(VarDataLenT) + strlen("foo")); + varDataLen(pVal) = strlen("foo"); + memcpy(varDataVal(pVal), "foo", strlen("foo")); + + tdAddColToKVRow(&rb, i, TSDB_DATA_TYPE_BINARY, pVal); + free(pVal); + } + + pTag = tdGetKVRowFromBuilder(&rb); + tdDestroyKVRowBuilder(&rb); + + return pTag; +} + +#if 0 +TEST(vnodeApiTest, test_create_table_encode_and_decode_function) { + tb_uid_t suid = 1638166374163; + STSchema *pSchema = createBasicSchema(); + STSchema *pTagSchema = createBasicTagSchema(); + char tbname[128] = "st"; + char * buffer = new char[1024]; + void * pBuf = (void *)buffer; + SVnodeReq vCreateSTbReq = VNODE_INIT_CREATE_STB_REQ(tbname, UINT32_MAX, UINT32_MAX, suid, pSchema, pTagSchema); + + vnodeBuildReq(&pBuf, &vCreateSTbReq, TSDB_MSG_TYPE_CREATE_TABLE); + + SVnodeReq decoded_req; + + vnodeParseReq(buffer, &decoded_req, TSDB_MSG_TYPE_CREATE_TABLE); + + int k = 10; +} +#endif + TEST(vnodeApiTest, vnodeOpen_vnodeClose_test) { + GTEST_ASSERT_GE(vnodeInit(), 0); + // Create and open a vnode SVnode *pVnode = vnodeOpen("vnode1", NULL); ASSERT_NE(pVnode, nullptr); + tb_uid_t suid = 1638166374163; + { + // Create a super table + STSchema *pSchema = createBasicSchema(); + STSchema *pTagSchema = createBasicTagSchema(); + char tbname[128] = "st"; + + SArray * pMsgs = (SArray *)taosArrayInit(1, sizeof(SRpcMsg *)); + SVnodeReq vCreateSTbReq = VNODE_INIT_CREATE_STB_REQ(tbname, UINT32_MAX, UINT32_MAX, suid, pSchema, pTagSchema); + + int zs = vnodeBuildReq(NULL, &vCreateSTbReq, TSDB_MSG_TYPE_CREATE_TABLE); + SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + zs); + pMsg->msgType = TSDB_MSG_TYPE_CREATE_TABLE; + pMsg->contLen = zs; + pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(SRpcMsg)); + + void *pBuf = pMsg->pCont; + + vnodeBuildReq(&pBuf, &vCreateSTbReq, TSDB_MSG_TYPE_CREATE_TABLE); + META_CLEAR_TB_CFG(&vCreateSTbReq); + + taosArrayPush(pMsgs, &(pMsg)); + + vnodeProcessWMsgs(pVnode, pMsgs); + + free(pMsg); + taosArrayDestroy(pMsgs); + tdFreeSchema(pSchema); + tdFreeSchema(pTagSchema); + } + + { + // Create some child tables + int ntables = 100000; + int batch = 10; + for (int i = 0; i < ntables / batch; i++) { + SArray *pMsgs = (SArray *)taosArrayInit(batch, sizeof(SRpcMsg *)); + for (int j = 0; j < batch; j++) { + SKVRow pTag = createBasicTag(); + char tbname[128]; + sprintf(tbname, "tb%d", i * batch + j); + SVnodeReq vCreateCTbReq = VNODE_INIT_CREATE_CTB_REQ(tbname, UINT32_MAX, UINT32_MAX, suid, pTag); + + int tz = vnodeBuildReq(NULL, &vCreateCTbReq, TSDB_MSG_TYPE_CREATE_TABLE); + SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + tz); + pMsg->msgType = TSDB_MSG_TYPE_CREATE_TABLE; + pMsg->contLen = tz; + pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(*pMsg)); + void *pBuf = pMsg->pCont; + + vnodeBuildReq(&pBuf, &vCreateCTbReq, TSDB_MSG_TYPE_CREATE_TABLE); + META_CLEAR_TB_CFG(&vCreateCTbReq); + free(pTag); + + taosArrayPush(pMsgs, &(pMsg)); + } + + vnodeProcessWMsgs(pVnode, pMsgs); + + for (int j = 0; j < batch; j++) { + SRpcMsg *pMsg = *(SRpcMsg **)taosArrayPop(pMsgs); + free(pMsg); + } + + taosArrayDestroy(pMsgs); + + // std::cout << "the " << i << "th batch is created" << std::endl; + } + } + // Close the vnode vnodeClose(pVnode); + + vnodeClear(); +} + +TEST(vnodeApiTest, DISABLED_vnode_process_create_table) { + STSchema * pSchema = NULL; + STSchema * pTagSchema = NULL; + char stname[15]; + SVCreateTableReq pReq = META_INIT_STB_CFG(stname, UINT32_MAX, UINT32_MAX, 0, pSchema, pTagSchema); + + int k = 10; + + META_CLEAR_TB_CFG(pReq); } diff --git a/source/dnode/vnode/meta/inc/metaOptions.h b/source/dnode/vnode/meta/inc/metaCfg.h similarity index 74% rename from source/dnode/vnode/meta/inc/metaOptions.h rename to source/dnode/vnode/meta/inc/metaCfg.h index 500f2d5e59..5c72ffa680 100644 --- a/source/dnode/vnode/meta/inc/metaOptions.h +++ b/source/dnode/vnode/meta/inc/metaCfg.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_META_OPTIONS_H_ -#define _TD_META_OPTIONS_H_ +#ifndef _TD_META_CFG_H_ +#define _TD_META_CFG_H_ #include "meta.h" @@ -22,13 +22,13 @@ extern "C" { #endif -extern const SMetaOptions defaultMetaOptions; +extern const SMetaCfg defaultMetaOptions; -int metaValidateOptions(const SMetaOptions *); -void metaOptionsCopy(SMetaOptions *pDest, const SMetaOptions *pSrc); +int metaValidateOptions(const SMetaCfg *); +void metaOptionsCopy(SMetaCfg *pDest, const SMetaCfg *pSrc); #ifdef __cplusplus } #endif -#endif /*_TD_META_OPTIONS_H_*/ \ No newline at end of file +#endif /*_TD_META_CFG_H_*/ \ No newline at end of file diff --git a/source/dnode/vnode/meta/inc/metaDB.h b/source/dnode/vnode/meta/inc/metaDB.h index b1531d2fd7..f758af0673 100644 --- a/source/dnode/vnode/meta/inc/metaDB.h +++ b/source/dnode/vnode/meta/inc/metaDB.h @@ -34,7 +34,7 @@ typedef struct { int metaOpenDB(SMeta *pMeta); void metaCloseDB(SMeta *pMeta); -int metaSaveTableToDB(SMeta *pMeta, const STbOptions *pTbOptions); +int metaSaveTableToDB(SMeta *pMeta, const STbCfg *pTbOptions); int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid); #ifdef __cplusplus diff --git a/source/dnode/vnode/meta/inc/metaDef.h b/source/dnode/vnode/meta/inc/metaDef.h index b0d31de1b4..fd14efd50b 100644 --- a/source/dnode/vnode/meta/inc/metaDef.h +++ b/source/dnode/vnode/meta/inc/metaDef.h @@ -20,10 +20,10 @@ #include "meta.h" #include "metaCache.h" +#include "metaCfg.h" #include "metaDB.h" #include "metaIdx.h" -#include "metaOptions.h" -#include "metaTbOptions.h" +#include "metaTbCfg.h" #include "metaTbTag.h" #include "metaTbUid.h" @@ -33,7 +33,7 @@ extern "C" { struct SMeta { char* path; - SMetaOptions options; + SMetaCfg options; meta_db_t* pDB; meta_index_t* pIdx; meta_cache_t* pCache; diff --git a/source/dnode/vnode/meta/inc/metaIdx.h b/source/dnode/vnode/meta/inc/metaIdx.h index d43df9afc3..28d58cb4f1 100644 --- a/source/dnode/vnode/meta/inc/metaIdx.h +++ b/source/dnode/vnode/meta/inc/metaIdx.h @@ -28,7 +28,7 @@ typedef rocksdb_t meta_index_t; int metaOpenIdx(SMeta *pMeta); void metaCloseIdx(SMeta *pMeta); -int metaSaveTableToIdx(SMeta *pMeta, const STbOptions *pTbOptions); +int metaSaveTableToIdx(SMeta *pMeta, const STbCfg *pTbOptions); int metaRemoveTableFromIdx(SMeta *pMeta, tb_uid_t uid); #ifdef __cplusplus diff --git a/source/dnode/vnode/meta/inc/metaTbOptions.h b/source/dnode/vnode/meta/inc/metaTbCfg.h similarity index 69% rename from source/dnode/vnode/meta/inc/metaTbOptions.h rename to source/dnode/vnode/meta/inc/metaTbCfg.h index b0fbd3a463..68c609d6b4 100644 --- a/source/dnode/vnode/meta/inc/metaTbOptions.h +++ b/source/dnode/vnode/meta/inc/metaTbCfg.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_META_TABLE_OPTIONS_H_ -#define _TD_META_TABLE_OPTIONS_H_ +#ifndef _TD_META_TABLE_CFG_H_ +#define _TD_META_TABLE_CFG_H_ #include "meta.h" @@ -22,11 +22,15 @@ extern "C" { #endif -int metaValidateTbOptions(SMeta *pMeta, const STbOptions *); -size_t metaEncodeTbObjFromTbOptions(const STbOptions *, void *pBuf, size_t bsize); +#define META_SUPER_TABLE 0 +#define META_CHILD_TABLE 1 +#define META_NORMAL_TABLE 2 + +int metaValidateTbOptions(SMeta *pMeta, const STbCfg *); +size_t metaEncodeTbObjFromTbOptions(const STbCfg *, void *pBuf, size_t bsize); #ifdef __cplusplus } #endif -#endif /*_TD_META_TABLE_OPTIONS_H_*/ \ No newline at end of file +#endif /*_TD_META_TABLE_CFG_H_*/ \ No newline at end of file diff --git a/source/dnode/vnode/meta/src/metaCache.c b/source/dnode/vnode/meta/src/metaCache.c index 9166f1724a..aaa97caea0 100644 --- a/source/dnode/vnode/meta/src/metaCache.c +++ b/source/dnode/vnode/meta/src/metaCache.c @@ -18,8 +18,8 @@ int metaOpenCache(SMeta *pMeta) { // TODO - if (pMeta->options.lruCacheSize) { - pMeta->pCache = rocksdb_cache_create_lru(pMeta->options.lruCacheSize); + if (pMeta->options.lruSize) { + pMeta->pCache = rocksdb_cache_create_lru(pMeta->options.lruSize); if (pMeta->pCache == NULL) { // TODO: handle error return -1; diff --git a/source/dnode/vnode/meta/src/metaOptions.c b/source/dnode/vnode/meta/src/metaCfg.c similarity index 68% rename from source/dnode/vnode/meta/src/metaOptions.c rename to source/dnode/vnode/meta/src/metaCfg.c index f92cd73cae..cbaac1c409 100644 --- a/source/dnode/vnode/meta/src/metaOptions.c +++ b/source/dnode/vnode/meta/src/metaCfg.c @@ -15,20 +15,20 @@ #include "metaDef.h" -const SMetaOptions defaultMetaOptions = {.lruCacheSize = 0}; +const SMetaCfg defaultMetaOptions = {.lruSize = 0}; /* ------------------------ EXPOSED METHODS ------------------------ */ -void metaOptionsInit(SMetaOptions *pMetaOptions) { metaOptionsCopy(pMetaOptions, &defaultMetaOptions); } +void metaOptionsInit(SMetaCfg *pMetaOptions) { metaOptionsCopy(pMetaOptions, &defaultMetaOptions); } -void metaOptionsClear(SMetaOptions *pMetaOptions) { +void metaOptionsClear(SMetaCfg *pMetaOptions) { // TODO } -int metaValidateOptions(const SMetaOptions *pMetaOptions) { +int metaValidateOptions(const SMetaCfg *pMetaOptions) { // TODO return 0; } -void metaOptionsCopy(SMetaOptions *pDest, const SMetaOptions *pSrc) { memcpy(pDest, pSrc, sizeof(*pSrc)); } +void metaOptionsCopy(SMetaCfg *pDest, const SMetaCfg *pSrc) { memcpy(pDest, pSrc, sizeof(*pSrc)); } /* ------------------------ STATIC METHODS ------------------------ */ \ No newline at end of file diff --git a/source/dnode/vnode/meta/src/metaDB.c b/source/dnode/vnode/meta/src/metaDB.c index 8865678508..d1fb65d2ed 100644 --- a/source/dnode/vnode/meta/src/metaDB.c +++ b/source/dnode/vnode/meta/src/metaDB.c @@ -92,7 +92,7 @@ void metaCloseDB(SMeta *pMeta) { } } -int metaSaveTableToDB(SMeta *pMeta, const STbOptions *pTbOptions) { +int metaSaveTableToDB(SMeta *pMeta, const STbCfg *pTbOptions) { tb_uid_t uid; char * err = NULL; size_t size; @@ -102,13 +102,14 @@ int metaSaveTableToDB(SMeta *pMeta, const STbOptions *pTbOptions) { // Generate a uid for child and normal table if (pTbOptions->type == META_SUPER_TABLE) { - uid = pTbOptions->stbOptions.uid; + uid = pTbOptions->stbCfg.suid; } else { uid = metaGenerateUid(pMeta); } // Save tbname -> uid to tbnameDB rocksdb_put(pMeta->pDB->nameDb, wopt, pTbOptions->name, strlen(pTbOptions->name), (char *)(&uid), sizeof(uid), &err); + rocksdb_writeoptions_disable_WAL(wopt, 1); // Save uid -> tb_obj to tbDB size = metaEncodeTbObjFromTbOptions(pTbOptions, pBuf, 1024); @@ -117,22 +118,22 @@ int metaSaveTableToDB(SMeta *pMeta, const STbOptions *pTbOptions) { switch (pTbOptions->type) { case META_NORMAL_TABLE: // save schemaDB - metaSaveSchemaDB(pMeta, uid, pTbOptions->ntbOptions.pSchame); + metaSaveSchemaDB(pMeta, uid, pTbOptions->ntbCfg.pSchema); break; case META_SUPER_TABLE: // save schemaDB - metaSaveSchemaDB(pMeta, uid, pTbOptions->stbOptions.pSchema); + metaSaveSchemaDB(pMeta, uid, pTbOptions->stbCfg.pSchema); // save mapDB (really need?) rocksdb_put(pMeta->pDB->mapDb, wopt, (char *)(&uid), sizeof(uid), "", 0, &err); break; case META_CHILD_TABLE: // save tagDB - rocksdb_put(pMeta->pDB->tagDb, wopt, (char *)(&uid), sizeof(uid), pTbOptions->ctbOptions.tags, - kvRowLen(pTbOptions->ctbOptions.tags), &err); + rocksdb_put(pMeta->pDB->tagDb, wopt, (char *)(&uid), sizeof(uid), pTbOptions->ctbCfg.pTag, + kvRowLen(pTbOptions->ctbCfg.pTag), &err); // save mapDB - metaSaveMapDB(pMeta, pTbOptions->ctbOptions.suid, uid); + metaSaveMapDB(pMeta, pTbOptions->ctbCfg.suid, uid); break; default: ASSERT(0); @@ -157,6 +158,7 @@ static void metaSaveSchemaDB(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema) { char * err = NULL; rocksdb_writeoptions_t *wopt = rocksdb_writeoptions_create(); + rocksdb_writeoptions_disable_WAL(wopt, 1); metaGetSchemaDBKey(key, uid, schemaVersion(pSchema)); vsize = tdEncodeSchema((void **)(&ppBuf), pSchema); @@ -190,10 +192,12 @@ static int metaSaveMapDB(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid) { memcpy(POINTER_SHIFT(nval, vlen), (void *)(&uid), sizeof(uid)); rocksdb_writeoptions_t *wopt = rocksdb_writeoptions_create(); + rocksdb_writeoptions_disable_WAL(wopt, 1); rocksdb_put(pMeta->pDB->mapDb, wopt, (char *)(&suid), sizeof(suid), nval, vlen + sizeof(uid), &err); rocksdb_writeoptions_destroy(wopt); + free(nval); return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/meta/src/metaIdx.c b/source/dnode/vnode/meta/src/metaIdx.c index 54cc8bd461..a3c0d3540e 100644 --- a/source/dnode/vnode/meta/src/metaIdx.c +++ b/source/dnode/vnode/meta/src/metaIdx.c @@ -47,7 +47,12 @@ void metaCloseIdx(SMeta *pMeta) { /* TODO */ } } -int metaSaveTableToIdx(SMeta *pMeta, const STbOptions *pTbOptions) { +int metaSaveTableToIdx(SMeta *pMeta, const STbCfg *pTbOptions) { + // TODO + return 0; +} + +int metaRemoveTableFromIdx(SMeta *pMeta, tb_uid_t uid) { // TODO return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/meta/src/metaMain.c b/source/dnode/vnode/meta/src/metaMain.c index 000b10a126..a936002328 100644 --- a/source/dnode/vnode/meta/src/metaMain.c +++ b/source/dnode/vnode/meta/src/metaMain.c @@ -15,17 +15,14 @@ #include "tcoding.h" -#include "meta.h" -#include "metaDB.h" #include "metaDef.h" -#include "metaOptions.h" -static SMeta *metaNew(const char *path, const SMetaOptions *pMetaOptions); +static SMeta *metaNew(const char *path, const SMetaCfg *pMetaOptions); static void metaFree(SMeta *pMeta); static int metaOpenImpl(SMeta *pMeta); static void metaCloseImpl(SMeta *pMeta); -SMeta *metaOpen(const char *path, const SMetaOptions *pMetaOptions) { +SMeta *metaOpen(const char *path, const SMetaCfg *pMetaOptions) { SMeta *pMeta = NULL; // Set default options @@ -68,7 +65,7 @@ void metaClose(SMeta *pMeta) { void metaRemove(const char *path) { taosRemoveDir(path); } /* ------------------------ STATIC METHODS ------------------------ */ -static SMeta *metaNew(const char *path, const SMetaOptions *pMetaOptions) { +static SMeta *metaNew(const char *path, const SMetaCfg *pMetaOptions) { SMeta *pMeta; size_t psize = strlen(path); diff --git a/source/dnode/vnode/meta/src/metaTable.c b/source/dnode/vnode/meta/src/metaTable.c index d4a1ad3e38..fc0f19302f 100644 --- a/source/dnode/vnode/meta/src/metaTable.c +++ b/source/dnode/vnode/meta/src/metaTable.c @@ -15,21 +15,21 @@ #include "metaDef.h" -int metaCreateTable(SMeta *pMeta, const STbOptions *pTbOptions) { +int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg) { // Validate the tbOptions - if (metaValidateTbOptions(pMeta, pTbOptions) < 0) { + if (metaValidateTbOptions(pMeta, pTbCfg) < 0) { // TODO: handle error return -1; } // TODO: add atomicity - if (metaSaveTableToDB(pMeta, pTbOptions) < 0) { + if (metaSaveTableToDB(pMeta, pTbCfg) < 0) { // TODO: handle error return -1; } - if (metaSaveTableToIdx(pMeta, pTbOptions) < 0) { + if (metaSaveTableToIdx(pMeta, pTbCfg) < 0) { // TODO: handle error return -1; } diff --git a/source/dnode/vnode/meta/src/metaTbOptions.c b/source/dnode/vnode/meta/src/metaTbCfg.c similarity index 70% rename from source/dnode/vnode/meta/src/metaTbOptions.c rename to source/dnode/vnode/meta/src/metaTbCfg.c index 9bf9607df7..8485d82a8d 100644 --- a/source/dnode/vnode/meta/src/metaTbOptions.c +++ b/source/dnode/vnode/meta/src/metaTbCfg.c @@ -16,12 +16,12 @@ #include "metaDef.h" #include "tcoding.h" -int metaValidateTbOptions(SMeta *pMeta, const STbOptions *pTbOptions) { +int metaValidateTbOptions(SMeta *pMeta, const STbCfg *pTbOptions) { // TODO return 0; } -size_t metaEncodeTbObjFromTbOptions(const STbOptions *pTbOptions, void *pBuf, size_t bsize) { +size_t metaEncodeTbObjFromTbOptions(const STbCfg *pTbOptions, void *pBuf, size_t bsize) { void **ppBuf = &pBuf; int tlen = 0; @@ -31,12 +31,12 @@ size_t metaEncodeTbObjFromTbOptions(const STbOptions *pTbOptions, void *pBuf, si switch (pTbOptions->type) { case META_SUPER_TABLE: - tlen += taosEncodeFixedU64(ppBuf, pTbOptions->stbOptions.uid); - tlen += tdEncodeSchema(ppBuf, pTbOptions->stbOptions.pTagSchema); + tlen += taosEncodeFixedU64(ppBuf, pTbOptions->stbCfg.suid); + tlen += tdEncodeSchema(ppBuf, pTbOptions->stbCfg.pTagSchema); // TODO: encode schema version array break; case META_CHILD_TABLE: - tlen += taosEncodeFixedU64(ppBuf, pTbOptions->ctbOptions.suid); + tlen += taosEncodeFixedU64(ppBuf, pTbOptions->ctbCfg.suid); break; case META_NORMAL_TABLE: // TODO: encode schema version array @@ -46,4 +46,13 @@ size_t metaEncodeTbObjFromTbOptions(const STbOptions *pTbOptions, void *pBuf, si } return tlen; +} + +int metaEncodeTbCfg(void **pBuf, STbCfg *pTbCfg) { + // TODO + return 0; +} + +void *metaDecodeTbCfg(void *pBuf, STbCfg **pTbCfg) { + // TODO } \ No newline at end of file diff --git a/source/dnode/vnode/tq/src/tq.c b/source/dnode/vnode/tq/src/tq.c index c010042b8c..c1a46e567b 100644 --- a/source/dnode/vnode/tq/src/tq.c +++ b/source/dnode/vnode/tq/src/tq.c @@ -40,17 +40,17 @@ void* tqSerializeBufItem(TqBufferItem* bufItem, void* ptr); const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle* bufHandle); const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem* bufItem); -STQ* tqOpen(const char* path, TqConfig* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac) { +STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac) { STQ* pTq = malloc(sizeof(STQ)); if(pTq == NULL) { //TODO: memory error return NULL; } - strcpy(pTq->path, path); + pTq->path = strdup(path); pTq->tqConfig = tqConfig; pTq->tqLogReader = tqLogReader; - pTq->tqMemRef.pAlloctorFactory = allocFac; - pTq->tqMemRef.pAllocator = allocFac->create(); + // pTq->tqMemRef.pAlloctorFactory = allocFac; + // pTq->tqMemRef.pAllocator = allocFac->create(allocFac); if(pTq->tqMemRef.pAllocator == NULL) { //TODO } diff --git a/source/dnode/vnode/tsdb/inc/tsdbDef.h b/source/dnode/vnode/tsdb/inc/tsdbDef.h index ca3d0319c1..0a57f5f670 100644 --- a/source/dnode/vnode/tsdb/inc/tsdbDef.h +++ b/source/dnode/vnode/tsdb/inc/tsdbDef.h @@ -28,7 +28,7 @@ extern "C" { struct STsdb { char * path; - STsdbOptions options; + STsdbCfg options; SMemAllocatorFactory *pmaf; }; diff --git a/source/dnode/vnode/tsdb/inc/tsdbOptions.h b/source/dnode/vnode/tsdb/inc/tsdbOptions.h index ffd409099a..46607ea2fe 100644 --- a/source/dnode/vnode/tsdb/inc/tsdbOptions.h +++ b/source/dnode/vnode/tsdb/inc/tsdbOptions.h @@ -20,10 +20,10 @@ extern "C" { #endif -extern const STsdbOptions defautlTsdbOptions; +extern const STsdbCfg defautlTsdbOptions; -int tsdbValidateOptions(const STsdbOptions *); -void tsdbOptionsCopy(STsdbOptions *pDest, const STsdbOptions *pSrc); +int tsdbValidateOptions(const STsdbCfg *); +void tsdbOptionsCopy(STsdbCfg *pDest, const STsdbCfg *pSrc); #ifdef __cplusplus } diff --git a/source/dnode/vnode/tsdb/src/tsdbMain.c b/source/dnode/vnode/tsdb/src/tsdbMain.c index 10b6c2aa65..2fe7a61930 100644 --- a/source/dnode/vnode/tsdb/src/tsdbMain.c +++ b/source/dnode/vnode/tsdb/src/tsdbMain.c @@ -15,12 +15,12 @@ #include "tsdbDef.h" -static STsdb *tsdbNew(const char *path, const STsdbOptions *pTsdbOptions); +static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbOptions); static void tsdbFree(STsdb *pTsdb); static int tsdbOpenImpl(STsdb *pTsdb); static void tsdbCloseImpl(STsdb *pTsdb); -STsdb *tsdbOpen(const char *path, const STsdbOptions *pTsdbOptions) { +STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbOptions) { STsdb *pTsdb = NULL; // Set default TSDB Options @@ -62,7 +62,7 @@ void tsdbClose(STsdb *pTsdb) { void tsdbRemove(const char *path) { taosRemoveDir(path); } /* ------------------------ STATIC METHODS ------------------------ */ -static STsdb *tsdbNew(const char *path, const STsdbOptions *pTsdbOptions) { +static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbOptions) { STsdb *pTsdb = NULL; pTsdb = (STsdb *)calloc(1, sizeof(STsdb)); diff --git a/source/dnode/vnode/tsdb/src/tsdbOptions.c b/source/dnode/vnode/tsdb/src/tsdbOptions.c index 3a1102f048..1c2b3c640a 100644 --- a/source/dnode/vnode/tsdb/src/tsdbOptions.c +++ b/source/dnode/vnode/tsdb/src/tsdbOptions.c @@ -15,20 +15,20 @@ #include "tsdbDef.h" -const STsdbOptions defautlTsdbOptions = {.lruCacheSize = 0}; +const STsdbCfg defautlTsdbOptions = {.lruCacheSize = 0}; -int tsdbOptionsInit(STsdbOptions *pTsdbOptions) { +int tsdbOptionsInit(STsdbCfg *pTsdbOptions) { // TODO return 0; } -void tsdbOptionsClear(STsdbOptions *pTsdbOptions) { +void tsdbOptionsClear(STsdbCfg *pTsdbOptions) { // TODO } -int tsdbValidateOptions(const STsdbOptions *pTsdbOptions) { +int tsdbValidateOptions(const STsdbCfg *pTsdbOptions) { // TODO return 0; } -void tsdbOptionsCopy(STsdbOptions *pDest, const STsdbOptions *pSrc) { memcpy(pDest, pSrc, sizeof(STsdbOptions)); } \ No newline at end of file +void tsdbOptionsCopy(STsdbCfg *pDest, const STsdbCfg *pSrc) { memcpy(pDest, pSrc, sizeof(STsdbCfg)); } \ No newline at end of file