From ab4b640fffec28abebebac9bca8fc192b1ad8a34 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 4 Nov 2021 09:28:55 +0800 Subject: [PATCH 1/7] dnode-vnodes --- include/common/taosmsg.h | 98 ++-- include/server/vnode/vnode.h | 17 +- source/dnode/mgmt/inc/dnodeInt.h | 5 +- source/dnode/mgmt/inc/dnodeVnodes.h | 7 +- source/dnode/mgmt/src/dnodeInt.c | 4 +- source/dnode/mgmt/src/dnodeTransport.c | 62 +-- source/dnode/mgmt/src/dnodeVnodes.c | 619 ++++++++++++++++++++++++- source/dnode/vnode/impl/src/vnodeInt.c | 4 +- 8 files changed, 709 insertions(+), 107 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index e798a0c42a..49e8892c7d 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -36,7 +36,7 @@ enum { TSDB_MESSAGE_NULL = 0, #endif -// message from client to dnode +// message from client to vnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) @@ -46,25 +46,12 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_TABLE, "alter-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TABLE_META, "table-meta" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TABLES_META, "tables-meta" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY0, "dummy0" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY4, "dummy4" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY8, "dummy8" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" ) - // message from client to mnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" ) @@ -88,6 +75,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_FUNCTION, "create-function" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_FUNCTION, "alter-function" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_FUNCTION, "drop-function" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE, "create-stable" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE, "drop-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE, "alter-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" ) @@ -97,54 +85,55 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW, "show" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE, "retrieve" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC, "retrieve-func" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE, "compact-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY10, "dummy10" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY11, "dummy11" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY15, "dummy15" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY16, "dummy16" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY17, "dummy17" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY18, "dummy18" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY19, "dummy19" ) +// message from client to qnode +// message from client to dnode +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" ) -// message from mnode to dnode +// message from vnode to vnode +// message from vnode to mnode +// message from vnode to qnode +// message from vnode to dnode + +// message from mnode to vnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE_IN, "create-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE_IN, "alter-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE_IN, "drop-stable" ) +// message from mnode to mnode +// message from mnode to qnode +// message from mnode to dnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH_VNODE_IN, "auth-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY20, "dummy20" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY21, "dummy21" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY22, "dummy22" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY23, "dummy23" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY24, "dummy24" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY25, "dummy25" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY26, "dummy26" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY27, "dummy27" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY28, "dummy28" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY29, "dummy29" ) +// message from qnode to vnode +// message from qnode to mnode +// message from qnode to qnode +// message from qnode to dnode + +// message from dnode to vnode // message from dnode to mnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STATUS, "status" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_GRANT, "grant" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH, "auth" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY30, "dummy30" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY31, "dummy31" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY32, "dummy32" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY33, "dummy33" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY34, "dummy34" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY35, "dummy35" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY36, "dummy36" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY37, "dummy37" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY38, "dummy38" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY39, "dummy39" ) +// message from dnode to qnode +// message from dnode to dnode + +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY0, "dummy0" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY4, "dummy4" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY8, "dummy8" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" ) #ifndef TAOS_MESSAGE_C TSDB_MSG_TYPE_MAX // 147 @@ -428,10 +417,6 @@ typedef struct { char tableFname[TSDB_TABLE_FNAME_LEN]; } SDropSTableMsg; -typedef struct { - int32_t vgId; -} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg; - typedef struct SColIndex { int16_t colId; // column id int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag @@ -661,9 +646,8 @@ typedef struct { typedef struct { int32_t vgId; - int8_t status; int8_t role; - int8_t reserved[2]; + int8_t reserved[3]; int64_t totalStorage; int64_t compStorage; int64_t pointsWritten; @@ -737,9 +721,18 @@ typedef struct { int8_t replica; int8_t quorum; int8_t selfIndex; - SVnodeDesc nodes[TSDB_MAX_REPLICA]; + SVnodeDesc replicas[TSDB_MAX_REPLICA]; } SCreateVnodeMsg, SAlterVnodeMsg; +typedef struct { + int32_t vgId; +} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg; + +typedef struct { + int32_t vgId; + int8_t accessState; +} SAuthVnodeMsg; + typedef struct { char tableFname[TSDB_TABLE_FNAME_LEN]; int16_t createFlag; @@ -1002,6 +995,7 @@ typedef struct { /* data */ } SUpdateTagValRsp; + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/server/vnode/vnode.h b/include/server/vnode/vnode.h index c4f8df79c1..a20a7fd410 100644 --- a/include/server/vnode/vnode.h +++ b/include/server/vnode/vnode.h @@ -27,7 +27,7 @@ extern "C" { typedef struct SVnode SVnode; typedef struct { - char dbName[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; int32_t cacheBlockSize; // MB int32_t totalBlocks; int32_t daysPerFile; @@ -47,17 +47,6 @@ typedef struct { SVnodeDesc replicas[TSDB_MAX_REPLICA]; } SVnodeCfg; -typedef struct { - int64_t totalStorage; - int64_t compStorage; - int64_t pointsWritten; - int64_t tablesNum; -} SVnodeStatisic; - -typedef struct { - int8_t syncRole; -} SVnodeStatus; - typedef struct SVnodeMsg { int32_t msgType; int32_t code; @@ -69,9 +58,6 @@ typedef struct SVnodeMsg { int32_t vnodeInit(); void vnodeCleanup(); -int32_t vnodeGetStatistics(SVnode *pVnode, SVnodeStatisic *pStat); -int32_t vnodeGetStatus(SVnode *pVnode, SVnodeStatus *pStatus); - SVnode *vnodeOpen(int32_t vgId, const char *path); void vnodeClose(SVnode *pVnode); int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); @@ -80,6 +66,7 @@ int32_t vnodeDrop(SVnode *pVnode); int32_t vnodeCompact(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode); +void vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg); #ifdef __cplusplus diff --git a/source/dnode/mgmt/inc/dnodeInt.h b/source/dnode/mgmt/inc/dnodeInt.h index 472c467ad6..906455dce4 100644 --- a/source/dnode/mgmt/inc/dnodeInt.h +++ b/source/dnode/mgmt/inc/dnodeInt.h @@ -42,7 +42,10 @@ void dnodeCleanup(); EDnStat dnodeGetRunStat(); void dnodeSetRunStat(); -void dnodeGetStartup(SStartupMsg *); + +void dnodeReportStartup(char *name, char *desc); +void dnodeReportStartupFinished(char *name, char *desc); +void dnodeGetStartup(SStartupMsg *); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/inc/dnodeVnodes.h b/source/dnode/mgmt/inc/dnodeVnodes.h index 6e9bce9ae5..2b72ba5d59 100644 --- a/source/dnode/mgmt/inc/dnodeVnodes.h +++ b/source/dnode/mgmt/inc/dnodeVnodes.h @@ -23,9 +23,14 @@ extern "C" { int32_t dnodeInitVnodes(); void dnodeCleanupVnodes(); -void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SEpSet *pEpSet); void dnodeGetVnodes(SVnodeLoads *pVloads); +void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet); +void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet); +void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet); +void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet); +void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/src/dnodeInt.c b/source/dnode/mgmt/src/dnodeInt.c index e853ba7f14..e7018f4265 100644 --- a/source/dnode/mgmt/src/dnodeInt.c +++ b/source/dnode/mgmt/src/dnodeInt.c @@ -35,14 +35,14 @@ EDnStat dnodeGetRunStat() { return tsInt.runStat; } void dnodeSetRunStat(EDnStat stat) { tsInt.runStat = stat; } -static void dnodeReportStartup(char *name, char *desc) { +void dnodeReportStartup(char *name, char *desc) { SStartupMsg *pStartup = &tsInt.startup; tstrncpy(pStartup->name, name, strlen(pStartup->name)); tstrncpy(pStartup->desc, desc, strlen(pStartup->desc)); pStartup->finished = 0; } -static void dnodeReportStartupFinished(char *name, char *desc) { +void dnodeReportStartupFinished(char *name, char *desc) { SStartupMsg *pStartup = &tsInt.startup; tstrncpy(pStartup->name, name, strlen(pStartup->name)); tstrncpy(pStartup->desc, desc, strlen(pStartup->desc)); diff --git a/source/dnode/mgmt/src/dnodeTransport.c b/source/dnode/mgmt/src/dnodeTransport.c index 18e97e2b9f..265efe3070 100644 --- a/source/dnode/mgmt/src/dnodeTransport.c +++ b/source/dnode/mgmt/src/dnodeTransport.c @@ -34,24 +34,22 @@ static struct { static void dnodeInitMsgFp() { // msg from client to dnode - tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessDnodeMsg; - + tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessVnodeQueryMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = dnodeProcessVnodeFetchMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = dnodeProcessVnodeQueryMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = dnodeProcessVnodeQueryMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dnodeProcessVnodeQueryMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dnodeProcessVnodeQueryMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = dnodeProcessVnodeWriteMsg; + // msg from client to mnode tsTrans.msgFp[TSDB_MSG_TYPE_CONNECT] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = dnodeProcessMnodeMsg; @@ -77,6 +75,7 @@ static void dnodeInitMsgFp() { tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_KILL_CONN] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dnodeProcessMnodeMsg; @@ -85,22 +84,29 @@ static void dnodeInitMsgFp() { tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = dnodeProcessMnodeMsg; - // message from mnode to dnode - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg; + // message from client to dnode + tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessDnodeMsg; + + // message from mnode to vnode + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodeWriteMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN] = dnodeProcessVnodeWriteMsg; tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN] = dnodeProcessMnodeMsg; + + // message from mnode to dnode + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dnodeProcessVnodeMgmtMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = dnodeProcessVnodeMgmtMsg; tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = dnodeProcessVnodeMgmtMsg; tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = dnodeProcessVnodeMgmtMsg; tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN] = dnodeProcessVnodeMgmtMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN_RSP] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = dnodeProcessVnodeMgmtMsg; tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = dnodeProcessMnodeMsg; diff --git a/source/dnode/mgmt/src/dnodeVnodes.c b/source/dnode/mgmt/src/dnodeVnodes.c index 59f5cf0fd0..c2e143da80 100644 --- a/source/dnode/mgmt/src/dnodeVnodes.c +++ b/source/dnode/mgmt/src/dnodeVnodes.c @@ -14,13 +14,622 @@ */ #define _DEFAULT_SOURCE -#include "dnodeDnode.h" +#include "dnodeVnodes.h" +#include "thash.h" +#include "tqueue.h" +#include "tstep.h" +#include "tthread.h" +#include "tworker.h" #include "vnode.h" -int32_t dnodeInitVnodes() { return vnodeInit(); } +typedef struct { + int32_t vgId; + int32_t refCount; + int8_t dropped; + int8_t accessState; + SVnode *pImpl; + taos_queue pWriteQ; + taos_queue pSyncQ; + taos_queue pApplyQ; + taos_queue pQueryQ; + taos_queue pFetchQ; +} SVnodeObj; -void dnodeCleanupVnodes() { vnodeCleanup(); } +typedef struct { + pthread_t *threadId; + int32_t threadIndex; + int32_t failed; + int32_t opened; + int32_t vnodeNum; + SVnodeObj *pVnodes; +} SVThread; -void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { vnodeProcessMsg(NULL, NULL); } +static struct { + SHashObj *hash; + SWorkerPool mgmtPool; + taos_queue pMgmtQ; + SSteps *pSteps; + int32_t openVnodes; + int32_t totalVnodes; + char file[PATH_MAX + 20]; +} tsVnodes; -void dnodeGetVnodes(SVnodeLoads *pVloads) {} \ No newline at end of file +static int32_t dnodeCreateVnodeWrapper(int32_t vgId, SVnode *pImpl) { + SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj)); + pVnode->vgId = vgId; + pVnode->refCount = 0; + pVnode->dropped = 0; + pVnode->accessState = TSDB_VN_ALL_ACCCESS; + pVnode->pImpl = pImpl; + pVnode->pWriteQ = NULL; + pVnode->pSyncQ = NULL; + pVnode->pApplyQ = NULL; + pVnode->pQueryQ = NULL; + pVnode->pFetchQ = NULL; + + return taosHashPut(tsVnodes.hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); +} + +static void dnodeDropVnodeWrapper(SVnodeObj *pVnode) { + taosHashRemove(tsVnodes.hash, &pVnode->vgId, sizeof(int32_t)); + + //todo wait all queue empty + pVnode->pWriteQ = NULL; + pVnode->pSyncQ = NULL; + pVnode->pApplyQ = NULL; + pVnode->pQueryQ = NULL; + pVnode->pFetchQ = NULL; +} + +static int32_t dnodeGetVnodesFromHash(SVnodeObj *pVnodes[], int32_t *numOfVnodes) { + void *pIter = taosHashIterate(tsVnodes.hash, NULL); + while (pIter) { + SVnodeObj **ppVnode = pIter; + if (*ppVnode) { + (*numOfVnodes)++; + if (*numOfVnodes >= TSDB_MAX_VNODES) { + dError("vgId:%d, too many open vnodes, exist:%d max:%d", (*ppVnode)->vgId, *numOfVnodes, TSDB_MAX_VNODES); + continue; + } else { + pVnodes[*numOfVnodes - 1] = (*ppVnode); + } + } + + pIter = taosHashIterate(tsVnodes.hash, pIter); + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t dnodeGetVnodesFromFile(SVnodeObj *pVnodes, int32_t *numOfVnodes) { + pVnodes[0].vgId = 2; + pVnodes[0].dropped = 0; + pVnodes[0].vgId = 3; + pVnodes[0].dropped = 0; + return 0; +} + +static int32_t dnodeWriteVnodesToFile() { return 0; } + +static int32_t dnodeCreateVnode(int32_t vgId, SVnodeCfg *pCfg) { + int32_t code = 0; + + char path[PATH_MAX + 20] = {0}; + snprintf(path, sizeof(path),"%s/vnode%d", tsVnodeDir, vgId); + SVnode *pImpl = vnodeCreate(vgId, path, pCfg); + + if (pImpl == NULL) { + code = terrno; + return code; + } + + code = dnodeCreateVnodeWrapper(vgId, pImpl); + if (code != 0) { + vnodeDrop(pImpl); + return code; + } + + code = dnodeWriteVnodesToFile(); + if (code != 0) { + vnodeDrop(pImpl); + return code; + } + + return code; +} + +static int32_t dnodeDropVnode(SVnodeObj *pVnode) { + pVnode->dropped = 1; + + int32_t code = dnodeWriteVnodesToFile(); + if (code != 0) { + pVnode->dropped = 0; + return code; + } + + dnodeDropVnodeWrapper(pVnode); + vnodeDrop(pVnode->pImpl); + dnodeWriteVnodesToFile(); + return 0; +} + +static SVnodeObj *dnodeAcquireVnode(int32_t vgId) { + SVnodeObj *pVnode = NULL; + + taosHashGetClone(tsVnodes.hash, &vgId, sizeof(int32_t), (void *)&pVnode); + if (pVnode == NULL) { + terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; + } + + int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + dTrace("vgId:%d, accquire vnode, refCount:%d", pVnode->vgId, refCount); + return pVnode; +} + +static void dnodeReleaseVnode(SVnodeObj *pVnode) { + int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); + dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); +} + +static void *dnodeOpenVnodeFunc(void *param) { + SVThread *pThread = param; + + dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); + setThreadName("open-vnodes"); + + for (int32_t v = 0; v < pThread->vnodeNum; ++v) { + SVnodeObj *pVnode = &pThread->pVnodes[v]; + + char stepDesc[TSDB_STEP_DESC_LEN] = {0}; + snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pVnode->vgId, + tsVnodes.openVnodes, tsVnodes.totalVnodes); + dnodeReportStartup("open-vnodes", stepDesc); + + char path[PATH_MAX + 20] = {0}; + snprintf(path, sizeof(path),"%s/vnode%d", tsVnodeDir, pVnode->vgId); + SVnode *pImpl = vnodeOpen(pVnode->vgId, path); + if (pImpl == NULL) { + dError("vgId:%d, failed to open vnode by thread:%d", pVnode->vgId, pThread->threadIndex); + pThread->failed++; + } else { + dnodeCreateVnodeWrapper(pVnode->vgId, pImpl); + dDebug("vgId:%d, is opened by thread:%d", pVnode->vgId, pThread->threadIndex); + pThread->opened++; + } + + atomic_add_fetch_32(&tsVnodes.openVnodes, 1); + } + + dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened, + pThread->failed); + return NULL; +} + +static int32_t dnodeOpenVnodes() { + tsVnodes.hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (tsVnodes.hash == NULL) { + dError("failed to init vnode hash"); + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + SVnodeObj pVnodes[TSDB_MAX_VNODES] = {0}; + int32_t numOfVnodes = 0; + int32_t code = dnodeGetVnodesFromFile(pVnodes, &numOfVnodes); + if (code != TSDB_CODE_SUCCESS) { + dInfo("failed to get vnode list from disk since %s", tstrerror(code)); + return code; + } + + tsVnodes.totalVnodes = numOfVnodes; + + int32_t threadNum = tsNumOfCores; + int32_t vnodesPerThread = numOfVnodes / threadNum + 1; + + SVThread *threads = calloc(threadNum, sizeof(SVThread)); + for (int32_t t = 0; t < threadNum; ++t) { + threads[t].threadIndex = t; + threads[t].pVnodes = calloc(vnodesPerThread, sizeof(SVnodeObj)); + } + + for (int32_t v = 0; v < numOfVnodes; ++v) { + int32_t t = v % threadNum; + SVThread *pThread = &threads[t]; + pThread->pVnodes[pThread->vnodeNum++] = pVnodes[v]; + } + + dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes); + + for (int32_t t = 0; t < threadNum; ++t) { + SVThread *pThread = &threads[t]; + if (pThread->vnodeNum == 0) continue; + + pThread->threadId = taosCreateThread(dnodeOpenVnodeFunc, pThread); + if (pThread->threadId == NULL) { + dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno)); + } + } + + for (int32_t t = 0; t < threadNum; ++t) { + SVThread *pThread = &threads[t]; + taosDestoryThread(pThread->threadId); + pThread->threadId = NULL; + free(pThread->pVnodes); + } + free(threads); + + if (tsVnodes.openVnodes != tsVnodes.totalVnodes) { + dError("there are total vnodes:%d, opened:%d", tsVnodes.totalVnodes, tsVnodes.openVnodes); + return -1; + } else { + dInfo("total vnodes:%d open successfully", tsVnodes.totalVnodes); + } + + return TSDB_CODE_SUCCESS; +} + +static void dnodeCloseVnodes() { + SVnodeObj *pVnodes[TSDB_MAX_VNODES] = {0}; + int32_t numOfVnodes = 0; + + int32_t code = dnodeGetVnodesFromHash(pVnodes, &numOfVnodes); + if (code != TSDB_CODE_SUCCESS) { + dInfo("failed to get dnode list since code %d", code); + return; + } + + for (int32_t i = 0; i < numOfVnodes; ++i) { + vnodeClose(pVnodes[i]->pImpl); + } + + if (tsVnodes.hash != NULL) { + taosHashCleanup(tsVnodes.hash); + tsVnodes.hash = NULL; + } + + dInfo("total vnodes:%d are all closed", numOfVnodes); +} + +static int32_t dnodeParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg *pCfg) { + SCreateVnodeMsg *pCreate = rpcMsg->pCont; + *vgId = htonl(pCreate->vgId); + + tstrncpy(pCfg->db, pCreate->db, TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN); + pCfg->cacheBlockSize = htonl(pCreate->cacheBlockSize); + pCfg->totalBlocks = htonl(pCreate->totalBlocks); + pCfg->daysPerFile = htonl(pCreate->daysPerFile); + pCfg->daysToKeep0 = htonl(pCreate->daysToKeep0); + pCfg->daysToKeep1 = htonl(pCreate->daysToKeep1); + pCfg->daysToKeep2 = htonl(pCreate->daysToKeep2); + pCfg->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock); + pCfg->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock); + pCfg->precision = pCreate->precision; + pCfg->compression = pCreate->compression; + pCfg->cacheLastRow = pCreate->cacheLastRow; + pCfg->update = pCreate->update; + pCfg->quorum = pCreate->quorum; + pCfg->replica = pCreate->replica; + pCfg->walLevel = pCreate->walLevel; + pCfg->fsyncPeriod = htonl(pCreate->fsyncPeriod); + + for (int32_t i = 0; i < pCfg->replica; ++i) { + pCfg->replicas[i].port = htons(pCreate->replicas[i].port); + tstrncpy(pCfg->replicas[i].fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN); + } + + return 0; +} + +static SDropVnodeMsg *vnodeParseDropVnodeReq(SRpcMsg *rpcMsg) { + SDropVnodeMsg *pDrop = rpcMsg->pCont; + pDrop->vgId = htonl(pDrop->vgId); + return pDrop; +} + +static SAuthVnodeMsg *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) { + SAuthVnodeMsg *pAuth = rpcMsg->pCont; + pAuth->vgId = htonl(pAuth->vgId); + return pAuth; +} + +static int32_t vnodeProcessCreateVnodeReq(SRpcMsg *rpcMsg) { + SVnodeCfg vnodeCfg = {0}; + int32_t vgId = 0; + + dnodeParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg); + dDebug("vgId:%d, create vnode req is received", vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode != NULL) { + dDebug("vgId:%d, already exist, return success", vgId); + dnodeReleaseVnode(pVnode); + return 0; + } + + int32_t code = dnodeCreateVnode(vgId, &vnodeCfg); + if (code != 0) { + dError("vgId:%d, failed to create vnode since %s", vgId, tstrerror(code)); + } + + return code; +} + +static int32_t vnodeProcessAlterVnodeReq(SRpcMsg *rpcMsg) { + SVnodeCfg vnodeCfg = {0}; + int32_t vgId = 0; + int32_t code = 0; + + dnodeParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg); + dDebug("vgId:%d, alter vnode req is received", vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode == NULL) { + code = terrno; + dDebug("vgId:%d, failed to alter vnode since %s", vgId, tstrerror(code)); + return code; + } + + code = vnodeAlter(pVnode->pImpl, &vnodeCfg); + if (code != 0) { + dError("vgId:%d, failed to alter vnode since %s", vgId, tstrerror(code)); + } + + dnodeReleaseVnode(pVnode); + return code; +} + +static int32_t vnodeProcessDropVnodeReq(SRpcMsg *rpcMsg) { + SDropVnodeMsg *pDrop = vnodeParseDropVnodeReq(rpcMsg); + + int32_t code = 0; + int32_t vgId = pDrop->vgId; + dDebug("vgId:%d, drop vnode req is received", vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode == NULL) { + code = terrno; + dDebug("vgId:%d, failed to drop since %s", vgId, tstrerror(code)); + return code; + } + + code = vnodeDrop(pVnode->pImpl); + if (code != 0) { + dError("vgId:%d, failed to drop vnode since %s", vgId, tstrerror(code)); + } + + dnodeReleaseVnode(pVnode); + return code; +} + +static int32_t vnodeProcessAuthVnodeReq(SRpcMsg *rpcMsg) { + SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg); + + int32_t code = 0; + int32_t vgId = pAuth->vgId; + dDebug("vgId:%d, auth vnode req is received", vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode == NULL) { + code = terrno; + dDebug("vgId:%d, failed to auth since %s", vgId, tstrerror(code)); + return code; + } + + pVnode->accessState = pAuth->accessState; + dnodeReleaseVnode(pVnode); + return code; +} + +static int32_t vnodeProcessSyncVnodeReq(SRpcMsg *rpcMsg) { + SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg); + + int32_t code = 0; + int32_t vgId = pAuth->vgId; + dDebug("vgId:%d, auth vnode req is received", vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode == NULL) { + code = terrno; + dDebug("vgId:%d, failed to auth since %s", vgId, tstrerror(code)); + return code; + } + + code = vnodeSync(pVnode->pImpl); + if (code != 0) { + dError("vgId:%d, failed to auth vnode since %s", vgId, tstrerror(code)); + } + + dnodeReleaseVnode(pVnode); + return code; +} + +static int32_t vnodeProcessCompactVnodeReq(SRpcMsg *rpcMsg) { + SCompactVnodeMsg *pCompact = (SCompactVnodeMsg *)vnodeParseDropVnodeReq(rpcMsg); + + int32_t code = 0; + int32_t vgId = pCompact->vgId; + dDebug("vgId:%d, compact vnode req is received", vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode == NULL) { + code = terrno; + dDebug("vgId:%d, failed to compact since %s", vgId, tstrerror(code)); + return code; + } + + code = vnodeCompact(pVnode->pImpl); + if (code != 0) { + dError("vgId:%d, failed to compact vnode since %s", vgId, tstrerror(code)); + } + + dnodeReleaseVnode(pVnode); + return code; +} + +static void dnodeProcessVnodeMgmtReq(SRpcMsg *pMsg, void *unused) { + int32_t code = 0; + + switch (pMsg->msgType) { + case TSDB_MSG_TYPE_CREATE_VNODE_IN: + code = vnodeProcessCreateVnodeReq(pMsg); + break; + case TSDB_MSG_TYPE_ALTER_VNODE_IN: + code = vnodeProcessAlterVnodeReq(pMsg); + break; + case TSDB_MSG_TYPE_DROP_VNODE_IN: + code = vnodeProcessDropVnodeReq(pMsg); + break; + case TSDB_MSG_TYPE_AUTH_VNODE_IN: + code = vnodeProcessAuthVnodeReq(pMsg); + break; + case TSDB_MSG_TYPE_SYNC_VNODE_IN: + code = vnodeProcessSyncVnodeReq(pMsg); + break; + case TSDB_MSG_TYPE_COMPACT_VNODE_IN: + code = vnodeProcessCompactVnodeReq(pMsg); + break; + default: + code = TSDB_CODE_DND_MSG_NOT_PROCESSED; + break; + } + + SRpcMsg rsp = {.code = code, .handle = pMsg->handle}; + rpcSendResponse(&rsp); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); +} + +static int32_t dnodeWriteToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { + int32_t code = 0; + + if (pQueue == NULL) { + code = TSDB_CODE_DND_MSG_NOT_PROCESSED; + } else { + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg)); + if (pMsg == NULL) { + code = TSDB_CODE_DND_OUT_OF_MEMORY; + } else { + *pMsg = *pRpcMsg; + code = taosWriteQitem(pQueue, pMsg); + } + } + + if (code != TSDB_CODE_SUCCESS) { + SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; + rpcSendResponse(&rsp); + rpcFreeCont(pRpcMsg->pCont); + } +} + +static SVnodeObj *dnodeAcquireVnodeFromMsg(SRpcMsg *pMsg) { + SMsgHead *pHead = (SMsgHead *)pMsg->pCont; + pHead->vgId = htonl(pHead->vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(pHead->vgId); + if (pVnode == NULL) { + SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; + rpcSendResponse(&rsp); + rpcFreeCont(pMsg->pCont); + } + + return pVnode; +} + +void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { dnodeWriteToVnodeQueue(tsVnodes.pMgmtQ, pMsg); } + +void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { + SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); + if (pVnode != NULL) { + dnodeWriteToVnodeQueue(pVnode->pWriteQ, pMsg); + dnodeReleaseVnode(pVnode); + } +} + +void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { + SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); + if (pVnode != NULL) { + dnodeWriteToVnodeQueue(pVnode->pSyncQ, pMsg); + dnodeReleaseVnode(pVnode); + } +} + +void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { + SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); + if (pVnode != NULL) { + dnodeWriteToVnodeQueue(pVnode->pQueryQ, pMsg); + dnodeReleaseVnode(pVnode); + } +} + +void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { + SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); + if (pVnode != NULL) { + dnodeWriteToVnodeQueue(pVnode->pFetchQ, pMsg); + dnodeReleaseVnode(pVnode); + } +} + +static int32_t dnodeInitVnodeMgmtWorker() { + SWorkerPool *pPool = &tsVnodes.mgmtPool; + pPool->name = "vnode-mgmt"; + pPool->min = 1; + pPool->max = 1; + if (tWorkerInit(pPool) != 0) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + tsVnodes.pMgmtQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)dnodeProcessVnodeMgmtReq); + if (tsVnodes.pMgmtQ == NULL) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + return 0; +} + +static void dnodeCleanupVnodeMgmtWorker() { + tWorkerFreeQueue(&tsVnodes.mgmtPool, tsVnodes.pMgmtQ); + tWorkerCleanup(&tsVnodes.mgmtPool); + tsVnodes.pMgmtQ = NULL; +} + +int32_t dnodeInitVnodes() { + dInfo("dnode-vnodes start to init"); + + SSteps *pSteps = taosStepInit(3, dnodeReportStartup); + taosStepAdd(pSteps, "dnode-vnode-env", vnodeInit, vnodeCleanup); + taosStepAdd(pSteps, "dnode-vnode-mgmt", dnodeInitVnodeMgmtWorker, dnodeCleanupVnodeMgmtWorker); + taosStepAdd(pSteps, "dnode-vnodes", dnodeOpenVnodes, dnodeCleanupVnodes); + + tsVnodes.pSteps = pSteps; + return taosStepExec(pSteps); +} + +void dnodeCleanupVnodes() { + if (tsVnodes.pSteps != NULL) { + dInfo("dnode-vnodes start to clean up"); + taosStepCleanup(tsVnodes.pSteps); + tsVnodes.pSteps = NULL; + dInfo("dnode-vnodes is cleaned up"); + } +} + +void dnodeGetVnodes(SVnodeLoads *pLoads) { + pLoads->vnodeNum = taosHashGetSize(tsVnodes.hash); + + int32_t v = 0; + void *pIter = taosHashIterate(tsVnodes.hash, NULL); + while (pIter) { + SVnodeObj **ppVnode = pIter; + if (ppVnode == NULL) continue; + SVnodeObj *pVnode = *ppVnode; + if (pVnode) { + SVnodeLoad *pLoad = &pLoads->vnodeLoads[v++]; + vnodeGetLoad(pVnode->pImpl, pLoad); + pLoad->vgId = htonl(pLoad->vgId); + pLoad->totalStorage = htobe64(pLoad->totalStorage); + pLoad->compStorage = htobe64(pLoad->compStorage); + pLoad->pointsWritten = htobe64(pLoad->pointsWritten); + pLoad->tablesNum = htobe64(pLoad->tablesNum); + } + pIter = taosHashIterate(tsVnodes.hash, pIter); + } +} \ No newline at end of file diff --git a/source/dnode/vnode/impl/src/vnodeInt.c b/source/dnode/vnode/impl/src/vnodeInt.c index 6f83542cef..1df3d37749 100644 --- a/source/dnode/vnode/impl/src/vnodeInt.c +++ b/source/dnode/vnode/impl/src/vnodeInt.c @@ -19,9 +19,6 @@ int32_t vnodeInit() { return 0; } void vnodeCleanup() {} -int32_t vnodeGetStatistics(SVnode *pVnode, SVnodeStatisic *pStat) { return 0; } -int32_t vnodeGetStatus(SVnode *pVnode, SVnodeStatus *pStatus) { return 0; } - SVnode *vnodeOpen(int32_t vgId, const char *path) { return NULL; } void vnodeClose(SVnode *pVnode) {} int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; } @@ -31,3 +28,4 @@ int32_t vnodeCompact(SVnode *pVnode) { return 0; } int32_t vnodeSync(SVnode *pVnode) { return 0; } void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg) {} +void vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {} From 2f09d49aa4c32f01398448fed3d06e0030099223 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 4 Nov 2021 14:06:29 +0800 Subject: [PATCH 2/7] refact dnode - vnodes --- include/common/taosmsg.h | 4 +- include/server/vnode/vnode.h | 22 +- include/util/tqueue.h | 4 +- source/dnode/mgmt/inc/dnodeVnodes.h | 2 +- source/dnode/mgmt/src/dnodeDnode.c | 4 +- source/dnode/mgmt/src/dnodeVnodes.c | 299 ++++++++++++++++++++++--- source/dnode/vnode/impl/src/vnodeInt.c | 43 +++- source/util/src/tworker.c | 4 +- 8 files changed, 329 insertions(+), 53 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 49e8892c7d..822f850018 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -655,8 +655,8 @@ typedef struct { } SVnodeLoad; typedef struct { - int32_t vnodeNum; - SVnodeLoad vnodeLoads[]; + int32_t num; + SVnodeLoad data[]; } SVnodeLoads; typedef struct SStatusMsg { diff --git a/include/server/vnode/vnode.h b/include/server/vnode/vnode.h index a20a7fd410..ca0706f0b9 100644 --- a/include/server/vnode/vnode.h +++ b/include/server/vnode/vnode.h @@ -47,12 +47,18 @@ typedef struct { SVnodeDesc replicas[TSDB_MAX_REPLICA]; } SVnodeCfg; +typedef enum { + VN_MSG_TYPE_WRITE = 1, + VN_MSG_TYPE_APPLY, + VN_MSG_TYPE_SYNC, + VN_MSG_TYPE_QUERY, + VN_MSG_TYPE_FETCH +} EVMType; + typedef struct SVnodeMsg { - int32_t msgType; - int32_t code; - SRpcMsg rpcMsg; // original message from rpc - int32_t contLen; - char pCont[]; + int32_t curNum; + int32_t allocNum; + SRpcMsg rpcMsg[]; } SVnodeMsg; int32_t vnodeInit(); @@ -67,7 +73,11 @@ int32_t vnodeCompact(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode); void vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); -void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg); + +SVnodeMsg *vnodeInitMsg(int32_t msgNum); +int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg); +void vnodeCleanupMsg(SVnodeMsg *pMsg); +void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVMType msgType); #ifdef __cplusplus } diff --git a/include/util/tqueue.h b/include/util/tqueue.h index faac1afe70..24c56ea6a3 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -40,8 +40,8 @@ shall be used to set up the protection. typedef void *taos_queue; typedef void *taos_qset; typedef void *taos_qall; -typedef void *(*FProcessItem)(void *pItem, void *ahandle); -typedef void *(*FProcessItems)(taos_qall qall, int numOfItems, void *ahandle); +typedef void (*FProcessItem)(void *ahandle, void *pItem); +typedef void (*FProcessItems)(void *ahandle, taos_qall qall, int numOfItems); taos_queue taosOpenQueue(); void taosCloseQueue(taos_queue); diff --git a/source/dnode/mgmt/inc/dnodeVnodes.h b/source/dnode/mgmt/inc/dnodeVnodes.h index 2b72ba5d59..31eae049ab 100644 --- a/source/dnode/mgmt/inc/dnodeVnodes.h +++ b/source/dnode/mgmt/inc/dnodeVnodes.h @@ -23,7 +23,7 @@ extern "C" { int32_t dnodeInitVnodes(); void dnodeCleanupVnodes(); -void dnodeGetVnodes(SVnodeLoads *pVloads); +void dnodeGetVnodeLoads(SVnodeLoads *pVloads); void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet); void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet); diff --git a/source/dnode/mgmt/src/dnodeDnode.c b/source/dnode/mgmt/src/dnodeDnode.c index 5bf5b1d56a..63de2b940d 100644 --- a/source/dnode/mgmt/src/dnodeDnode.c +++ b/source/dnode/mgmt/src/dnodeDnode.c @@ -372,8 +372,8 @@ static void dnodeSendStatusMsg() { char timestr[32] = "1970-01-01 00:00:00.00"; (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); - dnodeGetVnodes(&pStatus->vnodeLoads); - contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.vnodeNum * sizeof(SVnodeLoad); + dnodeGetVnodeLoads(&pStatus->vnodeLoads); + contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad); SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_STATUS}; dnodeSendMsgToMnode(&rpcMsg); diff --git a/source/dnode/mgmt/src/dnodeVnodes.c b/source/dnode/mgmt/src/dnodeVnodes.c index c2e143da80..72048a15a5 100644 --- a/source/dnode/mgmt/src/dnodeVnodes.c +++ b/source/dnode/mgmt/src/dnodeVnodes.c @@ -45,27 +45,66 @@ typedef struct { } SVThread; static struct { - SHashObj *hash; - SWorkerPool mgmtPool; - taos_queue pMgmtQ; - SSteps *pSteps; - int32_t openVnodes; - int32_t totalVnodes; - char file[PATH_MAX + 20]; + SHashObj *hash; + SWorkerPool mgmtPool; + SWorkerPool queryPool; + SWorkerPool fetchPool; + SMWorkerPool syncPool; + SMWorkerPool writePool; + taos_queue pMgmtQ; + SSteps *pSteps; + int32_t openVnodes; + int32_t totalVnodes; + char file[PATH_MAX + 20]; } tsVnodes; +static int32_t dnodeAllocVnodeQueryQueue(SVnodeObj *pVnode); +static void dnodeFreeVnodeQueryQueue(SVnodeObj *pVnode); +static int32_t dnodeAllocVnodeFetchQueue(SVnodeObj *pVnode); +static void dnodeFreeVnodeFetchQueue(SVnodeObj *pVnode); +static int32_t dnodeAllocVnodeWriteQueue(SVnodeObj *pVnode); +static void dnodeFreeVnodeWriteQueue(SVnodeObj *pVnode); +static int32_t dnodeAllocVnodeApplyQueue(SVnodeObj *pVnode); +static void dnodeFreeVnodeApplyQueue(SVnodeObj *pVnode); +static int32_t dnodeAllocVnodeSyncQueue(SVnodeObj *pVnode); +static void dnodeFreeVnodeSyncQueue(SVnodeObj *pVnode); + static int32_t dnodeCreateVnodeWrapper(int32_t vgId, SVnode *pImpl) { SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj)); + if (pVnode == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + pVnode->vgId = vgId; pVnode->refCount = 0; pVnode->dropped = 0; pVnode->accessState = TSDB_VN_ALL_ACCCESS; pVnode->pImpl = pImpl; - pVnode->pWriteQ = NULL; - pVnode->pSyncQ = NULL; - pVnode->pApplyQ = NULL; - pVnode->pQueryQ = NULL; - pVnode->pFetchQ = NULL; + + int32_t code = dnodeAllocVnodeQueryQueue(pVnode); + if (code != 0) { + return code; + } + + code = dnodeAllocVnodeFetchQueue(pVnode); + if (code != 0) { + return code; + } + + code = dnodeAllocVnodeWriteQueue(pVnode); + if (code != 0) { + return code; + } + + code = dnodeAllocVnodeApplyQueue(pVnode); + if (code != 0) { + return code; + } + + code = dnodeAllocVnodeSyncQueue(pVnode); + if (code != 0) { + return code; + } return taosHashPut(tsVnodes.hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); } @@ -74,11 +113,11 @@ static void dnodeDropVnodeWrapper(SVnodeObj *pVnode) { taosHashRemove(tsVnodes.hash, &pVnode->vgId, sizeof(int32_t)); //todo wait all queue empty - pVnode->pWriteQ = NULL; - pVnode->pSyncQ = NULL; - pVnode->pApplyQ = NULL; - pVnode->pQueryQ = NULL; - pVnode->pFetchQ = NULL; + dnodeFreeVnodeQueryQueue(pVnode); + dnodeFreeVnodeFetchQueue(pVnode); + dnodeFreeVnodeWriteQueue(pVnode); + dnodeFreeVnodeApplyQueue(pVnode); + dnodeFreeVnodeSyncQueue(pVnode); } static int32_t dnodeGetVnodesFromHash(SVnodeObj *pVnodes[], int32_t *numOfVnodes) { @@ -465,7 +504,7 @@ static int32_t vnodeProcessCompactVnodeReq(SRpcMsg *rpcMsg) { return code; } -static void dnodeProcessVnodeMgmtReq(SRpcMsg *pMsg, void *unused) { +static void dnodeProcessVnodeMgmtQueue(void *unused, SRpcMsg *pMsg) { int32_t code = 0; switch (pMsg->msgType) { @@ -498,7 +537,44 @@ static void dnodeProcessVnodeMgmtReq(SRpcMsg *pMsg, void *unused) { taosFreeQitem(pMsg); } -static int32_t dnodeWriteToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { +static void dnodeProcessVnodeQueryQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) { + vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_QUERY); +} + +static void dnodeProcessVnodeFetchQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) { + vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_FETCH); +} + +static void dnodeProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { + SVnodeMsg *pMsg = vnodeInitMsg(numOfMsgs); + SRpcMsg *pRpcMsg = NULL; + + for (int32_t i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pRpcMsg); + vnodeAppendMsg(pMsg, pRpcMsg); + taosFreeQitem(pRpcMsg); + } + + vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_WRITE); +} + +static void dnodeProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { + SVnodeMsg *pMsg = NULL; + for (int32_t i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pMsg); + vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_APPLY); + } +} + +static void dnodeProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { + SVnodeMsg *pMsg = NULL; + for (int32_t i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pMsg); + vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_SYNC); + } +} + +static int32_t dnodeWriteRpcMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { int32_t code = 0; if (pQueue == NULL) { @@ -520,6 +596,28 @@ static int32_t dnodeWriteToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { } } +static int32_t dnodeWriteVnodeMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { + int32_t code = 0; + + if (pQueue == NULL) { + code = TSDB_CODE_DND_MSG_NOT_PROCESSED; + } else { + SVnodeMsg *pMsg = vnodeInitMsg(1); + if (pMsg == NULL) { + code = TSDB_CODE_DND_OUT_OF_MEMORY; + } else { + vnodeAppendMsg(pMsg, pRpcMsg); + code = taosWriteQitem(pQueue, pMsg); + } + } + + if (code != TSDB_CODE_SUCCESS) { + SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; + rpcSendResponse(&rsp); + rpcFreeCont(pRpcMsg->pCont); + } +} + static SVnodeObj *dnodeAcquireVnodeFromMsg(SRpcMsg *pMsg) { SMsgHead *pHead = (SMsgHead *)pMsg->pCont; pHead->vgId = htonl(pHead->vgId); @@ -534,12 +632,12 @@ static SVnodeObj *dnodeAcquireVnodeFromMsg(SRpcMsg *pMsg) { return pVnode; } -void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { dnodeWriteToVnodeQueue(tsVnodes.pMgmtQ, pMsg); } +void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { dnodeWriteRpcMsgToVnodeQueue(tsVnodes.pMgmtQ, pMsg); } void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); if (pVnode != NULL) { - dnodeWriteToVnodeQueue(pVnode->pWriteQ, pMsg); + dnodeWriteRpcMsgToVnodeQueue(pVnode->pWriteQ, pMsg); dnodeReleaseVnode(pVnode); } } @@ -547,7 +645,7 @@ void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); if (pVnode != NULL) { - dnodeWriteToVnodeQueue(pVnode->pSyncQ, pMsg); + dnodeWriteVnodeMsgToVnodeQueue(pVnode->pSyncQ, pMsg); dnodeReleaseVnode(pVnode); } } @@ -555,7 +653,7 @@ void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); if (pVnode != NULL) { - dnodeWriteToVnodeQueue(pVnode->pQueryQ, pMsg); + dnodeWriteVnodeMsgToVnodeQueue(pVnode->pQueryQ, pMsg); dnodeReleaseVnode(pVnode); } } @@ -563,7 +661,7 @@ void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); if (pVnode != NULL) { - dnodeWriteToVnodeQueue(pVnode->pFetchQ, pMsg); + dnodeWriteVnodeMsgToVnodeQueue(pVnode->pFetchQ, pMsg); dnodeReleaseVnode(pVnode); } } @@ -577,7 +675,7 @@ static int32_t dnodeInitVnodeMgmtWorker() { return TSDB_CODE_VND_OUT_OF_MEMORY; } - tsVnodes.pMgmtQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)dnodeProcessVnodeMgmtReq); + tsVnodes.pMgmtQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)dnodeProcessVnodeMgmtQueue); if (tsVnodes.pMgmtQ == NULL) { return TSDB_CODE_VND_OUT_OF_MEMORY; } @@ -591,12 +689,137 @@ static void dnodeCleanupVnodeMgmtWorker() { tsVnodes.pMgmtQ = NULL; } +static int32_t dnodeAllocVnodeQueryQueue(SVnodeObj *pVnode) { + pVnode->pQueryQ = tWorkerAllocQueue(&tsVnodes.queryPool, pVnode, (FProcessItem)dnodeProcessVnodeQueryQueue); + if (pVnode->pQueryQ == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + return 0; +} + +static void dnodeFreeVnodeQueryQueue(SVnodeObj *pVnode) { + tWorkerFreeQueue(&tsVnodes.queryPool, pVnode->pQueryQ); + pVnode->pQueryQ = NULL; +} + +static int32_t dnodeAllocVnodeFetchQueue(SVnodeObj *pVnode) { + pVnode->pFetchQ = tWorkerAllocQueue(&tsVnodes.fetchPool, pVnode, (FProcessItem)dnodeProcessVnodeFetchQueue); + if (pVnode->pFetchQ == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + return 0; +} + +static void dnodeFreeVnodeFetchQueue(SVnodeObj *pVnode) { + tWorkerFreeQueue(&tsVnodes.fetchPool, pVnode->pFetchQ); + pVnode->pFetchQ = NULL; +} + +static int32_t dnodeInitVnodeReadWorker() { + int32_t maxFetchThreads = 4; + float threadsForQuery = MAX(tsNumOfCores * tsRatioOfQueryCores, 1); + + SWorkerPool *pPool = &tsVnodes.queryPool; + pPool->name = "vnode-query"; + pPool->min = (int32_t)threadsForQuery; + pPool->max = pPool->min; + if (tWorkerInit(pPool) != 0) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + pPool = &tsVnodes.fetchPool; + pPool->name = "vnode-fetch"; + pPool->min = MIN(maxFetchThreads, tsNumOfCores); + pPool->max = pPool->min; + if (tWorkerInit(pPool) != 0) { + TSDB_CODE_VND_OUT_OF_MEMORY; + } + + return 0; +} + +static void dnodeCleanupVnodeReadWorker() { + tWorkerCleanup(&tsVnodes.fetchPool); + tWorkerCleanup(&tsVnodes.queryPool); +} + +static int32_t dnodeAllocVnodeWriteQueue(SVnodeObj *pVnode) { + pVnode->pWriteQ = tMWorkerAllocQueue(&tsVnodes.writePool, pVnode, (FProcessItems)dnodeProcessVnodeWriteQueue); + if (pVnode->pWriteQ == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + return 0; +} + +static void dnodeFreeVnodeWriteQueue(SVnodeObj *pVnode) { + tMWorkerFreeQueue(&tsVnodes.writePool, pVnode->pWriteQ); + pVnode->pWriteQ = NULL; +} + +static int32_t dnodeAllocVnodeApplyQueue(SVnodeObj *pVnode) { + pVnode->pApplyQ = tMWorkerAllocQueue(&tsVnodes.writePool, pVnode, (FProcessItems)dnodeProcessVnodeApplyQueue); + if (pVnode->pApplyQ == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + return 0; +} + +static void dnodeFreeVnodeApplyQueue(SVnodeObj *pVnode) { + tMWorkerFreeQueue(&tsVnodes.writePool, pVnode->pApplyQ); + pVnode->pApplyQ = NULL; +} + +static int32_t dnodeInitVnodeWriteWorker() { + SMWorkerPool *pPool = &tsVnodes.writePool; + pPool->name = "vnode-write"; + pPool->max = tsNumOfCores; + if (tMWorkerInit(pPool) != 0) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + return 0; +} + +static void dnodeCleanupVnodeWriteWorker() { tMWorkerCleanup(&tsVnodes.writePool); } + +static int32_t dnodeAllocVnodeSyncQueue(SVnodeObj *pVnode) { + pVnode->pSyncQ = tMWorkerAllocQueue(&tsVnodes.writePool, pVnode, (FProcessItems)dnodeProcessVnodeSyncQueue); + if (pVnode->pSyncQ == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + return 0; +} + +static void dnodeFreeVnodeSyncQueue(SVnodeObj *pVnode) { + tMWorkerFreeQueue(&tsVnodes.writePool, pVnode->pSyncQ); + pVnode->pSyncQ = NULL; +} + +static int32_t dnodeInitVnodeSyncWorker() { + int32_t maxThreads = tsNumOfCores / 2; + if (maxThreads < 1) maxThreads = 1; + + SMWorkerPool *pPool = &tsVnodes.writePool; + pPool->name = "vnode-sync"; + pPool->max = maxThreads; + if (tMWorkerInit(pPool) != 0) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + return 0; +} + +static void dnodeCleanupVnodeSyncWorker() { tMWorkerCleanup(&tsVnodes.syncPool); } + int32_t dnodeInitVnodes() { dInfo("dnode-vnodes start to init"); SSteps *pSteps = taosStepInit(3, dnodeReportStartup); taosStepAdd(pSteps, "dnode-vnode-env", vnodeInit, vnodeCleanup); taosStepAdd(pSteps, "dnode-vnode-mgmt", dnodeInitVnodeMgmtWorker, dnodeCleanupVnodeMgmtWorker); + taosStepAdd(pSteps, "dnode-vnode-read", dnodeInitVnodeReadWorker, dnodeCleanupVnodeReadWorker); + taosStepAdd(pSteps, "dnode-vnode-write", dnodeInitVnodeWriteWorker, dnodeCleanupVnodeWriteWorker); + taosStepAdd(pSteps, "dnode-vnode-sync", dnodeInitVnodeSyncWorker, dnodeCleanupVnodeSyncWorker); taosStepAdd(pSteps, "dnode-vnodes", dnodeOpenVnodes, dnodeCleanupVnodes); tsVnodes.pSteps = pSteps; @@ -612,24 +835,26 @@ void dnodeCleanupVnodes() { } } -void dnodeGetVnodes(SVnodeLoads *pLoads) { - pLoads->vnodeNum = taosHashGetSize(tsVnodes.hash); +void dnodeGetVnodeLoads(SVnodeLoads *pLoads) { + pLoads->num = taosHashGetSize(tsVnodes.hash); int32_t v = 0; void *pIter = taosHashIterate(tsVnodes.hash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; if (ppVnode == NULL) continue; + SVnodeObj *pVnode = *ppVnode; - if (pVnode) { - SVnodeLoad *pLoad = &pLoads->vnodeLoads[v++]; - vnodeGetLoad(pVnode->pImpl, pLoad); - pLoad->vgId = htonl(pLoad->vgId); - pLoad->totalStorage = htobe64(pLoad->totalStorage); - pLoad->compStorage = htobe64(pLoad->compStorage); - pLoad->pointsWritten = htobe64(pLoad->pointsWritten); - pLoad->tablesNum = htobe64(pLoad->tablesNum); - } + if (pVnode == NULL) continue; + + SVnodeLoad *pLoad = &pLoads->data[v++]; + vnodeGetLoad(pVnode->pImpl, pLoad); + pLoad->vgId = htonl(pLoad->vgId); + pLoad->totalStorage = htobe64(pLoad->totalStorage); + pLoad->compStorage = htobe64(pLoad->compStorage); + pLoad->pointsWritten = htobe64(pLoad->pointsWritten); + pLoad->tablesNum = htobe64(pLoad->tablesNum); + pIter = taosHashIterate(tsVnodes.hash, pIter); } -} \ No newline at end of file +} diff --git a/source/dnode/vnode/impl/src/vnodeInt.c b/source/dnode/vnode/impl/src/vnodeInt.c index 1df3d37749..c345f2e1b9 100644 --- a/source/dnode/vnode/impl/src/vnodeInt.c +++ b/source/dnode/vnode/impl/src/vnodeInt.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "vnodeInt.h" +#include "tqueue.h" int32_t vnodeInit() { return 0; } void vnodeCleanup() {} @@ -27,5 +28,45 @@ int32_t vnodeDrop(SVnode *pVnode) { return 0; } int32_t vnodeCompact(SVnode *pVnode) { return 0; } int32_t vnodeSync(SVnode *pVnode) { return 0; } -void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg) {} void vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {} + +SVnodeMsg *vnodeInitMsg(int32_t msgNum) { + SVnodeMsg *pMsg = taosAllocateQitem(msgNum * sizeof(SRpcMsg *) + sizeof(SVnodeMsg)); + if (pMsg == NULL) { + terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + return NULL; + } else { + pMsg->allocNum = msgNum; + return pMsg; + } +} + +int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg) { + if (pMsg->curNum >= pMsg->allocNum) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + pMsg->rpcMsg[pMsg->curNum++] = *pRpcMsg; +} + +void vnodeCleanupMsg(SVnodeMsg *pMsg) { + for (int32_t i = 0; i < pMsg->curNum; ++i) { + rpcFreeCont(pMsg->rpcMsg[i].pCont); + } + taosFreeQitem(pMsg); +} + +void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVMType msgType) { + switch (msgType) { + case VN_MSG_TYPE_WRITE: + break; + case VN_MSG_TYPE_APPLY: + break; + case VN_MSG_TYPE_SYNC: + break; + case VN_MSG_TYPE_QUERY: + break; + case VN_MSG_TYPE_FETCH: + break; + } +} diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 7df12089b7..136bc40482 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -76,7 +76,7 @@ static void *tWorkerThreadFp(SWorker *worker) { } if (fp) { - (*fp)(msg, ahandle); + (*fp)(ahandle, msg); } } @@ -186,7 +186,7 @@ static void *tWriteWorkerThreadFp(SMWorker *worker) { } if (fp) { - (*fp)(worker->qall, numOfMsgs, ahandle); + (*fp)(ahandle, worker->qall, numOfMsgs); } } From 318c3b7cf78915363a2c4c9610eb7fb8cc4ccb90 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 4 Nov 2021 14:18:03 +0800 Subject: [PATCH 3/7] add apply callback --- include/server/vnode/vnode.h | 10 ++++++++-- source/dnode/mgmt/src/dnodeVnodes.c | 23 ++++++++++++++++++++++- source/dnode/vnode/impl/src/vnodeInt.c | 2 +- 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/include/server/vnode/vnode.h b/include/server/vnode/vnode.h index ca0706f0b9..6db0eb86f6 100644 --- a/include/server/vnode/vnode.h +++ b/include/server/vnode/vnode.h @@ -55,13 +55,19 @@ typedef enum { VN_MSG_TYPE_FETCH } EVMType; -typedef struct SVnodeMsg { +typedef struct { int32_t curNum; int32_t allocNum; SRpcMsg rpcMsg[]; } SVnodeMsg; -int32_t vnodeInit(); +typedef struct { + void (*SendMsgToDnode)(SEpSet *pEpSet, SRpcMsg *pMsg); + void (*SendMsgToMnode)(SRpcMsg *pMsg); + int32_t (*PutMsgIntoApplyQueue)(int32_t vgId, SVnodeMsg *pMsg); +} SVnodePara; + +int32_t vnodeInit(SVnodePara); void vnodeCleanup(); SVnode *vnodeOpen(int32_t vgId, const char *path); diff --git a/source/dnode/mgmt/src/dnodeVnodes.c b/source/dnode/mgmt/src/dnodeVnodes.c index 72048a15a5..46b79c84a0 100644 --- a/source/dnode/mgmt/src/dnodeVnodes.c +++ b/source/dnode/mgmt/src/dnodeVnodes.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "dnodeVnodes.h" +#include "dnodeTransport.h" #include "thash.h" #include "tqueue.h" #include "tstep.h" @@ -666,6 +667,17 @@ void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { } } +static int32_t dnodePutMsgIntoVnodeApplyQueue(int32_t vgId, SVnodeMsg *pMsg) { + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode == NULL) { + return terrno; + } + + int32_t code = taosWriteQitem(pVnode->pApplyQ, pMsg); + dnodeReleaseVnode(pVnode); + return code; +} + static int32_t dnodeInitVnodeMgmtWorker() { SWorkerPool *pPool = &tsVnodes.mgmtPool; pPool->name = "vnode-mgmt"; @@ -811,11 +823,20 @@ static int32_t dnodeInitVnodeSyncWorker() { static void dnodeCleanupVnodeSyncWorker() { tMWorkerCleanup(&tsVnodes.syncPool); } +static int32_t dnodeInitVnodeModule() { + SVnodePara para; + para.SendMsgToDnode = dnodeSendMsgToDnode; + para.SendMsgToMnode = dnodeSendMsgToMnode; + para.PutMsgIntoApplyQueue = dnodePutMsgIntoVnodeApplyQueue; + + return vnodeInit(para); +} + int32_t dnodeInitVnodes() { dInfo("dnode-vnodes start to init"); SSteps *pSteps = taosStepInit(3, dnodeReportStartup); - taosStepAdd(pSteps, "dnode-vnode-env", vnodeInit, vnodeCleanup); + taosStepAdd(pSteps, "dnode-vnode-env", dnodeInitVnodeModule, vnodeCleanup); taosStepAdd(pSteps, "dnode-vnode-mgmt", dnodeInitVnodeMgmtWorker, dnodeCleanupVnodeMgmtWorker); taosStepAdd(pSteps, "dnode-vnode-read", dnodeInitVnodeReadWorker, dnodeCleanupVnodeReadWorker); taosStepAdd(pSteps, "dnode-vnode-write", dnodeInitVnodeWriteWorker, dnodeCleanupVnodeWriteWorker); diff --git a/source/dnode/vnode/impl/src/vnodeInt.c b/source/dnode/vnode/impl/src/vnodeInt.c index c345f2e1b9..e08cc47aa1 100644 --- a/source/dnode/vnode/impl/src/vnodeInt.c +++ b/source/dnode/vnode/impl/src/vnodeInt.c @@ -17,7 +17,7 @@ #include "vnodeInt.h" #include "tqueue.h" -int32_t vnodeInit() { return 0; } +int32_t vnodeInit(SVnodePara para) { return 0; } void vnodeCleanup() {} SVnode *vnodeOpen(int32_t vgId, const char *path) { return NULL; } From 97f16ecb65aaf32dbdb2121a000f9b5fb9540a87 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 4 Nov 2021 14:45:48 +0800 Subject: [PATCH 4/7] minor changes --- include/common/taosmsg.h | 10 +++++----- include/server/vnode/vnode.h | 2 +- include/util/tdef.h | 3 ++- source/common/src/tname.c | 2 +- source/dnode/mgmt/src/dnodeVnodes.c | 2 +- source/dnode/mnode/inc/mnodeDef.h | 6 +++--- source/libs/parser/src/parserUtil.c | 2 +- 7 files changed, 14 insertions(+), 13 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 822f850018..2ccab41f8f 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -325,7 +325,7 @@ typedef struct { typedef struct { char tableFname[TSDB_TABLE_FNAME_LEN]; - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char db[TSDB_FULL_DB_NAME_LEN]; int16_t type; /* operation type */ int16_t numOfCols; /* number of schema */ int32_t tagValLen; @@ -570,7 +570,7 @@ typedef struct SRetrieveTableRsp { } SRetrieveTableRsp; typedef struct { - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char db[TSDB_FULL_DB_NAME_LEN]; int32_t cacheBlockSize; //MB int32_t totalBlocks; int32_t maxTables; @@ -701,7 +701,7 @@ typedef struct { } SVnodeDesc; typedef struct { - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char db[TSDB_FULL_DB_NAME_LEN]; uint32_t vgId; int32_t cacheBlockSize; int32_t totalBlocks; @@ -804,13 +804,13 @@ typedef struct { */ typedef struct { int8_t type; - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char db[TSDB_FULL_DB_NAME_LEN]; uint16_t payloadLen; char payload[]; } SShowMsg; typedef struct { - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char db[TSDB_FULL_DB_NAME_LEN]; int32_t numOfVgroup; int32_t vgid[]; } SCompactMsg; diff --git a/include/server/vnode/vnode.h b/include/server/vnode/vnode.h index 6db0eb86f6..16699b855a 100644 --- a/include/server/vnode/vnode.h +++ b/include/server/vnode/vnode.h @@ -27,7 +27,7 @@ extern "C" { typedef struct SVnode SVnode; typedef struct { - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char db[TSDB_FULL_DB_NAME_LEN]; int32_t cacheBlockSize; // MB int32_t totalBlocks; int32_t daysPerFile; diff --git a/include/util/tdef.h b/include/util/tdef.h index 80cd3cf8b8..66e5f28bde 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -153,11 +153,12 @@ do { \ #define TSDB_NODE_NAME_LEN 64 #define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string #define TSDB_DB_NAME_LEN 33 +#define TSDB_FULL_DB_NAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN) #define TSDB_FUNC_NAME_LEN 65 #define TSDB_FUNC_CODE_LEN (65535 - 512) #define TSDB_FUNC_BUF_SIZE 512 #define TSDB_TYPE_STR_MAX_LEN 32 -#define TSDB_TABLE_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN) +#define TSDB_TABLE_FNAME_LEN (TSDB_FULL_DB_NAME_LEN + TSDB_TABLE_NAME_LEN) #define TSDB_COL_NAME_LEN 65 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE diff --git a/source/common/src/tname.c b/source/common/src/tname.c index c290a04ebc..28f920a6a9 100644 --- a/source/common/src/tname.c +++ b/source/common/src/tname.c @@ -201,7 +201,7 @@ int32_t tNameExtractFullName(const SName* name, char* dst) { return -1; } - int32_t len = snprintf(dst, TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN, "%s.%s", name->acctId, name->dbname); + int32_t len = snprintf(dst, TSDB_FULL_DB_NAME_LEN, "%s.%s", name->acctId, name->dbname); size_t tnameLen = strlen(name->tname); if (tnameLen > 0) { diff --git a/source/dnode/mgmt/src/dnodeVnodes.c b/source/dnode/mgmt/src/dnodeVnodes.c index 46b79c84a0..c7682539ef 100644 --- a/source/dnode/mgmt/src/dnodeVnodes.c +++ b/source/dnode/mgmt/src/dnodeVnodes.c @@ -333,7 +333,7 @@ static int32_t dnodeParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCf SCreateVnodeMsg *pCreate = rpcMsg->pCont; *vgId = htonl(pCreate->vgId); - tstrncpy(pCfg->db, pCreate->db, TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN); + tstrncpy(pCfg->db, pCreate->db, TSDB_FULL_DB_NAME_LEN); pCfg->cacheBlockSize = htonl(pCreate->cacheBlockSize); pCfg->totalBlocks = htonl(pCreate->totalBlocks); pCfg->daysPerFile = htonl(pCreate->daysPerFile); diff --git a/source/dnode/mnode/inc/mnodeDef.h b/source/dnode/mnode/inc/mnodeDef.h index 33606e8ee2..0825815bc7 100644 --- a/source/dnode/mnode/inc/mnodeDef.h +++ b/source/dnode/mnode/inc/mnodeDef.h @@ -208,7 +208,7 @@ typedef struct { typedef struct SDbObj { SdbHead head; - char name[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char name[TSDB_FULL_DB_NAME_LEN]; char acct[TSDB_USER_LEN]; int64_t createdTime; int64_t updateTime; @@ -236,7 +236,7 @@ typedef struct SVgObj { int64_t updateTime; int32_t lbDnodeId; int32_t lbTime; - char dbName[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char dbName[TSDB_FULL_DB_NAME_LEN]; int8_t inUse; int8_t accessState; int8_t status; @@ -288,7 +288,7 @@ typedef struct { void *pIter; void *pVgIter; void **ppShow; - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char db[TSDB_FULL_DB_NAME_LEN]; int16_t offset[TSDB_MAX_COLUMNS]; int32_t bytes[TSDB_MAX_COLUMNS]; char payload[]; diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c index 0bef796026..5aab21bdae 100644 --- a/source/libs/parser/src/parserUtil.c +++ b/source/libs/parser/src/parserUtil.c @@ -1934,7 +1934,7 @@ char* cloneCurrentDBName(SSqlObj* pSql) { case TAOS_REQ_FROM_HTTP: pCtx = pSql->param; if (pCtx && pCtx->db[0] != '\0') { - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN] = {0}; + char db[TSDB_FULL_DB_NAME_LEN] = {0}; int32_t len = sprintf(db, "%s%s%s", pTscObj->acctId, TS_PATH_DELIMITER, pCtx->db); assert(len <= sizeof(db)); From a12a4f38c50549e29979c1e8a12cf6610eeeea51 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 4 Nov 2021 17:03:24 +0800 Subject: [PATCH 5/7] add vnode file --- include/util/taoserror.h | 1 + include/util/tdef.h | 2 +- include/util/tqueue.h | 1 + source/dnode/mgmt/src/dnodeVnodes.c | 252 ++++++++++++++++++++++------ source/util/src/terror.c | 1 + source/util/src/tqueue.c | 14 ++ 6 files changed, 218 insertions(+), 53 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 0579ae46bc..e2f9a58023 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -223,6 +223,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0406) //"Action in progress") #define TSDB_CODE_DND_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0407) //"Too many vnode directories") #define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0408) //"Dnode is exiting" +#define TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0409) //"Parse vnodes.json error") // vnode #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress") diff --git a/include/util/tdef.h b/include/util/tdef.h index 66e5f28bde..0ad0f68f3f 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -212,7 +212,7 @@ do { \ #define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth #define TSDB_CQ_SQL_SIZE 1024 #define TSDB_MIN_VNODES 64 -#define TSDB_MAX_VNODES 2048 +#define TSDB_MAX_VNODES 512 #define TSDB_MIN_VNODES_PER_DB 2 #define TSDB_MAX_VNODES_PER_DB 64 diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 24c56ea6a3..bcb9aea856 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -50,6 +50,7 @@ void *taosAllocateQitem(int size); void taosFreeQitem(void *pItem); int taosWriteQitem(taos_queue, void *pItem); int taosReadQitem(taos_queue, void **pItem); +bool taosQueueEmpty(taos_queue); taos_qall taosAllocateQall(); void taosFreeQall(taos_qall); diff --git a/source/dnode/mgmt/src/dnodeVnodes.c b/source/dnode/mgmt/src/dnodeVnodes.c index c7682539ef..8bf80ccff8 100644 --- a/source/dnode/mgmt/src/dnodeVnodes.c +++ b/source/dnode/mgmt/src/dnodeVnodes.c @@ -16,7 +16,9 @@ #define _DEFAULT_SOURCE #include "dnodeVnodes.h" #include "dnodeTransport.h" +#include "cJSON.h" #include "thash.h" +#include "tlockfree.h" #include "tqueue.h" #include "tstep.h" #include "tthread.h" @@ -56,7 +58,7 @@ static struct { SSteps *pSteps; int32_t openVnodes; int32_t totalVnodes; - char file[PATH_MAX + 20]; + SRWLatch latch; } tsVnodes; static int32_t dnodeAllocVnodeQueryQueue(SVnodeObj *pVnode); @@ -70,6 +72,28 @@ static void dnodeFreeVnodeApplyQueue(SVnodeObj *pVnode); static int32_t dnodeAllocVnodeSyncQueue(SVnodeObj *pVnode); static void dnodeFreeVnodeSyncQueue(SVnodeObj *pVnode); +static SVnodeObj *dnodeAcquireVnode(int32_t vgId) { + SVnodeObj *pVnode = NULL; + int32_t refCount = 0; + + taosRLockLatch(&tsVnodes.latch); + taosHashGetClone(tsVnodes.hash, &vgId, sizeof(int32_t), (void *)&pVnode); + if (pVnode == NULL) { + terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; + } else { + refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + } + taosRUnLockLatch(&tsVnodes.latch); + + dTrace("vgId:%d, accquire vnode, refCount:%d", pVnode->vgId, refCount); + return pVnode; +} + +static void dnodeReleaseVnode(SVnodeObj *pVnode) { + int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); + dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); +} + static int32_t dnodeCreateVnodeWrapper(int32_t vgId, SVnode *pImpl) { SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj)); if (pVnode == NULL) { @@ -107,13 +131,27 @@ static int32_t dnodeCreateVnodeWrapper(int32_t vgId, SVnode *pImpl) { return code; } - return taosHashPut(tsVnodes.hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); + taosWLockLatch(&tsVnodes.latch); + code = taosHashPut(tsVnodes.hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); + taosWUnLockLatch(&tsVnodes.latch); + + return code; } static void dnodeDropVnodeWrapper(SVnodeObj *pVnode) { + taosWLockLatch(&tsVnodes.latch); taosHashRemove(tsVnodes.hash, &pVnode->vgId, sizeof(int32_t)); + taosWUnLockLatch(&tsVnodes.latch); + + // wait all queue empty + dnodeReleaseVnode(pVnode); + while (pVnode->refCount > 0) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); - //todo wait all queue empty dnodeFreeVnodeQueryQueue(pVnode); dnodeFreeVnodeFetchQueue(pVnode); dnodeFreeVnodeWriteQueue(pVnode); @@ -121,35 +159,164 @@ static void dnodeDropVnodeWrapper(SVnodeObj *pVnode) { dnodeFreeVnodeSyncQueue(pVnode); } -static int32_t dnodeGetVnodesFromHash(SVnodeObj *pVnodes[], int32_t *numOfVnodes) { +static SVnodeObj **dnodeGetVnodesFromHash(int32_t *numOfVnodes) { + taosRLockLatch(&tsVnodes.latch); + + int32_t num = 0; + int32_t size = taosHashGetSize(tsVnodes.hash); + SVnodeObj **pVnodes = calloc(size, sizeof(SVnodeObj *)); + void *pIter = taosHashIterate(tsVnodes.hash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; - if (*ppVnode) { - (*numOfVnodes)++; - if (*numOfVnodes >= TSDB_MAX_VNODES) { - dError("vgId:%d, too many open vnodes, exist:%d max:%d", (*ppVnode)->vgId, *numOfVnodes, TSDB_MAX_VNODES); - continue; - } else { - pVnodes[*numOfVnodes - 1] = (*ppVnode); + SVnodeObj *pVnode = *ppVnode; + if (pVnode) { + num++; + if (num < size) { + int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + dTrace("vgId:%d, accquire vnode, refCount:%d", pVnode->vgId, refCount); + pVnodes[num] = (*ppVnode); } } - pIter = taosHashIterate(tsVnodes.hash, pIter); } - return TSDB_CODE_SUCCESS; + taosRUnLockLatch(&tsVnodes.latch); + *numOfVnodes = num; + + return pVnodes; } -static int32_t dnodeGetVnodesFromFile(SVnodeObj *pVnodes, int32_t *numOfVnodes) { - pVnodes[0].vgId = 2; - pVnodes[0].dropped = 0; - pVnodes[0].vgId = 3; - pVnodes[0].dropped = 0; - return 0; +static int32_t dnodeGetVnodesFromFile(SVnodeObj **ppVnodes, int32_t *numOfVnodes) { + int32_t code = TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR; + int32_t len = 0; + int32_t maxLen = 30000; + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + FILE *fp = NULL; + char file[PATH_MAX + 20] = {0}; + SVnodeObj *pVnodes = NULL; + + snprintf(file, PATH_MAX + 20, "%s/vnodes.json", tsVnodeDir); + + fp = fopen(file, "r"); + if (!fp) { + dDebug("file %s not exist", file); + code = 0; + goto PRASE_VNODE_OVER; + } + + len = (int32_t)fread(content, 1, maxLen, fp); + if (len <= 0) { + dError("failed to read %s since content is null", file); + goto PRASE_VNODE_OVER; + } + + content[len] = 0; + root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read %s since invalid json format", file); + goto PRASE_VNODE_OVER; + } + + cJSON *vnodes = cJSON_GetObjectItem(root, "vnodes"); + if (!vnodes || vnodes->type != cJSON_Array) { + dError("failed to read %s since vnodes not found", file); + goto PRASE_VNODE_OVER; + } + + int32_t vnodesNum = cJSON_GetArraySize(vnodes); + if (vnodesNum <= 0) { + dError("failed to read %s since vnodes size:%d invalid", file, vnodesNum); + goto PRASE_VNODE_OVER; + } + + pVnodes = calloc(vnodesNum, sizeof(SVnodeObj)); + if (pVnodes == NULL) { + dError("failed to read %s since out of memory", file); + goto PRASE_VNODE_OVER; + } + + for (int32_t i = 0; i < vnodesNum; ++i) { + cJSON *vnode = cJSON_GetArrayItem(vnodes, i); + SVnodeObj *pVnode = &pVnodes[i]; + + cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId"); + if (!vgId || vgId->type != cJSON_String) { + dError("failed to read %s since vgId not found", file); + goto PRASE_VNODE_OVER; + } + pVnode->vgId = atoi(vgId->valuestring); + + cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped"); + if (!dropped || dropped->type != cJSON_String) { + dError("failed to read %s since dropped not found", file); + goto PRASE_VNODE_OVER; + } + pVnode->dropped = atoi(vnode->valuestring); + } + + code = 0; + dInfo("succcessed to read file %s", file); + +PRASE_VNODE_OVER: + if (content != NULL) free(content); + if (root != NULL) cJSON_Delete(root); + if (fp != NULL) fclose(fp); + + return code; } -static int32_t dnodeWriteVnodesToFile() { return 0; } +static int32_t dnodeWriteVnodesToFile() { + char file[PATH_MAX + 20] = {0}; + char realfile[PATH_MAX + 20] = {0}; + snprintf(file, PATH_MAX + 20, "%s/vnodes.json.bak", tsVnodeDir); + snprintf(realfile, PATH_MAX + 20, "%s/vnodes.json", tsVnodeDir); + + FILE *fp = fopen(file, "w"); + if (!fp) { + dError("failed to write %s since %s", file, strerror(errno)); + return -1; + } + + int32_t len = 0; + int32_t maxLen = 30000; + char *content = calloc(1, maxLen + 1); + int32_t numOfVnodes = 0; + SVnodeObj **pVnodes = dnodeGetVnodesFromHash(&numOfVnodes); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"vnodes\": [{\n"); + for (int32_t i = 0; i < numOfVnodes; ++i) { + SVnodeObj *pVnode = pVnodes[i]; + len += snprintf(content + len, maxLen - len, " \"vgId\": \"%d\",\n", pVnode->vgId); + len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\"\n", pVnode->dropped); + if (i < numOfVnodes - 1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }]\n"); + } + } + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); + taosFsyncFile(fileno(fp)); + fclose(fp); + free(content); + terrno = 0; + + for (int32_t i = 0; i < numOfVnodes; ++i) { + SVnodeObj *pVnode = pVnodes[i]; + dnodeReleaseVnode(pVnode); + } + + if (pVnodes != NULL) { + free(pVnodes); + } + + dInfo("successed to write %s", file); + return taosRenameFile(file, realfile); +} static int32_t dnodeCreateVnode(int32_t vgId, SVnodeCfg *pCfg) { int32_t code = 0; @@ -193,24 +360,6 @@ static int32_t dnodeDropVnode(SVnodeObj *pVnode) { return 0; } -static SVnodeObj *dnodeAcquireVnode(int32_t vgId) { - SVnodeObj *pVnode = NULL; - - taosHashGetClone(tsVnodes.hash, &vgId, sizeof(int32_t), (void *)&pVnode); - if (pVnode == NULL) { - terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; - } - - int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); - dTrace("vgId:%d, accquire vnode, refCount:%d", pVnode->vgId, refCount); - return pVnode; -} - -static void dnodeReleaseVnode(SVnodeObj *pVnode) { - int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); - dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); -} - static void *dnodeOpenVnodeFunc(void *param) { SVThread *pThread = param; @@ -246,15 +395,17 @@ static void *dnodeOpenVnodeFunc(void *param) { } static int32_t dnodeOpenVnodes() { + taosInitRWLatch(&tsVnodes.latch); + tsVnodes.hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (tsVnodes.hash == NULL) { dError("failed to init vnode hash"); return TSDB_CODE_VND_OUT_OF_MEMORY; } - SVnodeObj pVnodes[TSDB_MAX_VNODES] = {0}; - int32_t numOfVnodes = 0; - int32_t code = dnodeGetVnodesFromFile(pVnodes, &numOfVnodes); + SVnodeObj *pVnodes = NULL; + int32_t numOfVnodes = 0; + int32_t code = dnodeGetVnodesFromFile(&pVnodes, &numOfVnodes); if (code != TSDB_CODE_SUCCESS) { dInfo("failed to get vnode list from disk since %s", tstrerror(code)); return code; @@ -308,17 +459,14 @@ static int32_t dnodeOpenVnodes() { } static void dnodeCloseVnodes() { - SVnodeObj *pVnodes[TSDB_MAX_VNODES] = {0}; - int32_t numOfVnodes = 0; - - int32_t code = dnodeGetVnodesFromHash(pVnodes, &numOfVnodes); - if (code != TSDB_CODE_SUCCESS) { - dInfo("failed to get dnode list since code %d", code); - return; - } + int32_t numOfVnodes = 0; + SVnodeObj **pVnodes = dnodeGetVnodesFromHash(&numOfVnodes); for (int32_t i = 0; i < numOfVnodes; ++i) { - vnodeClose(pVnodes[i]->pImpl); + dnodeDropVnodeWrapper(pVnodes[i]); + } + if (pVnodes != NULL) { + free(pVnodes); } if (tsVnodes.hash != NULL) { @@ -431,12 +579,12 @@ static int32_t vnodeProcessDropVnodeReq(SRpcMsg *rpcMsg) { return code; } - code = vnodeDrop(pVnode->pImpl); + code = dnodeDropVnode(pVnode); if (code != 0) { + dnodeReleaseVnode(pVnode); dError("vgId:%d, failed to drop vnode since %s", vgId, tstrerror(code)); } - dnodeReleaseVnode(pVnode); return code; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 22fbeb1883..6838bab403 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -235,6 +235,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, "Invalid message lengt TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_DND_TOO_MANY_VNODES, "Too many vnode directories") TAOS_DEFINE_ERROR(TSDB_CODE_DND_EXITING, "Dnode is exiting") +TAOS_DEFINE_ERROR(TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR, "Parse vnodes.json error") // vnode TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, "Action in progress") diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 2813a55fea..5d6a507172 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -98,6 +98,20 @@ void taosCloseQueue(taos_queue param) { uTrace("queue:%p is closed", queue); } +bool taosQueueEmpty(taos_queue param) { + if (param == NULL) return true; + STaosQueue *queue = (STaosQueue *)param; + + bool empty = false; + pthread_mutex_lock(&queue->mutex); + if (queue->head == NULL && queue->tail == NULL) { + empty = true; + } + pthread_mutex_destroy(&queue->mutex); + + return empty; +} + void *taosAllocateQitem(int size) { STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1); From dd3281c9d2f163530809f8dc1f0568ace824ced8 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 4 Nov 2021 18:38:07 +0800 Subject: [PATCH 6/7] minor changes --- include/server/vnode/vnode.h | 4 ++-- source/dnode/vnode/impl/src/vnodeInt.c | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/include/server/vnode/vnode.h b/include/server/vnode/vnode.h index 16699b855a..9bd6888479 100644 --- a/include/server/vnode/vnode.h +++ b/include/server/vnode/vnode.h @@ -74,11 +74,11 @@ SVnode *vnodeOpen(int32_t vgId, const char *path); void vnodeClose(SVnode *pVnode); int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg); -int32_t vnodeDrop(SVnode *pVnode); +void vnodeDrop(SVnode *pVnode); int32_t vnodeCompact(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode); -void vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); +int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); SVnodeMsg *vnodeInitMsg(int32_t msgNum); int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg); diff --git a/source/dnode/vnode/impl/src/vnodeInt.c b/source/dnode/vnode/impl/src/vnodeInt.c index e08cc47aa1..20a94c0786 100644 --- a/source/dnode/vnode/impl/src/vnodeInt.c +++ b/source/dnode/vnode/impl/src/vnodeInt.c @@ -24,11 +24,11 @@ SVnode *vnodeOpen(int32_t vgId, const char *path) { return NULL; } void vnodeClose(SVnode *pVnode) {} int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; } SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg) { return NULL; } -int32_t vnodeDrop(SVnode *pVnode) { return 0; } +void vnodeDrop(SVnode *pVnode) {} int32_t vnodeCompact(SVnode *pVnode) { return 0; } int32_t vnodeSync(SVnode *pVnode) { return 0; } -void vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {} +int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; } SVnodeMsg *vnodeInitMsg(int32_t msgNum) { SVnodeMsg *pMsg = taosAllocateQitem(msgNum * sizeof(SRpcMsg *) + sizeof(SVnodeMsg)); From da1bf2a2a8fb4f6a15d68ab046630c3e3c1cff56 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 4 Nov 2021 18:39:12 +0800 Subject: [PATCH 7/7] minor changes --- include/server/vnode/vnode.h | 4 ++-- source/dnode/vnode/impl/src/vnodeInt.c | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/include/server/vnode/vnode.h b/include/server/vnode/vnode.h index 9bd6888479..e570cf4261 100644 --- a/include/server/vnode/vnode.h +++ b/include/server/vnode/vnode.h @@ -53,7 +53,7 @@ typedef enum { VN_MSG_TYPE_SYNC, VN_MSG_TYPE_QUERY, VN_MSG_TYPE_FETCH -} EVMType; +} EVnMsgType; typedef struct { int32_t curNum; @@ -83,7 +83,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); SVnodeMsg *vnodeInitMsg(int32_t msgNum); int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg); void vnodeCleanupMsg(SVnodeMsg *pMsg); -void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVMType msgType); +void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVnMsgType msgType); #ifdef __cplusplus } diff --git a/source/dnode/vnode/impl/src/vnodeInt.c b/source/dnode/vnode/impl/src/vnodeInt.c index 20a94c0786..427a6dae4d 100644 --- a/source/dnode/vnode/impl/src/vnodeInt.c +++ b/source/dnode/vnode/impl/src/vnodeInt.c @@ -56,7 +56,7 @@ void vnodeCleanupMsg(SVnodeMsg *pMsg) { taosFreeQitem(pMsg); } -void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVMType msgType) { +void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVnMsgType msgType) { switch (msgType) { case VN_MSG_TYPE_WRITE: break;