diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index b258a2a252..cb05c84d72 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -26,7 +26,8 @@ "eamodio.gitlens", "matepek.vscode-catch2-test-adapter", "spmeesseman.vscode-taskexplorer", - "cschlosser.doxdocgen" + "cschlosser.doxdocgen", + "urosvujosevic.explorer-manager" ], // Use 'forwardPorts' to make a list of ports inside the container available locally. // "forwardPorts": [], diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 4f1d98d1e3..f069b68286 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -120,6 +120,25 @@ typedef struct SExchangePhyNode { SArray *pSrcEndPoints; // SEpAddr, scheduler fill by calling qSetSuplanExecutionNode } SExchangePhyNode; +typedef enum EAggAlgo { + AGG_ALGO_PLAIN = 1, // simple agg across all input rows + AGG_ALGO_SORTED, // grouped agg, input must be sorted + AGG_ALGO_HASHED // grouped agg, use internal hashtable +} EAggAlgo; + +typedef enum EAggSplit { + AGG_SPLIT_PRE = 1, // first level agg, maybe don't need calculate the final result + AGG_SPLIT_FINAL // second level agg, must calculate the final result +} EAggSplit; + +typedef struct SAggPhyNode { + SPhyNode node; + EAggAlgo aggAlgo; // algorithm used by agg operator + EAggSplit aggSplit; // distributed splitting mode + SArray *pExprs; // SExprInfo list, these are expression list of group_by_clause and parameter expression of aggregate function + SArray *pGroupByList; // SColIndex list, but these must be column node +} SAggPhyNode; + typedef struct SSubplanId { uint64_t queryId; uint64_t templateId; diff --git a/include/libs/planner/plannerOp.h b/include/libs/planner/plannerOp.h index 41d6e028cf..5cc896f1c2 100644 --- a/include/libs/planner/plannerOp.h +++ b/include/libs/planner/plannerOp.h @@ -30,7 +30,7 @@ OP_ENUM_MACRO(TagScan) OP_ENUM_MACRO(SystemTableScan) OP_ENUM_MACRO(Aggregate) OP_ENUM_MACRO(Project) -OP_ENUM_MACRO(Groupby) +// OP_ENUM_MACRO(Groupby) OP_ENUM_MACRO(Limit) OP_ENUM_MACRO(SLimit) OP_ENUM_MACRO(TimeWindow) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 7ff02309ec..d8de94b204 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -35,28 +35,26 @@ typedef struct SDnode SDnode; typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); typedef struct SVnodeCfg { - int32_t vgId; - SDnode *pDnode; - struct { - uint64_t wsize; - uint64_t ssize; - uint64_t lsize; - bool isHeapAllocator; - }; + int32_t vgId; + SDnode * pDnode; + uint64_t wsize; + uint64_t ssize; + uint64_t lsize; + bool isHeapAllocator; uint32_t ttl; uint32_t keep; - bool isWeak; + bool isWeak; STsdbCfg tsdbCfg; SMetaCfg metaCfg; - STqCfg tqCfg; - SWalCfg walCfg; + STqCfg tqCfg; + SWalCfg walCfg; } SVnodeCfg; typedef struct { int32_t sver; - char *timezone; - char *locale; - char *charset; + char * timezone; + char * locale; + char * charset; uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) PutReqToVQueryQFp putReqToVQueryQFp; } SVnodeOpt; diff --git a/source/dnode/vnode/src/inc/vnodeDef.h b/source/dnode/vnode/src/inc/vnodeDef.h index 85563890fa..1333c9dce7 100644 --- a/source/dnode/vnode/src/inc/vnodeDef.h +++ b/source/dnode/vnode/src/inc/vnodeDef.h @@ -82,12 +82,12 @@ int32_t vnodePutReqToVQueryQ(SVnode *pVnode, struct SRpcMsg *pReq); // For Log extern int32_t vDebugFlag; -#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0) -#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} while(0) -#define vWarn(...) do { if (vDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} while(0) -#define vInfo(...) do { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} while(0) -#define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0) -#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0) +#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", 255, __VA_ARGS__); }} while(0) +#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", 255, __VA_ARGS__); }} while(0) +#define vWarn(...) do { if (vDebugFlag & DEBUG_WARN) { taosPrintLog("VND WARN ", 255, __VA_ARGS__); }} while(0) +#define vInfo(...) do { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("VND ", 255, __VA_ARGS__); }} while(0) +#define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", tsdbDebugFlag, __VA_ARGS__); }} while(0) +#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", tsdbDebugFlag, __VA_ARGS__); }} while(0) #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 01619b5c77..472f8e0fe0 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -116,7 +116,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // Check if it needs to commit if (vnodeShouldCommit(pVnode)) { - tsem_wait(&(pVnode->canCommit)); + // tsem_wait(&(pVnode->canCommit)); if (vnodeAsyncCommit(pVnode) < 0) { // TODO: handle error } diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 7371ec6cfa..422233eed7 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -295,13 +295,31 @@ static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTabl static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo; - if (needMultiNodeScan(pTable)) { return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable)); } return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan); } +static SPhyNode* createSingleTableAgg(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { + SAggPhyNode* node = (SAggPhyNode*)initPhyNode(pPlanNode, OP_Aggregate, sizeof(SAggPhyNode)); + SGroupbyExpr* pGroupBy = (SGroupbyExpr*)pPlanNode->pExtInfo; + node->aggAlgo = AGG_ALGO_PLAIN; + node->aggSplit = AGG_SPLIT_FINAL; + if (NULL != pGroupBy) { + node->aggAlgo = AGG_ALGO_HASHED; + node->pGroupByList = validPointer(taosArrayDup(pGroupBy->columnInfo)); + } + return (SPhyNode*)node; +} + +static SPhyNode* createAggNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { + // if (needMultiNodeAgg(pPlanNode)) { + + // } + return createSingleTableAgg(pCxt, pPlanNode); +} + static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { SPhyNode* node = NULL; switch (pPlanNode->info.type) { @@ -311,6 +329,10 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { case QNODE_TABLESCAN: node = createTableScanNode(pCxt, pPlanNode); break; + case QNODE_AGGREGATE: + case QNODE_GROUPBY: + node = createAggNode(pCxt, pPlanNode); + break; case QNODE_MODIFY: // Insert is not an operator in a physical plan. break; diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index e249ca4066..2e553fe4e6 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -20,6 +20,19 @@ typedef bool (*FToJson)(const void* obj, cJSON* json); typedef bool (*FFromJson)(const cJSON* json, void* obj); +static char* getString(const cJSON* json, const char* name) { + char* p = cJSON_GetStringValue(cJSON_GetObjectItem(json, name)); + return strdup(p); +} + +static void copyString(const cJSON* json, const char* name, char* dst) { + strcpy(dst, cJSON_GetStringValue(cJSON_GetObjectItem(json, name))); +} + +static int64_t getNumber(const cJSON* json, const char* name) { + return cJSON_GetNumberValue(cJSON_GetObjectItem(json, name)); +} + static bool addObject(cJSON* json, const char* name, FToJson func, const void* obj) { if (NULL == obj) { return true; @@ -62,6 +75,39 @@ static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson f return func(jObj, *obj); } +static const char* jkPnodeType = "Type"; +static int32_t getPnodeTypeSize(cJSON* json) { + switch (getNumber(json, jkPnodeType)) { + case OP_TableScan: + case OP_DataBlocksOptScan: + case OP_TableSeqScan: + return sizeof(STableScanPhyNode); + case OP_TagScan: + return sizeof(STagScanPhyNode); + case OP_SystemTableScan: + return sizeof(SSystemTableScanPhyNode); + case OP_Aggregate: + return sizeof(SAggPhyNode); + case OP_Exchange: + return sizeof(SExchangePhyNode); + default: + break; + }; + return -1; +} + +static bool fromPnode(const cJSON* json, const char* name, FFromJson func, void** obj) { + cJSON* jObj = cJSON_GetObjectItem(json, name); + if (NULL == jObj) { + return true; + } + *obj = calloc(1, getPnodeTypeSize(jObj)); + if (NULL == *obj) { + return false; + } + return func(jObj, *obj); +} + static bool addTarray(cJSON* json, const char* name, FToJson func, const SArray* array, bool isPoint) { size_t size = (NULL == array) ? 0 : taosArrayGetSize(array); if (size > 0) { @@ -154,26 +200,9 @@ static bool fromRawArrayWithAlloc(const cJSON* json, const char* name, FFromJson return fromItem(jArray, func, *array, itemSize, *size); } -static bool fromRawArray(const cJSON* json, const char* name, FFromJson func, void** array, int32_t itemSize, int32_t* size) { +static bool fromRawArray(const cJSON* json, const char* name, FFromJson func, void* array, int32_t itemSize, int32_t* size) { const cJSON* jArray = getArray(json, name, size); - if (*array == NULL) { - *array = calloc(*size, itemSize); - } - - return fromItem(jArray, func, *array, itemSize, *size); -} - -static char* getString(const cJSON* json, const char* name) { - char* p = cJSON_GetStringValue(cJSON_GetObjectItem(json, name)); - return strdup(p); -} - -static void copyString(const cJSON* json, const char* name, char* dst) { - strcpy(dst, cJSON_GetStringValue(cJSON_GetObjectItem(json, name))); -} - -static int64_t getNumber(const cJSON* json, const char* name) { - return cJSON_GetNumberValue(cJSON_GetObjectItem(json, name)); + return fromItem(jArray, func, array, itemSize, *size); } static const char* jkSchemaType = "Type"; @@ -221,7 +250,7 @@ static bool dataBlockSchemaFromJson(const cJSON* json, void* obj) { schema->resultRowSize = getNumber(json, jkDataBlockSchemaResultRowSize); schema->precision = getNumber(json, jkDataBlockSchemaPrecision); - return fromRawArray(json, jkDataBlockSchemaSlotSchema, schemaFromJson, (void**) &(schema->pSchema), sizeof(SSlotSchema), &schema->numOfCols); + return fromRawArrayWithAlloc(json, jkDataBlockSchemaSlotSchema, schemaFromJson, (void**)&(schema->pSchema), sizeof(SSlotSchema), &schema->numOfCols); } static const char* jkColumnFilterInfoLowerRelOptr = "LowerRelOptr"; @@ -539,11 +568,9 @@ static bool scanNodeToJson(const void* obj, cJSON* json) { if (res) { res = cJSON_AddNumberToObject(json, jkScanNodeTableType, pNode->tableType); } - if (res) { res = cJSON_AddNumberToObject(json, jkScanNodeTableOrder, pNode->order); } - if (res) { res = cJSON_AddNumberToObject(json, jkScanNodeTableCount, pNode->count); } @@ -551,7 +578,6 @@ static bool scanNodeToJson(const void* obj, cJSON* json) { if (res) { res = cJSON_AddNumberToObject(json, jkScanNodeTableRevCount, pNode->reverse); } - return res; } @@ -565,6 +591,66 @@ static bool scanNodeFromJson(const cJSON* json, void* obj) { return true; } +static const char* jkColIndexColId = "ColId"; +static const char* jkColIndexColIndex = "ColIndex"; +static const char* jkColIndexFlag = "Flag"; +static const char* jkColIndexName = "Name"; + +static bool colIndexToJson(const void* obj, cJSON* json) { + const SColIndex* col = (const SColIndex*)obj; + bool res = cJSON_AddNumberToObject(json, jkColIndexColId, col->colId); + if (res) { + res = cJSON_AddNumberToObject(json, jkColIndexColIndex, col->colIndex); + } + if (res) { + res = cJSON_AddNumberToObject(json, jkColIndexFlag, col->flag); + } + if (res) { + res = cJSON_AddStringToObject(json, jkColIndexName, col->name); + } + return res; +} + +static bool colIndexFromJson(const cJSON* json, void* obj) { + SColIndex* col = (SColIndex*)obj; + col->colId = getNumber(json, jkColIndexColId); + col->colIndex = getNumber(json, jkColIndexColIndex); + col->flag = getNumber(json, jkColIndexFlag); + copyString(json, jkColIndexName, col->name); + return true; +} + +static const char* jkAggNodeAggAlgo = "AggAlgo"; +static const char* jkAggNodeAggSplit = "AggSplit"; +static const char* jkAggNodeExprs = "Exprs"; +static const char* jkAggNodeGroupByList = "GroupByList"; + +static bool aggNodeToJson(const void* obj, cJSON* json) { + const SAggPhyNode* agg = (const SAggPhyNode*)obj; + bool res = cJSON_AddNumberToObject(json, jkAggNodeAggAlgo, agg->aggAlgo); + if (res) { + res = cJSON_AddNumberToObject(json, jkAggNodeAggSplit, agg->aggSplit); + } + if (res) { + res = addArray(json, jkAggNodeExprs, exprInfoToJson, agg->pExprs); + } + if (res) { + res = addArray(json, jkAggNodeGroupByList, colIndexToJson, agg->pGroupByList); + } + return res; +} + +static bool aggNodeFromJson(const cJSON* json, void* obj) { + SAggPhyNode* agg = (SAggPhyNode*)obj; + agg->aggAlgo = getNumber(json, jkAggNodeAggAlgo); + agg->aggSplit = getNumber(json, jkAggNodeAggSplit); + bool res = fromArray(json, jkAggNodeExprs, exprInfoFromJson, &agg->pExprs, sizeof(SExprInfo)); + if (res) { + res = fromArray(json, jkAggNodeGroupByList, colIndexFromJson, &agg->pGroupByList, sizeof(SExprInfo)); + } + return res; +} + static const char* jkTableScanNodeFlag = "Flag"; static const char* jkTableScanNodeWindow = "Window"; static const char* jkTableScanNodeTagsConditions = "TagsConditions"; @@ -673,10 +759,10 @@ static bool specificPhyNodeToJson(const void* obj, cJSON* json) { case OP_SystemTableScan: return scanNodeToJson(obj, json); case OP_Aggregate: - break; // todo + return aggNodeToJson(obj, json); case OP_Project: return true; - case OP_Groupby: + // case OP_Groupby: case OP_Limit: case OP_SLimit: case OP_TimeWindow: @@ -714,7 +800,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) { break; // todo case OP_Project: return true; - case OP_Groupby: + // case OP_Groupby: case OP_Limit: case OP_SLimit: case OP_TimeWindow: @@ -741,12 +827,15 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) { static const char* jkPnodeName = "Name"; static const char* jkPnodeTargets = "Targets"; static const char* jkPnodeConditions = "Conditions"; -static const char* jkPnodeSchema = "InputSchema"; +static const char* jkPnodeSchema = "TargetSchema"; static const char* jkPnodeChildren = "Children"; // The 'pParent' field do not need to be serialized. static bool phyNodeToJson(const void* obj, cJSON* jNode) { const SPhyNode* phyNode = (const SPhyNode*)obj; - bool res = cJSON_AddStringToObject(jNode, jkPnodeName, phyNode->info.name); + bool res = cJSON_AddNumberToObject(jNode, jkPnodeType, phyNode->info.type); + if (res) { + res = cJSON_AddStringToObject(jNode, jkPnodeName, phyNode->info.name); + } if (res) { res = addArray(jNode, jkPnodeTargets, exprInfoToJson, phyNode->pTargets); } @@ -768,8 +857,8 @@ static bool phyNodeToJson(const void* obj, cJSON* jNode) { static bool phyNodeFromJson(const cJSON* json, void* obj) { SPhyNode* node = (SPhyNode*) obj; - node->info.name = getString(json, jkPnodeName); - node->info.type = opNameToOpType(node->info.name); + node->info.type = getNumber(json, jkPnodeType); + node->info.name = opTypeToOpName(node->info.type); bool res = fromArray(json, jkPnodeTargets, exprInfoFromJson, &node->pTargets, sizeof(SExprInfo)); if (res) { @@ -914,11 +1003,13 @@ static SSubplan* subplanFromJson(const cJSON* json) { if (NULL == subplan) { return NULL; } + bool res = fromObject(json, jkSubplanId, subplanIdFromJson, &subplan->id, true); + if (res) { - size_t size = MAX(sizeof(SPhyNode), sizeof(STableScanPhyNode)); - res = fromObjectWithAlloc(json, jkSubplanNode, phyNodeFromJson, (void**)&subplan->pNode, size, false); + res = fromPnode(json, jkSubplanNode, phyNodeFromJson, (void**)&subplan->pNode); } + if (res) { res = fromObjectWithAlloc(json, jkSubplanDataSink, dataSinkFromJson, (void**)&subplan->pDataSink, sizeof(SDataSink), false); } diff --git a/source/libs/planner/test/phyPlanTests.cpp b/source/libs/planner/test/phyPlanTests.cpp index 29f6e48dc7..6d9e08e829 100644 --- a/source/libs/planner/test/phyPlanTests.cpp +++ b/source/libs/planner/test/phyPlanTests.cpp @@ -30,6 +30,21 @@ void* myCalloc(size_t nmemb, size_t size) { class PhyPlanTest : public Test { protected: + void pushAgg(int32_t aggOp) { + unique_ptr agg((SQueryPlanNode*)myCalloc(1, sizeof(SQueryPlanNode))); + agg->info.type = aggOp; + agg->pExpr = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); + unique_ptr expr((SExprInfo*)myCalloc(1, sizeof(SExprInfo))); + expr->base.resSchema.type = TSDB_DATA_TYPE_INT; + expr->base.resSchema.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes; + expr->pExpr = (tExprNode*)myCalloc(1, sizeof(tExprNode)); + expr->pExpr->nodeType = TEXPR_FUNCTION_NODE; + strcpy(expr->pExpr->_function.functionName, "Count"); + SExprInfo* item = expr.release(); + taosArrayPush(agg->pExpr, &item); + pushNode(agg.release()); + } + void pushScan(const string& db, const string& table, int32_t scanOp) { shared_ptr meta = mockCatalogService->getTableMeta(db, table); EXPECT_TRUE(meta); @@ -95,10 +110,11 @@ protected: private: void pushNode(SQueryPlanNode* node) { if (logicPlan_) { - // todo - } else { - logicPlan_.reset(node); + node->pChildren = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); + SQueryPlanNode* child = logicPlan_.release(); + taosArrayPush(node->pChildren, &child); } + logicPlan_.reset(node); } void copySchemaMeta(STableMeta** dst, const STableMeta* src) { @@ -174,6 +190,16 @@ TEST_F(PhyPlanTest, superTableScanTest) { // todo check } +// select count(*) from table +TEST_F(PhyPlanTest, simpleAggTest) { + pushScan("test", "t1", QNODE_TABLESCAN); + pushAgg(QNODE_AGGREGATE); + ASSERT_EQ(run(), TSDB_CODE_SUCCESS); + explain(); + SQueryDag* dag = result(); + // todo check +} + // insert into t values(...) TEST_F(PhyPlanTest, insertTest) { ASSERT_EQ(run("test", "insert into t1 values (now, 1, \"beijing\")"), TSDB_CODE_SUCCESS); diff --git a/source/libs/tdb/CMakeLists.txt b/source/libs/tdb/CMakeLists.txt index 647771fd2d..eb63f2b144 100644 --- a/source/libs/tdb/CMakeLists.txt +++ b/source/libs/tdb/CMakeLists.txt @@ -1,10 +1,11 @@ -aux_source_directory(src TDB_SRC) + +set(TDB_SUBDIRS "btree" "db" "hash" "mpool" "dmgr") +foreach(TDB_SUBDIR ${TDB_SUBDIRS}) + aux_source_directory("src/${TDB_SUBDIR}" TDB_SRC) +endforeach() + add_library(tdb STATIC ${TDB_SRC}) -# target_include_directories( -# tkv -# PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/tkv" -# PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" -# ) + target_include_directories( tdb PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc" @@ -17,5 +18,5 @@ target_link_libraries( ) if(${BUILD_TEST}) - # add_subdirectory(test) + add_subdirectory(test) endif(${BUILD_TEST}) diff --git a/source/libs/tdb/inc/tdb.h b/source/libs/tdb/inc/tdb.h index 40d79de821..905b08ee0b 100644 --- a/source/libs/tdb/inc/tdb.h +++ b/source/libs/tdb/inc/tdb.h @@ -22,10 +22,14 @@ extern "C" { #endif +#define TDB_EXTERN +#define TDB_PUBLIC +#define TDB_STATIC static + typedef enum { - TDB_BTREE = 0, - TDB_HASH, - TDB_HEAP, + TDB_BTREE_T = 0, + TDB_HASH_T, + TDB_HEAP_T, } tdb_db_t; // Forward declaration @@ -39,9 +43,9 @@ typedef struct { } TDB_KEY, TDB_VALUE; // TDB Operations -int tdbCreateDB(TDB** dbpp); -int tdbOpenDB(TDB* dbp, tdb_db_t type, uint32_t flags); -int tdbCloseDB(TDB* dbp, uint32_t flags); +TDB_EXTERN int tdbCreateDB(TDB** dbpp, tdb_db_t type); +TDB_EXTERN int tdbOpenDB(TDB* dbp, uint32_t flags); +TDB_EXTERN int tdbCloseDB(TDB* dbp, uint32_t flags); #ifdef __cplusplus } diff --git a/source/libs/tdb/src/db/tdbDB.c b/source/libs/tdb/src/db/tdbDB.c new file mode 100644 index 0000000000..2af40d8642 --- /dev/null +++ b/source/libs/tdb/src/db/tdbDB.c @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tdbDB.h" +#include "tdb.h" + +TDB_EXTERN int tdbCreateDB(TDB** dbpp, tdb_db_t type) { + TDB* dbp; + int ret; + + dbp = calloc(1, sizeof(*dbp)); + if (dbp == NULL) { + return -1; + } + + dbp->pageSize = TDB_DEFAULT_PGSIZE; + dbp->type = type; + + switch (type) { + case TDB_BTREE_T: + // ret = tdbInitBtreeDB(dbp); + // if (ret < 0) goto _err; + break; + case TDB_HASH_T: + // ret = tdbInitHashDB(dbp); + // if (ret < 0) goto _err; + break; + case TDB_HEAP_T: + // ret = tdbInitHeapDB(dbp); + // if (ret < 0) goto _err; + break; + default: + break; + } + + *dbpp = dbp; + return 0; + +_err: + if (dbp) { + free(dbp); + } + *dbpp = NULL; + return 0; +} + +TDB_EXTERN int tdbOpenDB(TDB* dbp, uint32_t flags) { + // TODO + return 0; +} + +TDB_EXTERN int tdbCloseDB(TDB* dbp, uint32_t flags) { + // TODO + return 0; +} \ No newline at end of file diff --git a/source/libs/tdb/src/tdbDiskMgr.c b/source/libs/tdb/src/dmgr/tdbDiskMgr.c similarity index 100% rename from source/libs/tdb/src/tdbDiskMgr.c rename to source/libs/tdb/src/dmgr/tdbDiskMgr.c diff --git a/source/libs/tdb/src/inc/tdbBtree.h b/source/libs/tdb/src/inc/tdbBtree.h index c68f94bb48..28258b8e60 100644 --- a/source/libs/tdb/src/inc/tdbBtree.h +++ b/source/libs/tdb/src/inc/tdbBtree.h @@ -16,7 +16,7 @@ #ifndef _TD_TDB_BTREE_H_ #define _TD_TDB_BTREE_H_ -#include "tkvDef.h" +#include "tdbDef.h" #ifdef __cplusplus extern "C" { @@ -26,6 +26,8 @@ typedef struct { pgid_t root; // root page number } TDB_BTREE; +TDB_PUBLIC int tdbInitBtreeDB(TDB *dbp); + #ifdef __cplusplus } #endif diff --git a/source/libs/tdb/src/inc/tdbDB.h b/source/libs/tdb/src/inc/tdbDB.h index 479ef77711..e6b49c94ec 100644 --- a/source/libs/tdb/src/inc/tdbDB.h +++ b/source/libs/tdb/src/inc/tdbDB.h @@ -16,20 +16,22 @@ #ifndef _TD_TDB_DB_H_ #define _TD_TDB_DB_H_ +#include "tdb.h" #include "tdbBtree.h" #include "tdbHash.h" +#include "tdbHeap.h" #ifdef __cplusplus extern "C" { #endif - struct TDB { pgsize_t pageSize; tdb_db_t type; union { - TDB_BTREE btree; - TDB_HASH hash; + TDB_BTREE *btree; + TDB_HASH * hash; + TDB_HEAP * heap; } dbam; // Different access methods }; diff --git a/source/libs/tdb/src/inc/tdbDef.h b/source/libs/tdb/src/inc/tdbDef.h index a04b8cc402..4b5e54368b 100644 --- a/source/libs/tdb/src/inc/tdbDef.h +++ b/source/libs/tdb/src/inc/tdbDef.h @@ -24,16 +24,17 @@ extern "C" { // pgid_t typedef int32_t pgid_t; -#define TKV_IVLD_PGID ((pgid_t)-1) +#define TDB_IVLD_PGID ((pgid_t)-1) // framd_id_t typedef int32_t frame_id_t; // pgsize_t typedef int32_t pgsize_t; -#define TKV_MIN_PGSIZE 512 -#define TKV_MAX_PGSIZE 16384 -#define TKV_IS_PGSIZE_VLD(s) (((s) >= TKV_MIN_PGSIZE) && (TKV_MAX_PGSIZE <= TKV_MAX_PGSIZE)) +#define TDB_MIN_PGSIZE 512 +#define TDB_MAX_PGSIZE 16384 +#define TDB_DEFAULT_PGSIZE 4096 +#define TDB_IS_PGSIZE_VLD(s) (((s) >= TKV_MIN_PGSIZE) && (TKV_MAX_PGSIZE <= TKV_MAX_PGSIZE)) #ifdef __cplusplus } diff --git a/source/libs/tdb/src/inc/tdbHash.h b/source/libs/tdb/src/inc/tdbHash.h index fca19035f1..8219bda2f8 100644 --- a/source/libs/tdb/src/inc/tdbHash.h +++ b/source/libs/tdb/src/inc/tdbHash.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_TKV_HAHS_H_ -#define _TD_TKV_HAHS_H_ +#ifndef _TD_TDB_HASH_H_ +#define _TD_TDB_HASH_H_ #include "tdbDef.h" @@ -26,8 +26,10 @@ typedef struct { // TODO } TDB_HASH; +TDB_PUBLIC int tdbInitHashDB(TDB *dbp); + #ifdef __cplusplus } #endif -#endif /*_TD_TKV_HAHS_H_*/ \ No newline at end of file +#endif /*_TD_TDB_HASH_H_*/ \ No newline at end of file diff --git a/source/libs/tdb/src/inc/tdbHeap.h b/source/libs/tdb/src/inc/tdbHeap.h new file mode 100644 index 0000000000..25a812fa5f --- /dev/null +++ b/source/libs/tdb/src/inc/tdbHeap.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TDB_HEAP_H_ +#define _TD_TDB_HEAP_H_ + +#include "tdbDef.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + // TODO +} TDB_HEAP; + +TDB_PUBLIC int tdbInitHeapDB(TDB *dbp); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_TDB_HEAP_H_*/ \ No newline at end of file diff --git a/source/libs/tdb/src/tdbBufPool.c b/source/libs/tdb/src/mpool/tdbBufPool.c similarity index 100% rename from source/libs/tdb/src/tdbBufPool.c rename to source/libs/tdb/src/mpool/tdbBufPool.c diff --git a/source/libs/tdb/test/tdbTest.cpp b/source/libs/tdb/test/tdbTest.cpp index 3ff43fdd69..38c3b0b917 100644 --- a/source/libs/tdb/test/tdbTest.cpp +++ b/source/libs/tdb/test/tdbTest.cpp @@ -6,9 +6,9 @@ TEST(tdb_api_test, tdb_create_open_close_db_test) { int ret; TDB *dbp; - tdbCreateDB(&dbp); + tdbCreateDB(&dbp, TDB_BTREE_T); - tdbOpenDB(dbp, TDB_BTREE, 0); + tdbOpenDB(dbp, 0); tdbCloseDB(dbp, 0); } \ No newline at end of file