diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index f13ba5f87e..5cef3b2253 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -48,7 +48,7 @@ typedef struct SOutputData { int8_t compressed; char* pData; bool queryEnd; - bool needSchedule; + int32_t scheduleJobNo; int32_t bufStatus; int64_t useconds; int8_t precision; diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index d98c1ca595..4f1db37f79 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -119,6 +119,25 @@ typedef struct SExchangePhyNode { SArray *pSrcEndPoints; // SEpAddr, scheduler fill by calling qSetSuplanExecutionNode } SExchangePhyNode; +typedef enum EAggAlgo { + AGG_ALGO_PLAIN = 1, // simple agg across all input rows + AGG_ALGO_SORTED, // grouped agg, input must be sorted + AGG_ALGO_HASHED // grouped agg, use internal hashtable +} EAggAlgo; + +typedef enum EAggSplit { + AGG_SPLIT_PRE = 1, // first level agg, maybe don't need calculate the final result + AGG_SPLIT_FINAL // second level agg, must calculate the final result +} EAggSplit; + +typedef struct SAggPhyNode { + SPhyNode node; + EAggAlgo aggAlgo; // algorithm used by agg operator + EAggSplit aggSplit; // distributed splitting mode + SArray *pExprs; // SExprInfo list, these are expression list of group_by_clause and parameter expression of aggregate function + SArray *pGroupByList; // SColIndex list, but these must be column node +} SAggPhyNode; + typedef struct SSubplanId { uint64_t queryId; uint64_t templateId; diff --git a/include/libs/planner/plannerOp.h b/include/libs/planner/plannerOp.h index 41d6e028cf..5cc896f1c2 100644 --- a/include/libs/planner/plannerOp.h +++ b/include/libs/planner/plannerOp.h @@ -30,7 +30,7 @@ OP_ENUM_MACRO(TagScan) OP_ENUM_MACRO(SystemTableScan) OP_ENUM_MACRO(Aggregate) OP_ENUM_MACRO(Project) -OP_ENUM_MACRO(Groupby) +// OP_ENUM_MACRO(Groupby) OP_ENUM_MACRO(Limit) OP_ENUM_MACRO(SLimit) OP_ENUM_MACRO(TimeWindow) diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 08b5fb98e7..5d815d15e0 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -55,7 +55,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); -int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); +int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessDataSinkMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/impl/src/dndEnv.c b/source/dnode/mgmt/impl/src/dndEnv.c index 1bf1ea2b92..23fc643abe 100644 --- a/source/dnode/mgmt/impl/src/dndEnv.c +++ b/source/dnode/mgmt/impl/src/dndEnv.c @@ -293,7 +293,7 @@ int32_t dndInit(const SDnodeEnvCfg *pCfg) { if (vnodeInit(&vnodeOpt) != 0) { dError("failed to init vnode since %s", terrstr()); dndCleanup(); - return NULL; + return -1; } memcpy(&dndEnv.cfg, pCfg, sizeof(SDnodeEnvCfg)); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 6113d0a536..7ff02309ec 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -37,36 +37,18 @@ typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); typedef struct SVnodeCfg { int32_t vgId; SDnode *pDnode; - - /** vnode buffer pool options */ struct { - /** write buffer size */ uint64_t wsize; uint64_t ssize; uint64_t lsize; - /** use heap allocator or arena allocator */ bool isHeapAllocator; }; - - /** time to live of tables in this vnode */ uint32_t ttl; - - /** data to keep in this vnode */ uint32_t keep; - - /** if TS data is eventually consistency */ bool isWeak; - - /** TSDB config */ STsdbCfg tsdbCfg; - - /** META config */ SMetaCfg metaCfg; - - /** TQ config */ STqCfg tqCfg; - - /** WAL config */ SWalCfg walCfg; } SVnodeCfg; diff --git a/source/dnode/vnode/src/inc/vnodeDef.h b/source/dnode/vnode/src/inc/vnodeDef.h index 1c534a8aeb..85563890fa 100644 --- a/source/dnode/vnode/src/inc/vnodeDef.h +++ b/source/dnode/vnode/src/inc/vnodeDef.h @@ -30,12 +30,9 @@ #include "vnodeBufferPool.h" #include "vnodeCfg.h" #include "vnodeCommit.h" -#include "vnodeFS.h" #include "vnodeMemAllocator.h" #include "vnodeQuery.h" -#include "vnodeRequest.h" #include "vnodeStateMgr.h" -#include "vnodeSync.h" #ifdef __cplusplus extern "C" { @@ -73,8 +70,6 @@ struct SVnode { STsdb* pTsdb; STQ* pTq; SWal* pWal; - SVnodeSync* pSync; - SVnodeFS* pFs; tsem_t canCommit; SQHandle* pQuery; SDnode* pDnode; @@ -84,6 +79,16 @@ int vnodeScheduleTask(SVnodeTask* task); int32_t vnodePutReqToVQueryQ(SVnode *pVnode, struct SRpcMsg *pReq); +// For Log +extern int32_t vDebugFlag; + +#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0) +#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} while(0) +#define vWarn(...) do { if (vDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} while(0) +#define vInfo(...) do { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} while(0) +#define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0) +#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0) + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/vnodeMAF.h b/source/dnode/vnode/src/inc/vnodeMAF.h deleted file mode 100644 index 7aa405103c..0000000000 --- a/source/dnode/vnode/src/inc/vnodeMAF.h +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_VNODE_MAF_H_ -#define _TD_VNODE_MAF_H_ - -#include "vnode.h" - -#ifdef __cplusplus -extern "C" { -#endif - -int vnodeOpenMAF(SVnode *pVnode); -void vnodeCloseMAF(SVnode *pVnode); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_VNODE_MAF_H_*/ \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/vnodeQuery.h b/source/dnode/vnode/src/inc/vnodeQuery.h index b51aafb313..51c93b5ad7 100644 --- a/source/dnode/vnode/src/inc/vnodeQuery.h +++ b/source/dnode/vnode/src/inc/vnodeQuery.h @@ -19,13 +19,12 @@ #ifdef __cplusplus extern "C" { #endif -#include "vnodeInt.h" #include "qworker.h" +#include "vnode.h" typedef struct SQWorkerMgmt SQHandle; - int vnodeQueryOpen(SVnode *pVnode); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/inc/vnodeRead.h b/source/dnode/vnode/src/inc/vnodeRead.h deleted file mode 100644 index 5ce84b2ebf..0000000000 --- a/source/dnode/vnode/src/inc/vnodeRead.h +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_VNODE_READ_H_ -#define _TD_VNODE_READ_H_ - -#ifdef __cplusplus -extern "C" { -#endif -#include "vnodeInt.h" - -void vnodeProcessReadMsg(SVnode *pVnode, SVnodeMsg *pMsg); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_VNODE_READ_H_*/ diff --git a/source/dnode/vnode/src/inc/vnodeRequest.h b/source/dnode/vnode/src/inc/vnodeRequest.h deleted file mode 100644 index 52f4281eea..0000000000 --- a/source/dnode/vnode/src/inc/vnodeRequest.h +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_VNODE_REQUEST_H_ -#define _TD_VNODE_REQUEST_H_ - -#include "vnode.h" - -#ifdef __cplusplus -extern "C" { -#endif - -// SVDropTbReq -// int vnodeBuildDropTableReq(void **buf, const SVDropTbReq *pReq); -// void *vnodeParseDropTableReq(void *buf, SVDropTbReq *pReq); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_VNODE_REQUEST_H_*/ \ No newline at end of file diff --git a/source/dnode/vnode/src/meta/CMakeLists.txt b/source/dnode/vnode/src/meta/CMakeLists.txt deleted file mode 100644 index 7041811617..0000000000 --- a/source/dnode/vnode/src/meta/CMakeLists.txt +++ /dev/null @@ -1,46 +0,0 @@ -set(META_DB_IMPL_LIST "BDB" "SQLITE") -set(META_DB_IMPL "BDB" CACHE STRING "Use BDB as the default META implementation") -set_property(CACHE META_DB_IMPL PROPERTY STRINGS ${META_DB_IMPL_LIST}) - -if(META_DB_IMPL IN_LIST META_DB_IMPL_LIST) - message(STATUS "META DB Impl: ${META_DB_IMPL}==============") -else() - message(FATAL_ERROR "Invalid META DB IMPL: ${META_DB_IMPL}==============") -endif() - -aux_source_directory(src META_SRC) -if(${META_DB_IMPL} STREQUAL "BDB") - list(REMOVE_ITEM META_SRC "src/metaSQLiteImpl.c") -elseif(${META_DB_IMPL} STREQUAL "SQLITE") - list(REMOVE_ITEM META_SRC "src/metaBDBImpl.c") -endif() - -add_library(meta STATIC ${META_SRC}) -target_include_directories( - meta - PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/vnode/meta" - PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/index" - PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" -) -target_link_libraries( - meta - PUBLIC common - PUBLIC index -) - -if(${META_DB_IMPL} STREQUAL "BDB") - target_link_libraries( - meta - PUBLIC bdb - ) -elseif(${META_DB_IMPL} STREQUAL "SQLITE") - target_link_libraries( - meta - PUBLIC sqlite - ) -endif() - - -if(${BUILD_TEST}) - add_subdirectory(test) -endif(${BUILD_TEST}) diff --git a/source/dnode/vnode/src/tq/CMakeLists.txt b/source/dnode/vnode/src/tq/CMakeLists.txt deleted file mode 100644 index 7cb7499d64..0000000000 --- a/source/dnode/vnode/src/tq/CMakeLists.txt +++ /dev/null @@ -1,20 +0,0 @@ -aux_source_directory(src TQ_SRC) -add_library(tq ${TQ_SRC}) -target_include_directories( - tq - PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/vnode/tq" - PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" -) - -target_link_libraries( - tq - PUBLIC wal - PUBLIC os - PUBLIC util - PUBLIC common - PUBLIC transport -) - -if(${BUILD_TEST}) - add_subdirectory(test) -endif(${BUILD_TEST}) diff --git a/source/dnode/vnode/src/tsdb/CMakeLists.txt b/source/dnode/vnode/src/tsdb/CMakeLists.txt deleted file mode 100644 index e38ba1c466..0000000000 --- a/source/dnode/vnode/src/tsdb/CMakeLists.txt +++ /dev/null @@ -1,34 +0,0 @@ -aux_source_directory(src TSDB_SRC) -if(0) - add_library(tsdb ${TSDB_SRC}) -else(0) - add_library(tsdb STATIC "") - target_sources(tsdb - PRIVATE - "src/tsdbCommit.c" - "src/tsdbMain.c" - "src/tsdbMemTable.c" - "src/tsdbOptions.c" - "src/tsdbWrite.c" - "src/tsdbReadImpl.c" - "src/tsdbFile.c" - "src/tsdbFS.c" - "src/tsdbRead.c" - ) -endif(0) - -target_include_directories( - tsdb - PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/vnode/tsdb" - PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" -) - -target_link_libraries( - tsdb - PUBLIC os - PUBLIC util - PUBLIC common - PUBLIC tkv - PUBLIC tfs - PUBLIC meta -) \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/CMakeLists.txt b/source/dnode/vnode/src/vnd/CMakeLists.txt deleted file mode 100644 index 944a4276db..0000000000 --- a/source/dnode/vnode/src/vnd/CMakeLists.txt +++ /dev/null @@ -1,24 +0,0 @@ -aux_source_directory(src VNODE_SRC) -add_library(vnode STATIC ${VNODE_SRC}) -target_include_directories( - vnode - PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/vnode" - private "${CMAKE_CURRENT_SOURCE_DIR}/inc" -) -target_link_libraries( - vnode - PUBLIC os - PUBLIC transport - PUBLIC meta - PUBLIC tq - PUBLIC tsdb - PUBLIC wal - PUBLIC sync - PUBLIC cjson - PUBLIC qworker -) - -# test -if(${BUILD_TEST}) -# add_subdirectory(test) -endif(${BUILD_TEST}) diff --git a/source/dnode/vnode/src/vnd/vnodeFS.c b/source/dnode/vnode/src/vnd/vnodeFS.c deleted file mode 100644 index 5e9f89ccd5..0000000000 --- a/source/dnode/vnode/src/vnd/vnodeFS.c +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "vnodeDef.h" \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeMgr.c b/source/dnode/vnode/src/vnd/vnodeMgr.c index fdb96e52e2..ce9e487076 100644 --- a/source/dnode/vnode/src/vnd/vnodeMgr.c +++ b/source/dnode/vnode/src/vnd/vnodeMgr.c @@ -41,7 +41,7 @@ int vnodeInit(const SVnodeOpt *pOption) { for (uint16_t i = 0; i < pOption->nthreads; i++) { pthread_create(&(vnodeMgr.threads[i]), NULL, loop, NULL); - pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread"); + // pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread"); } } else { // TODO: if no commit thread is set, then another mechanism should be diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 52b20f8411..2c959e663f 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -28,7 +28,7 @@ int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { case TDMT_VND_QUERY: return qWorkerProcessQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg); case TDMT_VND_QUERY_CONTINUE: - return qWorkerProcessQueryContinueMsg(pVnode->pTsdb, pVnode->pQuery, pMsg); + return qWorkerProcessCQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg); case TDMT_VND_SCHEDULE_DATA_SINK: return qWorkerProcessDataSinkMsg(pVnode->pTsdb, pVnode->pQuery, pMsg); default: diff --git a/source/dnode/vnode/src/vnd/vnodeRead.c b/source/dnode/vnode/src/vnd/vnodeRead.c deleted file mode 100644 index 5e9f89ccd5..0000000000 --- a/source/dnode/vnode/src/vnd/vnodeRead.c +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "vnodeDef.h" \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeRequest.c b/source/dnode/vnode/src/vnd/vnodeRequest.c deleted file mode 100644 index 5367c9e091..0000000000 --- a/source/dnode/vnode/src/vnd/vnodeRequest.c +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "vnodeDef.h" - -#if 0 - -static int vnodeBuildCreateTableReq(void **buf, const SVCreateTableReq *pReq); -static void *vnodeParseCreateTableReq(void *buf, SVCreateTableReq *pReq); - -int vnodeBuildReq(void **buf, const SVnodeReq *pReq, tmsg_t type) { - int tsize = 0; - - tsize += taosEncodeFixedU64(buf, pReq->ver); - switch (type) { - case TDMT_VND_CREATE_STB: - tsize += vnodeBuildCreateTableReq(buf, &(pReq->ctReq)); - break; - case TDMT_VND_SUBMIT: - /* code */ - break; - default: - break; - } - /* TODO */ - return tsize; -} - -void *vnodeParseReq(void *buf, SVnodeReq *pReq, tmsg_t type) { - buf = taosDecodeFixedU64(buf, &(pReq->ver)); - - switch (type) { - case TDMT_VND_CREATE_STB: - buf = vnodeParseCreateTableReq(buf, &(pReq->ctReq)); - break; - - default: - break; - } - - // TODO - return buf; -} - -static int vnodeBuildCreateTableReq(void **buf, const SVCreateTableReq *pReq) { - int tsize = 0; - - tsize += taosEncodeString(buf, pReq->name); - tsize += taosEncodeFixedU32(buf, pReq->ttl); - tsize += taosEncodeFixedU32(buf, pReq->keep); - tsize += taosEncodeFixedU8(buf, pReq->type); - - switch (pReq->type) { - case META_SUPER_TABLE: - tsize += taosEncodeFixedU64(buf, pReq->stbCfg.suid); - tsize += tdEncodeSchema(buf, pReq->stbCfg.pSchema); - tsize += tdEncodeSchema(buf, pReq->stbCfg.pTagSchema); - break; - case META_CHILD_TABLE: - tsize += taosEncodeFixedU64(buf, pReq->ctbCfg.suid); - tsize += tdEncodeKVRow(buf, pReq->ctbCfg.pTag); - break; - case META_NORMAL_TABLE: - tsize += tdEncodeSchema(buf, pReq->ntbCfg.pSchema); - break; - default: - break; - } - - return tsize; -} - -static void *vnodeParseCreateTableReq(void *buf, SVCreateTableReq *pReq) { - buf = taosDecodeString(buf, &(pReq->name)); - buf = taosDecodeFixedU32(buf, &(pReq->ttl)); - buf = taosDecodeFixedU32(buf, &(pReq->keep)); - buf = taosDecodeFixedU8(buf, &(pReq->type)); - - switch (pReq->type) { - case META_SUPER_TABLE: - buf = taosDecodeFixedU64(buf, &(pReq->stbCfg.suid)); - buf = tdDecodeSchema(buf, &(pReq->stbCfg.pSchema)); - buf = tdDecodeSchema(buf, &(pReq->stbCfg.pTagSchema)); - break; - case META_CHILD_TABLE: - buf = taosDecodeFixedU64(buf, &(pReq->ctbCfg.suid)); - buf = tdDecodeKVRow(buf, &(pReq->ctbCfg.pTag)); - break; - case META_NORMAL_TABLE: - buf = tdDecodeSchema(buf, &(pReq->ntbCfg.pSchema)); - break; - default: - break; - } - - return buf; -} - -int vnodeBuildDropTableReq(void **buf, const SVDropTbReq *pReq) { - // TODO - return 0; -} - -void *vnodeParseDropTableReq(void *buf, SVDropTbReq *pReq) { - // TODO -} -#endif \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c deleted file mode 100644 index 6dea4a4e57..0000000000 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 185487757f..01619b5c77 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -83,6 +83,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) { // TODO: handle error } + vTrace("vgId:%d process create table %s", pVnode->vgId, pCreateTbReq->name); if (pCreateTbReq->type == TD_SUPER_TABLE) { free(pCreateTbReq->stbCfg.pSchema); free(pCreateTbReq->stbCfg.pTagSchema); diff --git a/source/libs/CMakeLists.txt b/source/libs/CMakeLists.txt index 1dc16c74f7..1d23f333b2 100644 --- a/source/libs/CMakeLists.txt +++ b/source/libs/CMakeLists.txt @@ -1,6 +1,6 @@ add_subdirectory(transport) add_subdirectory(sync) -add_subdirectory(tkv) +add_subdirectory(tdb) add_subdirectory(index) add_subdirectory(wal) add_subdirectory(parser) diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index a69084f3db..8280f9d0af 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -182,6 +182,12 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; + if (NULL == pDispatcher->nextOutput.pData) { + assert(pDispatcher->queryEnd); + pOutput->useconds = pDispatcher->useconds; + pOutput->precision = pDispatcher->schema.precision; + return TSDB_CODE_SUCCESS; + } SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData); memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); pOutput->numOfRows = pEntry->numOfRows; @@ -190,7 +196,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { pOutput->bufStatus = updateStatus(pDispatcher); pthread_mutex_lock(&pDispatcher->mutex); pOutput->queryEnd = pDispatcher->queryEnd; - pOutput->needSchedule = false; + pOutput->scheduleJobNo = 0; pOutput->useconds = pDispatcher->useconds; pOutput->precision = pDispatcher->schema.precision; pthread_mutex_unlock(&pDispatcher->mutex); diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 71e69c67d2..65c97110ce 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -189,7 +189,6 @@ static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableI return (SPhyNode*)node; } - static bool isSystemTable(SQueryTableInfo* pTable) { // todo return false; @@ -295,13 +294,31 @@ static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTabl static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo; - if (needMultiNodeScan(pTable)) { return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable)); } return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan); } +static SPhyNode* createSingleTableAgg(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { + SAggPhyNode* node = (SAggPhyNode*)initPhyNode(pPlanNode, OP_Aggregate, sizeof(SAggPhyNode)); + SGroupbyExpr* pGroupBy = (SGroupbyExpr*)pPlanNode->pExtInfo; + node->aggAlgo = AGG_ALGO_PLAIN; + node->aggSplit = AGG_SPLIT_FINAL; + if (NULL != pGroupBy) { + node->aggAlgo = AGG_ALGO_HASHED; + node->pGroupByList = validPointer(taosArrayDup(pGroupBy->columnInfo)); + } + return (SPhyNode*)node; +} + +static SPhyNode* createAggNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { + // if (needMultiNodeAgg(pPlanNode)) { + + // } + return createSingleTableAgg(pCxt, pPlanNode); +} + static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { SPhyNode* node = NULL; switch (pPlanNode->info.type) { @@ -311,6 +328,10 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { case QNODE_TABLESCAN: node = createTableScanNode(pCxt, pPlanNode); break; + case QNODE_AGGREGATE: + case QNODE_GROUPBY: + node = createAggNode(pCxt, pPlanNode); + break; case QNODE_MODIFY: // Insert is not an operator in a physical plan. break; diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index 7e36e0e124..123364134d 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -20,6 +20,19 @@ typedef bool (*FToJson)(const void* obj, cJSON* json); typedef bool (*FFromJson)(const cJSON* json, void* obj); +static char* getString(const cJSON* json, const char* name) { + char* p = cJSON_GetStringValue(cJSON_GetObjectItem(json, name)); + return strdup(p); +} + +static void copyString(const cJSON* json, const char* name, char* dst) { + strcpy(dst, cJSON_GetStringValue(cJSON_GetObjectItem(json, name))); +} + +static int64_t getNumber(const cJSON* json, const char* name) { + return cJSON_GetNumberValue(cJSON_GetObjectItem(json, name)); +} + static bool addObject(cJSON* json, const char* name, FToJson func, const void* obj) { if (NULL == obj) { return true; @@ -62,6 +75,39 @@ static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson f return func(jObj, *obj); } +static const char* jkPnodeType = "Type"; +static int32_t getPnodeTypeSize(cJSON* json) { + switch (getNumber(json, jkPnodeType)) { + case OP_TableScan: + case OP_DataBlocksOptScan: + case OP_TableSeqScan: + return sizeof(STableScanPhyNode); + case OP_TagScan: + return sizeof(STagScanPhyNode); + case OP_SystemTableScan: + return sizeof(SSystemTableScanPhyNode); + case OP_Aggregate: + return sizeof(SAggPhyNode); + case OP_Exchange: + return sizeof(SExchangePhyNode); + default: + break; + }; + return -1; +} + +static bool fromPnode(const cJSON* json, const char* name, FFromJson func, void** obj) { + cJSON* jObj = cJSON_GetObjectItem(json, name); + if (NULL == jObj) { + return true; + } + *obj = calloc(1, getPnodeTypeSize(jObj)); + if (NULL == *obj) { + return false; + } + return func(jObj, *obj); +} + static bool addTarray(cJSON* json, const char* name, FToJson func, const SArray* array, bool isPoint) { size_t size = (NULL == array) ? 0 : taosArrayGetSize(array); if (size > 0) { @@ -154,26 +200,9 @@ static bool fromRawArrayWithAlloc(const cJSON* json, const char* name, FFromJson return fromItem(jArray, func, *array, itemSize, *size); } -static bool fromRawArray(const cJSON* json, const char* name, FFromJson func, void** array, int32_t itemSize, int32_t* size) { +static bool fromRawArray(const cJSON* json, const char* name, FFromJson func, void* array, int32_t itemSize, int32_t* size) { const cJSON* jArray = getArray(json, name, size); - if (*array == NULL) { - *array = calloc(*size, itemSize); - } - - return fromItem(jArray, func, *array, itemSize, *size); -} - -static char* getString(const cJSON* json, const char* name) { - char* p = cJSON_GetStringValue(cJSON_GetObjectItem(json, name)); - return strdup(p); -} - -static void copyString(const cJSON* json, const char* name, char* dst) { - strcpy(dst, cJSON_GetStringValue(cJSON_GetObjectItem(json, name))); -} - -static int64_t getNumber(const cJSON* json, const char* name) { - return cJSON_GetNumberValue(cJSON_GetObjectItem(json, name)); + return fromItem(jArray, func, array, itemSize, *size); } static const char* jkSchemaType = "Type"; @@ -221,7 +250,7 @@ static bool dataBlockSchemaFromJson(const cJSON* json, void* obj) { schema->resultRowSize = getNumber(json, jkDataBlockSchemaResultRowSize); schema->precision = getNumber(json, jkDataBlockSchemaPrecision); - return fromRawArray(json, jkDataBlockSchemaSlotSchema, schemaFromJson, (void**) &(schema->pSchema), sizeof(SSlotSchema), &schema->numOfCols); + return fromRawArrayWithAlloc(json, jkDataBlockSchemaSlotSchema, schemaFromJson, (void**)&(schema->pSchema), sizeof(SSlotSchema), &schema->numOfCols); } static const char* jkColumnFilterInfoLowerRelOptr = "LowerRelOptr"; @@ -534,19 +563,15 @@ static const char* jkScanNodeTableCount = "Count"; static bool scanNodeToJson(const void* obj, cJSON* json) { const SScanPhyNode* scan = (const SScanPhyNode*)obj; bool res = cJSON_AddNumberToObject(json, jkScanNodeTableId, scan->uid); - if (res) { res = cJSON_AddNumberToObject(json, jkScanNodeTableType, scan->tableType); } - if (res) { res = cJSON_AddNumberToObject(json, jkScanNodeTableOrder, scan->order); } - if (res) { res = cJSON_AddNumberToObject(json, jkScanNodeTableCount, scan->count); } - return res; } @@ -559,6 +584,66 @@ static bool scanNodeFromJson(const cJSON* json, void* obj) { return true; } +static const char* jkColIndexColId = "ColId"; +static const char* jkColIndexColIndex = "ColIndex"; +static const char* jkColIndexFlag = "Flag"; +static const char* jkColIndexName = "Name"; + +static bool colIndexToJson(const void* obj, cJSON* json) { + const SColIndex* col = (const SColIndex*)obj; + bool res = cJSON_AddNumberToObject(json, jkColIndexColId, col->colId); + if (res) { + res = cJSON_AddNumberToObject(json, jkColIndexColIndex, col->colIndex); + } + if (res) { + res = cJSON_AddNumberToObject(json, jkColIndexFlag, col->flag); + } + if (res) { + res = cJSON_AddStringToObject(json, jkColIndexName, col->name); + } + return res; +} + +static bool colIndexFromJson(const cJSON* json, void* obj) { + SColIndex* col = (SColIndex*)obj; + col->colId = getNumber(json, jkColIndexColId); + col->colIndex = getNumber(json, jkColIndexColIndex); + col->flag = getNumber(json, jkColIndexFlag); + copyString(json, jkColIndexName, col->name); + return true; +} + +static const char* jkAggNodeAggAlgo = "AggAlgo"; +static const char* jkAggNodeAggSplit = "AggSplit"; +static const char* jkAggNodeExprs = "Exprs"; +static const char* jkAggNodeGroupByList = "GroupByList"; + +static bool aggNodeToJson(const void* obj, cJSON* json) { + const SAggPhyNode* agg = (const SAggPhyNode*)obj; + bool res = cJSON_AddNumberToObject(json, jkAggNodeAggAlgo, agg->aggAlgo); + if (res) { + res = cJSON_AddNumberToObject(json, jkAggNodeAggSplit, agg->aggSplit); + } + if (res) { + res = addArray(json, jkAggNodeExprs, exprInfoToJson, agg->pExprs); + } + if (res) { + res = addArray(json, jkAggNodeGroupByList, colIndexToJson, agg->pGroupByList); + } + return res; +} + +static bool aggNodeFromJson(const cJSON* json, void* obj) { + SAggPhyNode* agg = (SAggPhyNode*)obj; + agg->aggAlgo = getNumber(json, jkAggNodeAggAlgo); + agg->aggSplit = getNumber(json, jkAggNodeAggSplit); + bool res = fromArray(json, jkAggNodeExprs, exprInfoFromJson, &agg->pExprs, sizeof(SExprInfo)); + if (res) { + res = fromArray(json, jkAggNodeGroupByList, colIndexFromJson, &agg->pGroupByList, sizeof(SExprInfo)); + } + return res; +} + static const char* jkTableScanNodeFlag = "Flag"; static const char* jkTableScanNodeWindow = "Window"; static const char* jkTableScanNodeTagsConditions = "TagsConditions"; @@ -667,10 +752,10 @@ static bool specificPhyNodeToJson(const void* obj, cJSON* json) { case OP_SystemTableScan: return scanNodeToJson(obj, json); case OP_Aggregate: - break; // todo + return aggNodeToJson(obj, json); case OP_Project: return true; - case OP_Groupby: + // case OP_Groupby: case OP_Limit: case OP_SLimit: case OP_TimeWindow: @@ -708,7 +793,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) { break; // todo case OP_Project: return true; - case OP_Groupby: + // case OP_Groupby: case OP_Limit: case OP_SLimit: case OP_TimeWindow: @@ -735,12 +820,15 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) { static const char* jkPnodeName = "Name"; static const char* jkPnodeTargets = "Targets"; static const char* jkPnodeConditions = "Conditions"; -static const char* jkPnodeSchema = "InputSchema"; +static const char* jkPnodeSchema = "TargetSchema"; static const char* jkPnodeChildren = "Children"; // The 'pParent' field do not need to be serialized. static bool phyNodeToJson(const void* obj, cJSON* jNode) { const SPhyNode* phyNode = (const SPhyNode*)obj; - bool res = cJSON_AddStringToObject(jNode, jkPnodeName, phyNode->info.name); + bool res = cJSON_AddNumberToObject(jNode, jkPnodeType, phyNode->info.type); + if (res) { + res = cJSON_AddStringToObject(jNode, jkPnodeName, phyNode->info.name); + } if (res) { res = addArray(jNode, jkPnodeTargets, exprInfoToJson, phyNode->pTargets); } @@ -762,8 +850,8 @@ static bool phyNodeToJson(const void* obj, cJSON* jNode) { static bool phyNodeFromJson(const cJSON* json, void* obj) { SPhyNode* node = (SPhyNode*) obj; - node->info.name = getString(json, jkPnodeName); - node->info.type = opNameToOpType(node->info.name); + node->info.type = getNumber(json, jkPnodeType); + node->info.name = opTypeToOpName(node->info.type); bool res = fromArray(json, jkPnodeTargets, exprInfoFromJson, &node->pTargets, sizeof(SExprInfo)); if (res) { @@ -910,8 +998,7 @@ static SSubplan* subplanFromJson(const cJSON* json) { } bool res = fromObject(json, jkSubplanId, subplanIdFromJson, &subplan->id, true); if (res) { - size_t size = MAX(sizeof(SPhyNode), sizeof(SScanPhyNode)); - res = fromObjectWithAlloc(json, jkSubplanNode, phyNodeFromJson, (void**)&subplan->pNode, size, false); + res = fromPnode(json, jkSubplanNode, phyNodeFromJson, (void**)&subplan->pNode); } if (res) { res = fromObjectWithAlloc(json, jkSubplanDataSink, dataSinkFromJson, (void**)&subplan->pDataSink, sizeof(SDataSink), false); diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 21f57d95d5..bf815b26b2 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -65,9 +65,9 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, } if (pLogicPlan->info.type != QNODE_MODIFY) { -// char* str = NULL; -// queryPlanToString(pLogicPlan, &str); -// printf("%s\n", str); + char* str = NULL; + queryPlanToString(pLogicPlan, &str); + printf("%s\n", str); } code = optimizeQueryPlan(pLogicPlan); diff --git a/source/libs/planner/test/phyPlanTests.cpp b/source/libs/planner/test/phyPlanTests.cpp index 29f6e48dc7..6d9e08e829 100644 --- a/source/libs/planner/test/phyPlanTests.cpp +++ b/source/libs/planner/test/phyPlanTests.cpp @@ -30,6 +30,21 @@ void* myCalloc(size_t nmemb, size_t size) { class PhyPlanTest : public Test { protected: + void pushAgg(int32_t aggOp) { + unique_ptr agg((SQueryPlanNode*)myCalloc(1, sizeof(SQueryPlanNode))); + agg->info.type = aggOp; + agg->pExpr = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); + unique_ptr expr((SExprInfo*)myCalloc(1, sizeof(SExprInfo))); + expr->base.resSchema.type = TSDB_DATA_TYPE_INT; + expr->base.resSchema.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes; + expr->pExpr = (tExprNode*)myCalloc(1, sizeof(tExprNode)); + expr->pExpr->nodeType = TEXPR_FUNCTION_NODE; + strcpy(expr->pExpr->_function.functionName, "Count"); + SExprInfo* item = expr.release(); + taosArrayPush(agg->pExpr, &item); + pushNode(agg.release()); + } + void pushScan(const string& db, const string& table, int32_t scanOp) { shared_ptr meta = mockCatalogService->getTableMeta(db, table); EXPECT_TRUE(meta); @@ -95,10 +110,11 @@ protected: private: void pushNode(SQueryPlanNode* node) { if (logicPlan_) { - // todo - } else { - logicPlan_.reset(node); + node->pChildren = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); + SQueryPlanNode* child = logicPlan_.release(); + taosArrayPush(node->pChildren, &child); } + logicPlan_.reset(node); } void copySchemaMeta(STableMeta** dst, const STableMeta* src) { @@ -174,6 +190,16 @@ TEST_F(PhyPlanTest, superTableScanTest) { // todo check } +// select count(*) from table +TEST_F(PhyPlanTest, simpleAggTest) { + pushScan("test", "t1", QNODE_TABLESCAN); + pushAgg(QNODE_AGGREGATE); + ASSERT_EQ(run(), TSDB_CODE_SUCCESS); + explain(); + SQueryDag* dag = result(); + // todo check +} + // insert into t values(...) TEST_F(PhyPlanTest, insertTest) { ASSERT_EQ(run("test", "insert into t1 values (now, 1, \"beijing\")"), TSDB_CODE_SUCCESS); diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 1f8c1e2931..c07321ea4b 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -42,8 +42,7 @@ enum { QW_EVENT_READY, QW_EVENT_FETCH, QW_EVENT_DROP, - QW_EVENT_SCH_SINK, - QW_EVENT_SCH_QUERY, + QW_EVENT_CQUERY, QW_EVENT_MAX, }; @@ -138,10 +137,10 @@ typedef struct SQWorkerMgmt { #define QW_IDS() sId, qId, tId #define QW_FPARAMS() mgmt, QW_IDS() -#define QW_IS_EVENT_RECEIVED(ctx, event) ((ctx)->events[event] == QW_EVENT_RECEIVED) -#define QW_IS_EVENT_PROCESSED(ctx, event) ((ctx)->events[event] == QW_EVENT_PROCESSED) -#define QW_SET_EVENT_RECEIVED(ctx, event) ((ctx)->events[event] = QW_EVENT_RECEIVED) -#define QW_SET_EVENT_PROCESSED(ctx, event) ((ctx)->events[event] = QW_EVENT_PROCESSED) +#define QW_IS_EVENT_RECEIVED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_RECEIVED) +#define QW_IS_EVENT_PROCESSED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_PROCESSED) +#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED) +#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED) #define QW_IN_EXECUTOR(ctx) ((ctx)->phase == QW_PHASE_PRE_QUERY || (ctx)->phase == QW_PHASE_PRE_CQUERY || (ctx)->phase == QW_PHASE_PRE_FETCH || (ctx)->phase == QW_PHASE_PRE_SINK) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 5693f3c3a4..ba4669fa74 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -269,7 +269,19 @@ int32_t qwAddTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tI QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), 0, 0, NULL)); } +int32_t qwGetTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) { + char id[sizeof(qId) + sizeof(tId)] = {0}; + QW_SET_QTID(id, qId, tId); + + *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); + if (NULL == (*ctx)) { + QW_TASK_ELOG("ctx not in ctxHash, id:%s", id); + QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST); + } + return TSDB_CODE_SUCCESS; + +} int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWTaskCtx **ctx) { char id[sizeof(qId) + sizeof(tId)] = {0}; @@ -422,8 +434,11 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); } - if (ctx->taskHandle && QW_IN_EXECUTOR(ctx)) { - QW_ERR_JRET(qKillTask(ctx->taskHandle)); + if (QW_IN_EXECUTOR(ctx)) { + if (ctx->taskHandle) { + QW_ERR_JRET(qKillTask(ctx->taskHandle)); + } + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING)); } else if (ctx->phase > 0) { QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); @@ -450,44 +465,6 @@ _return: QW_RET(code); } -int32_t qwScheduleDataSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *ctx, SRpcMsg *pMsg) { - if (atomic_load_8(&handles->sinkScheduled)) { - qDebug("data sink already scheduled"); - return TSDB_CODE_SUCCESS; - } - - SSinkDataReq * req = (SSinkDataReq *)rpcMallocCont(sizeof(SSinkDataReq)); - if (NULL == req) { - qError("rpcMallocCont %d failed", (int32_t)sizeof(SSinkDataReq)); - QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - req->header.vgId = mgmt->nodeId; - req->sId = sId; - req->queryId = qId; - req->taskId = tId; - - SRpcMsg pNewMsg = { - .handle = pMsg->handle, - .ahandle = pMsg->ahandle, - .msgType = TDMT_VND_SCHEDULE_DATA_SINK, - .pCont = req, - .contLen = sizeof(SSinkDataReq), - .code = 0, - }; - - int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg); - if (TSDB_CODE_SUCCESS != code) { - qError("put data sink schedule msg to queue failed, code:%x", code); - rpcFreeCont(req); - QW_ERR_RET(code); - } - - qDebug("put data sink schedule msg to query queue"); - - return TSDB_CODE_SUCCESS; -} - int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) { @@ -572,10 +549,8 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S switch (phase) { case QW_PHASE_PRE_QUERY: { QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); - - QW_LOCK(QW_WRITE, &ctx->lock); - - locked = true; + + ctx->phase = phase; assert(!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)); @@ -653,6 +628,8 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S locked = true; + ctx->phase = phase; + if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) { QW_TASK_WLOG("task already cancelled", NULL); output->needStop = true; @@ -685,6 +662,32 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S } break; } + case QW_PHASE_POST_FETCH: { + QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); + + QW_LOCK(QW_WRITE, &ctx->lock); + + locked = true; + + if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) { + QW_TASK_WLOG("task already cancelled", NULL); + output->needStop = true; + output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; + QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); + } + + if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { + QW_TASK_WLOG("task is dropping", NULL); + output->needStop = true; + output->rspCode = TSDB_CODE_QRY_TASK_DROPPING; + } else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) { + QW_TASK_WLOG("task is cancelling", NULL); + output->needStop = true; + output->rspCode = TSDB_CODE_QRY_TASK_CANCELLING; + } + break; + } + } @@ -783,6 +786,93 @@ _return: QW_RET(rspCode); } +int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) { + int32_t code = 0; + bool queryRsped = false; + bool needStop = false; + struct SSubplan *plan = NULL; + int32_t rspCode = 0; + SQWPhaseInput input = {0}; + SQWPhaseOutput output = {0}; + SQWTaskCtx *ctx = NULL; + void *rsp = NULL; + int32_t dataLen = 0; + + QW_ERR_JRET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, &output)); + + needStop = output.needStop; + code = output.rspCode; + + if (needStop) { + QW_TASK_DLOG("task need stop", NULL); + QW_ERR_JRET(code); + } + + QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); + + qTaskInfo_t taskHandle = ctx->taskHandle; + DataSinkHandle sinkHandle = ctx->sinkHandle; + + code = qExecTask(taskHandle, &sinkHandle); + if (code) { + QW_TASK_ELOG("qExecTask failed, code:%x", code); + QW_ERR_JRET(code); + } + + if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { + QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CQUERY); + + SOutputData sOutput = {0}; + QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); + + if (NULL == rsp) { + QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); + } + + // Note: schedule data sink firstly and will schedule query after it's done + if (sOutput.scheduleJobNo) { + if (sOutput.scheduleJobNo > ctx.sinkId) { + QW_TASK_DLOG("sink need schedule, scheduleJobNo:%d", sOutput.scheduleJobNo); + + ctx.sinkId = sOutput.scheduleJobNo; + QW_ERR_JRET(qwBuildAndSendSchSinkMsg(QW_FPARAMS(), qwMsg->connection)); + } + } else if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { + QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", sOutput.bufStatus); + + if (!QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CQUERY)) { + QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_CQUERY); + + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING)); + + QW_ERR_RET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), qwMsg->connection)); + } + } + + if (rsp) { + qwBuildFetchRsp(rsp, &sOutput, dataLen); + } + + } + +_return: + + qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, &output); + + if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { + QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); + + if (code) { + qwFreeFetchRsp(rsp); + rsp = NULL; + qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code); + } else if (rsp) { + qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code); + } + } + + QW_RET(rspCode); +} int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) { @@ -811,24 +901,33 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t QW_ERR_JRET(code); } - QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); - - QW_LOCK(QW_WRITE, &ctx->lock); - - locked = true; - + QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); + SOutputData sOutput = {0}; QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); + if (NULL == rsp) { + QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); + } + // Note: schedule data sink firstly and will schedule query after it's done - if (sOutput.needSchedule) { - QW_TASK_DLOG("sink need schedule, queryEnd:%d", sOutput.queryEnd); - if (sOutput.needSchedule > ctx.sinkId) { - QW_ERR_RET(qwScheduleDataSink(ctx, mgmt, sId, qId, tId, pMsg)); + if (sOutput.scheduleJobNo) { + if (sOutput.scheduleJobNo > ctx.sinkId) { + QW_TASK_DLOG("sink need schedule, scheduleJobNo:%d", sOutput.scheduleJobNo); + + ctx.sinkId = sOutput.scheduleJobNo; + QW_ERR_JRET(qwBuildAndSendSchSinkMsg(QW_FPARAMS(), qwMsg->connection)); } } else if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { - QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", output.bufStatus); - QW_ERR_RET(qwScheduleQuery(ctx, mgmt, sId, qId, tId, pMsg)); + QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", sOutput.bufStatus); + + if (!QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CQUERY)) { + QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_CQUERY); + + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING)); + + QW_ERR_RET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), qwMsg->connection)); + } } if (rsp) { @@ -837,13 +936,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t _return: - if (locked) { - QW_UNLOCK(QW_WRITE, &ctx->lock); - } - - if (ctx) { - qwReleaseTaskCtx(QW_READ, mgmt); - } + qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, &output); if (code) { qwFreeFetchRsp(rsp); @@ -852,6 +945,7 @@ _return: } else if (rsp) { qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code); } + QW_RET(code); } @@ -901,14 +995,14 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER; } - mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); + mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); if (NULL == mgmt->schHash) { tfree(mgmt); qError("init %d scheduler hash failed", mgmt->cfg.maxSchedulerNum); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - mgmt->ctxHash = taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + mgmt->ctxHash = taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); if (NULL == mgmt->ctxHash) { taosHashCleanup(mgmt->schHash); mgmt->schHash = NULL; @@ -1104,47 +1198,3 @@ _return: } -int32_t qwScheduleQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *handles, SRpcMsg *pMsg) { - if (atomic_load_8(&handles->queryScheduled)) { - QW_SCH_TASK_ELOG("query already scheduled, queryScheduled:%d", handles->queryScheduled); - return TSDB_CODE_SUCCESS; - } - - QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING)); - - SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq)); - if (NULL == req) { - QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq)); - QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - req->header.vgId = mgmt->nodeId; - req->sId = sId; - req->queryId = qId; - req->taskId = tId; - - SRpcMsg pNewMsg = { - .handle = pMsg->handle, - .ahandle = pMsg->ahandle, - .msgType = TDMT_VND_QUERY_CONTINUE, - .pCont = req, - .contLen = sizeof(SQueryContinueReq), - .code = 0, - }; - - int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg); - if (TSDB_CODE_SUCCESS != code) { - QW_SCH_TASK_ELOG("put query continue msg to queue failed, code:%x", code); - rpcFreeCont(req); - QW_ERR_RET(code); - } - - handles->queryScheduled = true; - - QW_SCH_TASK_DLOG("put query continue msg to query queue, vgId:%d", mgmt->nodeId); - - return TSDB_CODE_SUCCESS; -} - - - diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 7d5a1836a4..3053a6d15d 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -226,6 +226,42 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe return TSDB_CODE_SUCCESS; } + +int32_t qwBuildAndSendSchSinkMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection) { + SRpcMsg *pMsg = (SRpcMsg *)connection; + SSinkDataReq * req = (SSinkDataReq *)rpcMallocCont(sizeof(SSinkDataReq)); + if (NULL == req) { + qError("rpcMallocCont %d failed", (int32_t)sizeof(SSinkDataReq)); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + req->header.vgId = mgmt->nodeId; + req->sId = sId; + req->queryId = qId; + req->taskId = tId; + + SRpcMsg pNewMsg = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .msgType = TDMT_VND_SCHEDULE_DATA_SINK, + .pCont = req, + .contLen = sizeof(SSinkDataReq), + .code = 0, + }; + + int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg); + if (TSDB_CODE_SUCCESS != code) { + qError("put data sink schedule msg to queue failed, code:%x", code); + rpcFreeCont(req); + QW_ERR_RET(code); + } + + qDebug("put data sink schedule msg to query queue"); + + return TSDB_CODE_SUCCESS; +} + + int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; @@ -268,14 +304,8 @@ _return: } -int32_t qwScheduleQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *handles, SRpcMsg *pMsg) { - if (atomic_load_8(&handles->queryScheduled)) { - QW_SCH_TASK_ELOG("query already scheduled, queryScheduled:%d", handles->queryScheduled); - return TSDB_CODE_SUCCESS; - } - - QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING)); - +int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection) { + SRpcMsg *pMsg = (SRpcMsg *)connection; SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq)); if (NULL == req) { QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq)); @@ -303,8 +333,6 @@ int32_t qwScheduleQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t QW_ERR_RET(code); } - handles->queryScheduled = true; - QW_SCH_TASK_DLOG("put query continue msg to query queue, vgId:%d", mgmt->nodeId); return TSDB_CODE_SUCCESS; @@ -338,74 +366,31 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg)); } -int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { +int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; int8_t status = 0; bool queryDone = false; - SQueryContinueReq *req = (SQueryContinueReq *)pMsg->pCont; + SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont; bool needStop = false; SQWTaskCtx *handles = NULL; - QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles)); - QW_LOCK(QW_WRITE, &handles->lock); + if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + QW_ELOG("invalid cquery msg, contLen:%d", pMsg->contLen); + QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } - qTaskInfo_t taskHandle = handles->taskHandle; - DataSinkHandle sinkHandle = handles->sinkHandle; - - QW_UNLOCK(QW_WRITE, &handles->lock); - qwReleaseTaskResCache(QW_READ, qWorkerMgmt); + msg->sId = be64toh(msg->sId); + msg->queryId = be64toh(msg->queryId); + msg->taskId = be64toh(msg->taskId); + msg->contentLen = ntohl(msg->contentLen); - QW_ERR_JRET(qwCheckAndProcessTaskDrop(qWorkerMgmt, req->sId, req->queryId, req->taskId, &needStop)); - if (needStop) { - qWarn("task need stop"); + uint64_t sId = msg->sId; + uint64_t qId = msg->queryId; + uint64_t tId = msg->taskId; - QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles)); - QW_LOCK(QW_WRITE, &handles->lock); - if (handles->needRsp) { - qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_QRY_TASK_CANCELLED); - handles->needRsp = false; - } - QW_UNLOCK(QW_WRITE, &handles->lock); - qwReleaseTaskResCache(QW_READ, qWorkerMgmt); + SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg}; - QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED); - } - - DataSinkHandle newHandle = NULL; - code = qExecTask(taskHandle, &newHandle); - if (code) { - qError("qExecTask failed, code:%x", code); - QW_ERR_JRET(code); - } - - if (sinkHandle != newHandle) { - qError("data sink mis-match"); - QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); - } - -_return: - - QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles)); - QW_LOCK(QW_WRITE, &handles->lock); - - if (handles->needRsp) { - code = qwBuildAndSendQueryRsp(pMsg, code); - handles->needRsp = false; - } - handles->queryScheduled = false; - - QW_UNLOCK(QW_WRITE, &handles->lock); - qwReleaseTaskResCache(QW_READ, qWorkerMgmt); - - if (TSDB_CODE_SUCCESS != code) { - status = JOB_TASK_STATUS_FAILED; - } else { - status = JOB_TASK_STATUS_PARTIAL_SUCCEED; - } - - code = qwQueryPostProcess(qWorkerMgmt, req->sId, req->queryId, req->taskId, status, code); - - QW_RET(code); + QW_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg)); } diff --git a/source/libs/tkv/CMakeLists.txt b/source/libs/tdb/CMakeLists.txt similarity index 67% rename from source/libs/tkv/CMakeLists.txt rename to source/libs/tdb/CMakeLists.txt index fec3f37cd5..647771fd2d 100644 --- a/source/libs/tkv/CMakeLists.txt +++ b/source/libs/tdb/CMakeLists.txt @@ -1,17 +1,21 @@ -aux_source_directory(src TKV_SRC) -add_library(tkv STATIC ${TKV_SRC}) +aux_source_directory(src TDB_SRC) +add_library(tdb STATIC ${TDB_SRC}) # target_include_directories( # tkv # PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/tkv" # PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" # ) target_include_directories( - tkv + tdb PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc" PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/src/inc" ) target_link_libraries( - tkv + tdb PUBLIC os PUBLIC util -) \ No newline at end of file +) + +if(${BUILD_TEST}) + # add_subdirectory(test) +endif(${BUILD_TEST}) diff --git a/source/libs/tkv/inc/tkv.h b/source/libs/tdb/inc/tdb.h similarity index 69% rename from source/libs/tkv/inc/tkv.h rename to source/libs/tdb/inc/tdb.h index 00534d2827..40d79de821 100644 --- a/source/libs/tkv/inc/tkv.h +++ b/source/libs/tdb/inc/tdb.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_TKV_H_ -#define _TD_TKV_H_ +#ifndef _TD_TDB_H_ +#define _TD_TDB_H_ #include "os.h" @@ -22,18 +22,29 @@ extern "C" { #endif +typedef enum { + TDB_BTREE = 0, + TDB_HASH, + TDB_HEAP, +} tdb_db_t; + // Forward declaration -typedef struct TDB TDB; -typedef struct TDB_ENV TDB_ENV; +typedef struct TDB TDB; +typedef struct TDB_CURSOR TDB_CURSOR; // SKey typedef struct { - void * bdata; + void* bdata; uint32_t size; } TDB_KEY, TDB_VALUE; +// TDB Operations +int tdbCreateDB(TDB** dbpp); +int tdbOpenDB(TDB* dbp, tdb_db_t type, uint32_t flags); +int tdbCloseDB(TDB* dbp, uint32_t flags); + #ifdef __cplusplus } #endif -#endif /*_TD_TKV_H_*/ \ No newline at end of file +#endif /*_TD_TDB_H_*/ \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/vnodeSync.h b/source/libs/tdb/src/inc/tdbBtree.h similarity index 83% rename from source/dnode/vnode/src/inc/vnodeSync.h rename to source/libs/tdb/src/inc/tdbBtree.h index e82979551d..c68f94bb48 100644 --- a/source/dnode/vnode/src/inc/vnodeSync.h +++ b/source/libs/tdb/src/inc/tdbBtree.h @@ -13,21 +13,21 @@ * along with this program. If not, see . */ -#ifndef _TD_VNODE_SYNC_H_ -#define _TD_VNODE_SYNC_H_ +#ifndef _TD_TDB_BTREE_H_ +#define _TD_TDB_BTREE_H_ -// #include "sync.h" +#include "tkvDef.h" #ifdef __cplusplus extern "C" { #endif typedef struct { - /* data */ -} SVnodeSync; + pgid_t root; // root page number +} TDB_BTREE; #ifdef __cplusplus } #endif -#endif /*_TD_VNODE_SYNC_H_*/ \ No newline at end of file +#endif /*_TD_TDB_BTREE_H_*/ \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/vnodeFS.h b/source/libs/tdb/src/inc/tdbBufPool.h similarity index 57% rename from source/dnode/vnode/src/inc/vnodeFS.h rename to source/libs/tdb/src/inc/tdbBufPool.h index dbec985695..5200d22faa 100644 --- a/source/dnode/vnode/src/inc/vnodeFS.h +++ b/source/libs/tdb/src/inc/tdbBufPool.h @@ -13,35 +13,27 @@ * along with this program. If not, see . */ -#ifndef _TD_VNODE_FS_H_ -#define _TD_VNODE_FS_H_ +#ifndef _TD_TDB_BUF_POOL_H_ +#define _TD_TDB_BUF_POOL_H_ -#include "vnode.h" +#include "tdbPage.h" #ifdef __cplusplus extern "C" { #endif -typedef struct { -} SDir; +typedef struct STdbBufPool STdbBufPool; -typedef struct { -} SFile; - -typedef struct SFS { - void *pImpl; - int (*startEdit)(struct SFS *); - int (*endEdit)(struct SFS *); -} SFS; - -typedef struct { -} SVnodeFS; - -int vnodeOpenFS(SVnode *pVnode); -void vnodeCloseFS(SVnode *pVnode); +int tbpOpen(STdbBufPool **ppTkvBufPool); +int tbpClose(STdbBufPool *pTkvBufPool); +STdbPage *tbpNewPage(STdbBufPool *pTkvBufPool); +int tbpDelPage(STdbBufPool *pTkvBufPool); +STdbPage *tbpFetchPage(STdbBufPool *pTkvBufPool, pgid_t pgid); +int tbpUnpinPage(STdbBufPool *pTkvBufPool, pgid_t pgid); +void tbpFlushPages(STdbBufPool *pTkvBufPool); #ifdef __cplusplus } #endif -#endif /*_TD_VNODE_FS_H_*/ \ No newline at end of file +#endif /*_TD_TDB_BUF_POOL_H_*/ \ No newline at end of file diff --git a/source/libs/tkv/src/inc/tkvEnv.h b/source/libs/tdb/src/inc/tdbDB.h similarity index 73% rename from source/libs/tkv/src/inc/tkvEnv.h rename to source/libs/tdb/src/inc/tdbDB.h index eba442e5a5..479ef77711 100644 --- a/source/libs/tkv/src/inc/tkvEnv.h +++ b/source/libs/tdb/src/inc/tdbDB.h @@ -13,19 +13,28 @@ * along with this program. If not, see . */ -#ifndef _TD_TKV_ENV_H_ -#define _TD_TKV_ENV_H_ +#ifndef _TD_TDB_DB_H_ +#define _TD_TDB_DB_H_ + +#include "tdbBtree.h" +#include "tdbHash.h" #ifdef __cplusplus extern "C" { #endif -struct TDB_ENV { - char *homeDir; + +struct TDB { + pgsize_t pageSize; + tdb_db_t type; + union { + TDB_BTREE btree; + TDB_HASH hash; + } dbam; // Different access methods }; #ifdef __cplusplus } #endif -#endif /*_TD_TKV_ENV_H_*/ \ No newline at end of file +#endif /*_TD_TDB_DB_H_*/ \ No newline at end of file diff --git a/source/libs/tkv/src/inc/tkvDef.h b/source/libs/tdb/src/inc/tdbDef.h similarity index 93% rename from source/libs/tkv/src/inc/tkvDef.h rename to source/libs/tdb/src/inc/tdbDef.h index cd418019be..a04b8cc402 100644 --- a/source/libs/tkv/src/inc/tkvDef.h +++ b/source/libs/tdb/src/inc/tdbDef.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_TKV_DEF_H_ -#define _TD_TKV_DEF_H_ +#ifndef _TD_TDB_DEF_H_ +#define _TD_TDB_DEF_H_ #include "os.h" @@ -39,4 +39,4 @@ typedef int32_t pgsize_t; } #endif -#endif /*_TD_TKV_DEF_H_*/ \ No newline at end of file +#endif /*_TD_TDB_DEF_H_*/ \ No newline at end of file diff --git a/source/libs/tkv/src/inc/tkvDiskMgr.h b/source/libs/tdb/src/inc/tdbDiskMgr.h similarity index 98% rename from source/libs/tkv/src/inc/tkvDiskMgr.h rename to source/libs/tdb/src/inc/tdbDiskMgr.h index 2ebe98ace2..b83a147437 100644 --- a/source/libs/tkv/src/inc/tkvDiskMgr.h +++ b/source/libs/tdb/src/inc/tdbDiskMgr.h @@ -22,7 +22,7 @@ extern "C" { #include "os.h" -#include "tkvDef.h" +#include "tdbDef.h" typedef struct STkvDiskMgr STkvDiskMgr; diff --git a/source/libs/tkv/src/inc/tkvDB.h b/source/libs/tdb/src/inc/tdbHash.h similarity index 85% rename from source/libs/tkv/src/inc/tkvDB.h rename to source/libs/tdb/src/inc/tdbHash.h index 1a45702540..fca19035f1 100644 --- a/source/libs/tkv/src/inc/tkvDB.h +++ b/source/libs/tdb/src/inc/tdbHash.h @@ -13,19 +13,21 @@ * along with this program. If not, see . */ -#ifndef _TD_TKV_DB_H_ -#define _TD_TKV_DB_H_ +#ifndef _TD_TKV_HAHS_H_ +#define _TD_TKV_HAHS_H_ + +#include "tdbDef.h" #ifdef __cplusplus extern "C" { #endif -struct TDB { +typedef struct { // TODO -}; +} TDB_HASH; #ifdef __cplusplus } #endif -#endif /*_TD_TKV_DB_H_*/ \ No newline at end of file +#endif /*_TD_TKV_HAHS_H_*/ \ No newline at end of file diff --git a/source/libs/tkv/src/inc/tkvPage.h b/source/libs/tdb/src/inc/tdbPage.h similarity index 77% rename from source/libs/tkv/src/inc/tkvPage.h rename to source/libs/tdb/src/inc/tdbPage.h index d596d215cd..e7245b6c39 100644 --- a/source/libs/tkv/src/inc/tkvPage.h +++ b/source/libs/tdb/src/inc/tdbPage.h @@ -17,30 +17,24 @@ #define _TD_TKV_PAGE_H_ #include "os.h" -#include "tkvDef.h" +#include "tdbDef.h" #ifdef __cplusplus extern "C" { #endif -typedef struct STkvPage { +typedef struct { pgid_t pgid; int32_t pinCount; bool idDirty; char* pData; -} STkvPage; +} STdbPage; typedef struct { uint16_t dbver; uint16_t pgsize; uint32_t cksm; -} STkvPgHdr; - -// typedef struct { -// SPgHdr chdr; -// uint16_t used; // number of used slots -// uint16_t loffset; // the offset of the starting location of the last slot used -// } SSlottedPgHdr; +} STdbPgHdr; #ifdef __cplusplus } diff --git a/source/libs/tkv/src/tkvBufPool.c b/source/libs/tdb/src/tdbBufPool.c similarity index 92% rename from source/libs/tkv/src/tkvBufPool.c rename to source/libs/tdb/src/tdbBufPool.c index 86bfa0ba3e..bc3c386b0f 100644 --- a/source/libs/tkv/src/tkvBufPool.c +++ b/source/libs/tdb/src/tdbBufPool.c @@ -16,17 +16,17 @@ #include "thash.h" #include "tlist.h" -#include "tkvBufPool.h" -#include "tkvDiskMgr.h" -#include "tkvPage.h" +#include "tdbBufPool.h" +#include "tdbDiskMgr.h" +#include "tdbPage.h" struct SFrameIdWrapper { TD_SLIST_NODE(SFrameIdWrapper); frame_id_t id; }; -struct STkvBufPool { - STkvPage* pages; +struct STdbBufPool { + STdbPage* pages; STkvDiskMgr* pDiskMgr; SHashObj* pgTb; // page_id_t --> frame_id_t TD_SLIST(SFrameIdWrapper) freeList; diff --git a/source/libs/tkv/src/tDiskMgr.c b/source/libs/tdb/src/tdbDiskMgr.c similarity index 98% rename from source/libs/tkv/src/tDiskMgr.c rename to source/libs/tdb/src/tdbDiskMgr.c index fa8f6062d8..71ab5f2589 100644 --- a/source/libs/tkv/src/tDiskMgr.c +++ b/source/libs/tdb/src/tdbDiskMgr.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "tkvDiskMgr.h" +#include "tdbDiskMgr.h" struct STkvDiskMgr { char * fname; diff --git a/source/libs/tdb/test/CMakeLists.txt b/source/libs/tdb/test/CMakeLists.txt new file mode 100644 index 0000000000..2d77c1f4e9 --- /dev/null +++ b/source/libs/tdb/test/CMakeLists.txt @@ -0,0 +1,3 @@ +# tdbTest +add_executable(tdbTest "tdbTest.cpp") +target_link_libraries(tdbTest tdb gtest gtest_main) \ No newline at end of file diff --git a/source/libs/tdb/test/tdbTest.cpp b/source/libs/tdb/test/tdbTest.cpp new file mode 100644 index 0000000000..3ff43fdd69 --- /dev/null +++ b/source/libs/tdb/test/tdbTest.cpp @@ -0,0 +1,14 @@ +#include "gtest/gtest.h" + +#include "tdb.h" + +TEST(tdb_api_test, tdb_create_open_close_db_test) { + int ret; + TDB *dbp; + + tdbCreateDB(&dbp); + + tdbOpenDB(dbp, TDB_BTREE, 0); + + tdbCloseDB(dbp, 0); +} \ No newline at end of file diff --git a/source/libs/tkv/src/inc/tkvBufPool.h b/source/libs/tkv/src/inc/tkvBufPool.h deleted file mode 100644 index ec8d177a9a..0000000000 --- a/source/libs/tkv/src/inc/tkvBufPool.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_TKV_BUF_POOL_H_ -#define _TD_TKV_BUF_POOL_H_ - -#include "tkvPage.h" - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct STkvBufPool STkvBufPool; - -int tbpOpen(STkvBufPool **ppTkvBufPool); -int tbpClose(STkvBufPool *pTkvBufPool); -STkvPage *tbpNewPage(STkvBufPool *pTkvBufPool); -int tbpDelPage(STkvBufPool *pTkvBufPool); -STkvPage *tbpFetchPage(STkvBufPool *pTkvBufPool, pgid_t pgid); -int tbpUnpinPage(STkvBufPool *pTkvBufPool, pgid_t pgid); -void tbpFlushPages(STkvBufPool *pTkvBufPool); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_TKV_BUF_POOL_H_*/ \ No newline at end of file diff --git a/source/libs/tkv/test/tDiskMgrTest.cpp b/source/libs/tkv/test/tDiskMgrTest.cpp deleted file mode 100644 index a735fdb33d..0000000000 --- a/source/libs/tkv/test/tDiskMgrTest.cpp +++ /dev/null @@ -1,10 +0,0 @@ -#include "gtest/gtest.h" - -#include "iostream" - -#include "tDiskMgr.h" - -TEST(tDiskMgrTest, simple_test) { - // TODO - std::cout << "This is in tDiskMgrTest::simple_test" << std::endl; -} \ No newline at end of file diff --git a/source/libs/tkv/test/tkvTests.cpp b/source/libs/tkv/test/tkvTests.cpp deleted file mode 100644 index e69de29bb2..0000000000