diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index e798a0c42a..49e8892c7d 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -36,7 +36,7 @@ enum { TSDB_MESSAGE_NULL = 0, #endif -// message from client to dnode +// message from client to vnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) @@ -46,25 +46,12 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_TABLE, "alter-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TABLE_META, "table-meta" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TABLES_META, "tables-meta" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY0, "dummy0" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY4, "dummy4" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY8, "dummy8" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" ) - // message from client to mnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" ) @@ -88,6 +75,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_FUNCTION, "create-function" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_FUNCTION, "alter-function" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_FUNCTION, "drop-function" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE, "create-stable" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE, "drop-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE, "alter-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" ) @@ -97,54 +85,55 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW, "show" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE, "retrieve" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC, "retrieve-func" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE, "compact-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY10, "dummy10" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY11, "dummy11" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY15, "dummy15" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY16, "dummy16" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY17, "dummy17" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY18, "dummy18" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY19, "dummy19" ) +// message from client to qnode +// message from client to dnode +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" ) -// message from mnode to dnode +// message from vnode to vnode +// message from vnode to mnode +// message from vnode to qnode +// message from vnode to dnode + +// message from mnode to vnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE_IN, "create-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE_IN, "alter-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE_IN, "drop-stable" ) +// message from mnode to mnode +// message from mnode to qnode +// message from mnode to dnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH_VNODE_IN, "auth-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY20, "dummy20" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY21, "dummy21" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY22, "dummy22" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY23, "dummy23" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY24, "dummy24" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY25, "dummy25" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY26, "dummy26" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY27, "dummy27" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY28, "dummy28" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY29, "dummy29" ) +// message from qnode to vnode +// message from qnode to mnode +// message from qnode to qnode +// message from qnode to dnode + +// message from dnode to vnode // message from dnode to mnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STATUS, "status" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_GRANT, "grant" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH, "auth" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY30, "dummy30" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY31, "dummy31" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY32, "dummy32" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY33, "dummy33" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY34, "dummy34" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY35, "dummy35" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY36, "dummy36" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY37, "dummy37" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY38, "dummy38" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY39, "dummy39" ) +// message from dnode to qnode +// message from dnode to dnode + +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY0, "dummy0" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY4, "dummy4" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY8, "dummy8" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" ) #ifndef TAOS_MESSAGE_C TSDB_MSG_TYPE_MAX // 147 @@ -428,10 +417,6 @@ typedef struct { char tableFname[TSDB_TABLE_FNAME_LEN]; } SDropSTableMsg; -typedef struct { - int32_t vgId; -} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg; - typedef struct SColIndex { int16_t colId; // column id int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag @@ -661,9 +646,8 @@ typedef struct { typedef struct { int32_t vgId; - int8_t status; int8_t role; - int8_t reserved[2]; + int8_t reserved[3]; int64_t totalStorage; int64_t compStorage; int64_t pointsWritten; @@ -737,9 +721,18 @@ typedef struct { int8_t replica; int8_t quorum; int8_t selfIndex; - SVnodeDesc nodes[TSDB_MAX_REPLICA]; + SVnodeDesc replicas[TSDB_MAX_REPLICA]; } SCreateVnodeMsg, SAlterVnodeMsg; +typedef struct { + int32_t vgId; +} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg; + +typedef struct { + int32_t vgId; + int8_t accessState; +} SAuthVnodeMsg; + typedef struct { char tableFname[TSDB_TABLE_FNAME_LEN]; int16_t createFlag; @@ -1002,6 +995,7 @@ typedef struct { /* data */ } SUpdateTagValRsp; + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/server/vnode/vnode.h b/include/server/vnode/vnode.h index c4f8df79c1..a20a7fd410 100644 --- a/include/server/vnode/vnode.h +++ b/include/server/vnode/vnode.h @@ -27,7 +27,7 @@ extern "C" { typedef struct SVnode SVnode; typedef struct { - char dbName[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; int32_t cacheBlockSize; // MB int32_t totalBlocks; int32_t daysPerFile; @@ -47,17 +47,6 @@ typedef struct { SVnodeDesc replicas[TSDB_MAX_REPLICA]; } SVnodeCfg; -typedef struct { - int64_t totalStorage; - int64_t compStorage; - int64_t pointsWritten; - int64_t tablesNum; -} SVnodeStatisic; - -typedef struct { - int8_t syncRole; -} SVnodeStatus; - typedef struct SVnodeMsg { int32_t msgType; int32_t code; @@ -69,9 +58,6 @@ typedef struct SVnodeMsg { int32_t vnodeInit(); void vnodeCleanup(); -int32_t vnodeGetStatistics(SVnode *pVnode, SVnodeStatisic *pStat); -int32_t vnodeGetStatus(SVnode *pVnode, SVnodeStatus *pStatus); - SVnode *vnodeOpen(int32_t vgId, const char *path); void vnodeClose(SVnode *pVnode); int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); @@ -80,6 +66,7 @@ int32_t vnodeDrop(SVnode *pVnode); int32_t vnodeCompact(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode); +void vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg); #ifdef __cplusplus diff --git a/source/dnode/mgmt/inc/dnodeInt.h b/source/dnode/mgmt/inc/dnodeInt.h index 472c467ad6..906455dce4 100644 --- a/source/dnode/mgmt/inc/dnodeInt.h +++ b/source/dnode/mgmt/inc/dnodeInt.h @@ -42,7 +42,10 @@ void dnodeCleanup(); EDnStat dnodeGetRunStat(); void dnodeSetRunStat(); -void dnodeGetStartup(SStartupMsg *); + +void dnodeReportStartup(char *name, char *desc); +void dnodeReportStartupFinished(char *name, char *desc); +void dnodeGetStartup(SStartupMsg *); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/inc/dnodeVnodes.h b/source/dnode/mgmt/inc/dnodeVnodes.h index 6e9bce9ae5..2b72ba5d59 100644 --- a/source/dnode/mgmt/inc/dnodeVnodes.h +++ b/source/dnode/mgmt/inc/dnodeVnodes.h @@ -23,9 +23,14 @@ extern "C" { int32_t dnodeInitVnodes(); void dnodeCleanupVnodes(); -void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SEpSet *pEpSet); void dnodeGetVnodes(SVnodeLoads *pVloads); +void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet); +void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet); +void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet); +void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet); +void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/src/dnodeInt.c b/source/dnode/mgmt/src/dnodeInt.c index e853ba7f14..e7018f4265 100644 --- a/source/dnode/mgmt/src/dnodeInt.c +++ b/source/dnode/mgmt/src/dnodeInt.c @@ -35,14 +35,14 @@ EDnStat dnodeGetRunStat() { return tsInt.runStat; } void dnodeSetRunStat(EDnStat stat) { tsInt.runStat = stat; } -static void dnodeReportStartup(char *name, char *desc) { +void dnodeReportStartup(char *name, char *desc) { SStartupMsg *pStartup = &tsInt.startup; tstrncpy(pStartup->name, name, strlen(pStartup->name)); tstrncpy(pStartup->desc, desc, strlen(pStartup->desc)); pStartup->finished = 0; } -static void dnodeReportStartupFinished(char *name, char *desc) { +void dnodeReportStartupFinished(char *name, char *desc) { SStartupMsg *pStartup = &tsInt.startup; tstrncpy(pStartup->name, name, strlen(pStartup->name)); tstrncpy(pStartup->desc, desc, strlen(pStartup->desc)); diff --git a/source/dnode/mgmt/src/dnodeTransport.c b/source/dnode/mgmt/src/dnodeTransport.c index 18e97e2b9f..265efe3070 100644 --- a/source/dnode/mgmt/src/dnodeTransport.c +++ b/source/dnode/mgmt/src/dnodeTransport.c @@ -34,24 +34,22 @@ static struct { static void dnodeInitMsgFp() { // msg from client to dnode - tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessDnodeMsg; - + tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessVnodeQueryMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = dnodeProcessVnodeFetchMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = dnodeProcessVnodeQueryMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = dnodeProcessVnodeQueryMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dnodeProcessVnodeQueryMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dnodeProcessVnodeQueryMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = dnodeProcessVnodeWriteMsg; + // msg from client to mnode tsTrans.msgFp[TSDB_MSG_TYPE_CONNECT] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = dnodeProcessMnodeMsg; @@ -77,6 +75,7 @@ static void dnodeInitMsgFp() { tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_KILL_CONN] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dnodeProcessMnodeMsg; @@ -85,22 +84,29 @@ static void dnodeInitMsgFp() { tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = dnodeProcessMnodeMsg; - // message from mnode to dnode - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg; + // message from client to dnode + tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessDnodeMsg; + + // message from mnode to vnode + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodeWriteMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN] = dnodeProcessVnodeWriteMsg; tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN] = dnodeProcessMnodeMsg; + + // message from mnode to dnode + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dnodeProcessVnodeMgmtMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = dnodeProcessVnodeMgmtMsg; tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = dnodeProcessVnodeMgmtMsg; tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = dnodeProcessVnodeMgmtMsg; tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN] = dnodeProcessVnodeMgmtMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN_RSP] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = dnodeProcessVnodeMgmtMsg; tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = dnodeProcessMnodeMsg; diff --git a/source/dnode/mgmt/src/dnodeVnodes.c b/source/dnode/mgmt/src/dnodeVnodes.c index 59f5cf0fd0..c2e143da80 100644 --- a/source/dnode/mgmt/src/dnodeVnodes.c +++ b/source/dnode/mgmt/src/dnodeVnodes.c @@ -14,13 +14,622 @@ */ #define _DEFAULT_SOURCE -#include "dnodeDnode.h" +#include "dnodeVnodes.h" +#include "thash.h" +#include "tqueue.h" +#include "tstep.h" +#include "tthread.h" +#include "tworker.h" #include "vnode.h" -int32_t dnodeInitVnodes() { return vnodeInit(); } +typedef struct { + int32_t vgId; + int32_t refCount; + int8_t dropped; + int8_t accessState; + SVnode *pImpl; + taos_queue pWriteQ; + taos_queue pSyncQ; + taos_queue pApplyQ; + taos_queue pQueryQ; + taos_queue pFetchQ; +} SVnodeObj; -void dnodeCleanupVnodes() { vnodeCleanup(); } +typedef struct { + pthread_t *threadId; + int32_t threadIndex; + int32_t failed; + int32_t opened; + int32_t vnodeNum; + SVnodeObj *pVnodes; +} SVThread; -void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { vnodeProcessMsg(NULL, NULL); } +static struct { + SHashObj *hash; + SWorkerPool mgmtPool; + taos_queue pMgmtQ; + SSteps *pSteps; + int32_t openVnodes; + int32_t totalVnodes; + char file[PATH_MAX + 20]; +} tsVnodes; -void dnodeGetVnodes(SVnodeLoads *pVloads) {} \ No newline at end of file +static int32_t dnodeCreateVnodeWrapper(int32_t vgId, SVnode *pImpl) { + SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj)); + pVnode->vgId = vgId; + pVnode->refCount = 0; + pVnode->dropped = 0; + pVnode->accessState = TSDB_VN_ALL_ACCCESS; + pVnode->pImpl = pImpl; + pVnode->pWriteQ = NULL; + pVnode->pSyncQ = NULL; + pVnode->pApplyQ = NULL; + pVnode->pQueryQ = NULL; + pVnode->pFetchQ = NULL; + + return taosHashPut(tsVnodes.hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); +} + +static void dnodeDropVnodeWrapper(SVnodeObj *pVnode) { + taosHashRemove(tsVnodes.hash, &pVnode->vgId, sizeof(int32_t)); + + //todo wait all queue empty + pVnode->pWriteQ = NULL; + pVnode->pSyncQ = NULL; + pVnode->pApplyQ = NULL; + pVnode->pQueryQ = NULL; + pVnode->pFetchQ = NULL; +} + +static int32_t dnodeGetVnodesFromHash(SVnodeObj *pVnodes[], int32_t *numOfVnodes) { + void *pIter = taosHashIterate(tsVnodes.hash, NULL); + while (pIter) { + SVnodeObj **ppVnode = pIter; + if (*ppVnode) { + (*numOfVnodes)++; + if (*numOfVnodes >= TSDB_MAX_VNODES) { + dError("vgId:%d, too many open vnodes, exist:%d max:%d", (*ppVnode)->vgId, *numOfVnodes, TSDB_MAX_VNODES); + continue; + } else { + pVnodes[*numOfVnodes - 1] = (*ppVnode); + } + } + + pIter = taosHashIterate(tsVnodes.hash, pIter); + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t dnodeGetVnodesFromFile(SVnodeObj *pVnodes, int32_t *numOfVnodes) { + pVnodes[0].vgId = 2; + pVnodes[0].dropped = 0; + pVnodes[0].vgId = 3; + pVnodes[0].dropped = 0; + return 0; +} + +static int32_t dnodeWriteVnodesToFile() { return 0; } + +static int32_t dnodeCreateVnode(int32_t vgId, SVnodeCfg *pCfg) { + int32_t code = 0; + + char path[PATH_MAX + 20] = {0}; + snprintf(path, sizeof(path),"%s/vnode%d", tsVnodeDir, vgId); + SVnode *pImpl = vnodeCreate(vgId, path, pCfg); + + if (pImpl == NULL) { + code = terrno; + return code; + } + + code = dnodeCreateVnodeWrapper(vgId, pImpl); + if (code != 0) { + vnodeDrop(pImpl); + return code; + } + + code = dnodeWriteVnodesToFile(); + if (code != 0) { + vnodeDrop(pImpl); + return code; + } + + return code; +} + +static int32_t dnodeDropVnode(SVnodeObj *pVnode) { + pVnode->dropped = 1; + + int32_t code = dnodeWriteVnodesToFile(); + if (code != 0) { + pVnode->dropped = 0; + return code; + } + + dnodeDropVnodeWrapper(pVnode); + vnodeDrop(pVnode->pImpl); + dnodeWriteVnodesToFile(); + return 0; +} + +static SVnodeObj *dnodeAcquireVnode(int32_t vgId) { + SVnodeObj *pVnode = NULL; + + taosHashGetClone(tsVnodes.hash, &vgId, sizeof(int32_t), (void *)&pVnode); + if (pVnode == NULL) { + terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; + } + + int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + dTrace("vgId:%d, accquire vnode, refCount:%d", pVnode->vgId, refCount); + return pVnode; +} + +static void dnodeReleaseVnode(SVnodeObj *pVnode) { + int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); + dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); +} + +static void *dnodeOpenVnodeFunc(void *param) { + SVThread *pThread = param; + + dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); + setThreadName("open-vnodes"); + + for (int32_t v = 0; v < pThread->vnodeNum; ++v) { + SVnodeObj *pVnode = &pThread->pVnodes[v]; + + char stepDesc[TSDB_STEP_DESC_LEN] = {0}; + snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pVnode->vgId, + tsVnodes.openVnodes, tsVnodes.totalVnodes); + dnodeReportStartup("open-vnodes", stepDesc); + + char path[PATH_MAX + 20] = {0}; + snprintf(path, sizeof(path),"%s/vnode%d", tsVnodeDir, pVnode->vgId); + SVnode *pImpl = vnodeOpen(pVnode->vgId, path); + if (pImpl == NULL) { + dError("vgId:%d, failed to open vnode by thread:%d", pVnode->vgId, pThread->threadIndex); + pThread->failed++; + } else { + dnodeCreateVnodeWrapper(pVnode->vgId, pImpl); + dDebug("vgId:%d, is opened by thread:%d", pVnode->vgId, pThread->threadIndex); + pThread->opened++; + } + + atomic_add_fetch_32(&tsVnodes.openVnodes, 1); + } + + dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened, + pThread->failed); + return NULL; +} + +static int32_t dnodeOpenVnodes() { + tsVnodes.hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (tsVnodes.hash == NULL) { + dError("failed to init vnode hash"); + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + SVnodeObj pVnodes[TSDB_MAX_VNODES] = {0}; + int32_t numOfVnodes = 0; + int32_t code = dnodeGetVnodesFromFile(pVnodes, &numOfVnodes); + if (code != TSDB_CODE_SUCCESS) { + dInfo("failed to get vnode list from disk since %s", tstrerror(code)); + return code; + } + + tsVnodes.totalVnodes = numOfVnodes; + + int32_t threadNum = tsNumOfCores; + int32_t vnodesPerThread = numOfVnodes / threadNum + 1; + + SVThread *threads = calloc(threadNum, sizeof(SVThread)); + for (int32_t t = 0; t < threadNum; ++t) { + threads[t].threadIndex = t; + threads[t].pVnodes = calloc(vnodesPerThread, sizeof(SVnodeObj)); + } + + for (int32_t v = 0; v < numOfVnodes; ++v) { + int32_t t = v % threadNum; + SVThread *pThread = &threads[t]; + pThread->pVnodes[pThread->vnodeNum++] = pVnodes[v]; + } + + dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes); + + for (int32_t t = 0; t < threadNum; ++t) { + SVThread *pThread = &threads[t]; + if (pThread->vnodeNum == 0) continue; + + pThread->threadId = taosCreateThread(dnodeOpenVnodeFunc, pThread); + if (pThread->threadId == NULL) { + dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno)); + } + } + + for (int32_t t = 0; t < threadNum; ++t) { + SVThread *pThread = &threads[t]; + taosDestoryThread(pThread->threadId); + pThread->threadId = NULL; + free(pThread->pVnodes); + } + free(threads); + + if (tsVnodes.openVnodes != tsVnodes.totalVnodes) { + dError("there are total vnodes:%d, opened:%d", tsVnodes.totalVnodes, tsVnodes.openVnodes); + return -1; + } else { + dInfo("total vnodes:%d open successfully", tsVnodes.totalVnodes); + } + + return TSDB_CODE_SUCCESS; +} + +static void dnodeCloseVnodes() { + SVnodeObj *pVnodes[TSDB_MAX_VNODES] = {0}; + int32_t numOfVnodes = 0; + + int32_t code = dnodeGetVnodesFromHash(pVnodes, &numOfVnodes); + if (code != TSDB_CODE_SUCCESS) { + dInfo("failed to get dnode list since code %d", code); + return; + } + + for (int32_t i = 0; i < numOfVnodes; ++i) { + vnodeClose(pVnodes[i]->pImpl); + } + + if (tsVnodes.hash != NULL) { + taosHashCleanup(tsVnodes.hash); + tsVnodes.hash = NULL; + } + + dInfo("total vnodes:%d are all closed", numOfVnodes); +} + +static int32_t dnodeParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg *pCfg) { + SCreateVnodeMsg *pCreate = rpcMsg->pCont; + *vgId = htonl(pCreate->vgId); + + tstrncpy(pCfg->db, pCreate->db, TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN); + pCfg->cacheBlockSize = htonl(pCreate->cacheBlockSize); + pCfg->totalBlocks = htonl(pCreate->totalBlocks); + pCfg->daysPerFile = htonl(pCreate->daysPerFile); + pCfg->daysToKeep0 = htonl(pCreate->daysToKeep0); + pCfg->daysToKeep1 = htonl(pCreate->daysToKeep1); + pCfg->daysToKeep2 = htonl(pCreate->daysToKeep2); + pCfg->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock); + pCfg->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock); + pCfg->precision = pCreate->precision; + pCfg->compression = pCreate->compression; + pCfg->cacheLastRow = pCreate->cacheLastRow; + pCfg->update = pCreate->update; + pCfg->quorum = pCreate->quorum; + pCfg->replica = pCreate->replica; + pCfg->walLevel = pCreate->walLevel; + pCfg->fsyncPeriod = htonl(pCreate->fsyncPeriod); + + for (int32_t i = 0; i < pCfg->replica; ++i) { + pCfg->replicas[i].port = htons(pCreate->replicas[i].port); + tstrncpy(pCfg->replicas[i].fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN); + } + + return 0; +} + +static SDropVnodeMsg *vnodeParseDropVnodeReq(SRpcMsg *rpcMsg) { + SDropVnodeMsg *pDrop = rpcMsg->pCont; + pDrop->vgId = htonl(pDrop->vgId); + return pDrop; +} + +static SAuthVnodeMsg *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) { + SAuthVnodeMsg *pAuth = rpcMsg->pCont; + pAuth->vgId = htonl(pAuth->vgId); + return pAuth; +} + +static int32_t vnodeProcessCreateVnodeReq(SRpcMsg *rpcMsg) { + SVnodeCfg vnodeCfg = {0}; + int32_t vgId = 0; + + dnodeParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg); + dDebug("vgId:%d, create vnode req is received", vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode != NULL) { + dDebug("vgId:%d, already exist, return success", vgId); + dnodeReleaseVnode(pVnode); + return 0; + } + + int32_t code = dnodeCreateVnode(vgId, &vnodeCfg); + if (code != 0) { + dError("vgId:%d, failed to create vnode since %s", vgId, tstrerror(code)); + } + + return code; +} + +static int32_t vnodeProcessAlterVnodeReq(SRpcMsg *rpcMsg) { + SVnodeCfg vnodeCfg = {0}; + int32_t vgId = 0; + int32_t code = 0; + + dnodeParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg); + dDebug("vgId:%d, alter vnode req is received", vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode == NULL) { + code = terrno; + dDebug("vgId:%d, failed to alter vnode since %s", vgId, tstrerror(code)); + return code; + } + + code = vnodeAlter(pVnode->pImpl, &vnodeCfg); + if (code != 0) { + dError("vgId:%d, failed to alter vnode since %s", vgId, tstrerror(code)); + } + + dnodeReleaseVnode(pVnode); + return code; +} + +static int32_t vnodeProcessDropVnodeReq(SRpcMsg *rpcMsg) { + SDropVnodeMsg *pDrop = vnodeParseDropVnodeReq(rpcMsg); + + int32_t code = 0; + int32_t vgId = pDrop->vgId; + dDebug("vgId:%d, drop vnode req is received", vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode == NULL) { + code = terrno; + dDebug("vgId:%d, failed to drop since %s", vgId, tstrerror(code)); + return code; + } + + code = vnodeDrop(pVnode->pImpl); + if (code != 0) { + dError("vgId:%d, failed to drop vnode since %s", vgId, tstrerror(code)); + } + + dnodeReleaseVnode(pVnode); + return code; +} + +static int32_t vnodeProcessAuthVnodeReq(SRpcMsg *rpcMsg) { + SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg); + + int32_t code = 0; + int32_t vgId = pAuth->vgId; + dDebug("vgId:%d, auth vnode req is received", vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode == NULL) { + code = terrno; + dDebug("vgId:%d, failed to auth since %s", vgId, tstrerror(code)); + return code; + } + + pVnode->accessState = pAuth->accessState; + dnodeReleaseVnode(pVnode); + return code; +} + +static int32_t vnodeProcessSyncVnodeReq(SRpcMsg *rpcMsg) { + SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg); + + int32_t code = 0; + int32_t vgId = pAuth->vgId; + dDebug("vgId:%d, auth vnode req is received", vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode == NULL) { + code = terrno; + dDebug("vgId:%d, failed to auth since %s", vgId, tstrerror(code)); + return code; + } + + code = vnodeSync(pVnode->pImpl); + if (code != 0) { + dError("vgId:%d, failed to auth vnode since %s", vgId, tstrerror(code)); + } + + dnodeReleaseVnode(pVnode); + return code; +} + +static int32_t vnodeProcessCompactVnodeReq(SRpcMsg *rpcMsg) { + SCompactVnodeMsg *pCompact = (SCompactVnodeMsg *)vnodeParseDropVnodeReq(rpcMsg); + + int32_t code = 0; + int32_t vgId = pCompact->vgId; + dDebug("vgId:%d, compact vnode req is received", vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode == NULL) { + code = terrno; + dDebug("vgId:%d, failed to compact since %s", vgId, tstrerror(code)); + return code; + } + + code = vnodeCompact(pVnode->pImpl); + if (code != 0) { + dError("vgId:%d, failed to compact vnode since %s", vgId, tstrerror(code)); + } + + dnodeReleaseVnode(pVnode); + return code; +} + +static void dnodeProcessVnodeMgmtReq(SRpcMsg *pMsg, void *unused) { + int32_t code = 0; + + switch (pMsg->msgType) { + case TSDB_MSG_TYPE_CREATE_VNODE_IN: + code = vnodeProcessCreateVnodeReq(pMsg); + break; + case TSDB_MSG_TYPE_ALTER_VNODE_IN: + code = vnodeProcessAlterVnodeReq(pMsg); + break; + case TSDB_MSG_TYPE_DROP_VNODE_IN: + code = vnodeProcessDropVnodeReq(pMsg); + break; + case TSDB_MSG_TYPE_AUTH_VNODE_IN: + code = vnodeProcessAuthVnodeReq(pMsg); + break; + case TSDB_MSG_TYPE_SYNC_VNODE_IN: + code = vnodeProcessSyncVnodeReq(pMsg); + break; + case TSDB_MSG_TYPE_COMPACT_VNODE_IN: + code = vnodeProcessCompactVnodeReq(pMsg); + break; + default: + code = TSDB_CODE_DND_MSG_NOT_PROCESSED; + break; + } + + SRpcMsg rsp = {.code = code, .handle = pMsg->handle}; + rpcSendResponse(&rsp); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); +} + +static int32_t dnodeWriteToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { + int32_t code = 0; + + if (pQueue == NULL) { + code = TSDB_CODE_DND_MSG_NOT_PROCESSED; + } else { + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg)); + if (pMsg == NULL) { + code = TSDB_CODE_DND_OUT_OF_MEMORY; + } else { + *pMsg = *pRpcMsg; + code = taosWriteQitem(pQueue, pMsg); + } + } + + if (code != TSDB_CODE_SUCCESS) { + SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; + rpcSendResponse(&rsp); + rpcFreeCont(pRpcMsg->pCont); + } +} + +static SVnodeObj *dnodeAcquireVnodeFromMsg(SRpcMsg *pMsg) { + SMsgHead *pHead = (SMsgHead *)pMsg->pCont; + pHead->vgId = htonl(pHead->vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(pHead->vgId); + if (pVnode == NULL) { + SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; + rpcSendResponse(&rsp); + rpcFreeCont(pMsg->pCont); + } + + return pVnode; +} + +void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { dnodeWriteToVnodeQueue(tsVnodes.pMgmtQ, pMsg); } + +void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { + SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); + if (pVnode != NULL) { + dnodeWriteToVnodeQueue(pVnode->pWriteQ, pMsg); + dnodeReleaseVnode(pVnode); + } +} + +void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { + SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); + if (pVnode != NULL) { + dnodeWriteToVnodeQueue(pVnode->pSyncQ, pMsg); + dnodeReleaseVnode(pVnode); + } +} + +void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { + SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); + if (pVnode != NULL) { + dnodeWriteToVnodeQueue(pVnode->pQueryQ, pMsg); + dnodeReleaseVnode(pVnode); + } +} + +void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { + SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); + if (pVnode != NULL) { + dnodeWriteToVnodeQueue(pVnode->pFetchQ, pMsg); + dnodeReleaseVnode(pVnode); + } +} + +static int32_t dnodeInitVnodeMgmtWorker() { + SWorkerPool *pPool = &tsVnodes.mgmtPool; + pPool->name = "vnode-mgmt"; + pPool->min = 1; + pPool->max = 1; + if (tWorkerInit(pPool) != 0) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + tsVnodes.pMgmtQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)dnodeProcessVnodeMgmtReq); + if (tsVnodes.pMgmtQ == NULL) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + return 0; +} + +static void dnodeCleanupVnodeMgmtWorker() { + tWorkerFreeQueue(&tsVnodes.mgmtPool, tsVnodes.pMgmtQ); + tWorkerCleanup(&tsVnodes.mgmtPool); + tsVnodes.pMgmtQ = NULL; +} + +int32_t dnodeInitVnodes() { + dInfo("dnode-vnodes start to init"); + + SSteps *pSteps = taosStepInit(3, dnodeReportStartup); + taosStepAdd(pSteps, "dnode-vnode-env", vnodeInit, vnodeCleanup); + taosStepAdd(pSteps, "dnode-vnode-mgmt", dnodeInitVnodeMgmtWorker, dnodeCleanupVnodeMgmtWorker); + taosStepAdd(pSteps, "dnode-vnodes", dnodeOpenVnodes, dnodeCleanupVnodes); + + tsVnodes.pSteps = pSteps; + return taosStepExec(pSteps); +} + +void dnodeCleanupVnodes() { + if (tsVnodes.pSteps != NULL) { + dInfo("dnode-vnodes start to clean up"); + taosStepCleanup(tsVnodes.pSteps); + tsVnodes.pSteps = NULL; + dInfo("dnode-vnodes is cleaned up"); + } +} + +void dnodeGetVnodes(SVnodeLoads *pLoads) { + pLoads->vnodeNum = taosHashGetSize(tsVnodes.hash); + + int32_t v = 0; + void *pIter = taosHashIterate(tsVnodes.hash, NULL); + while (pIter) { + SVnodeObj **ppVnode = pIter; + if (ppVnode == NULL) continue; + SVnodeObj *pVnode = *ppVnode; + if (pVnode) { + SVnodeLoad *pLoad = &pLoads->vnodeLoads[v++]; + vnodeGetLoad(pVnode->pImpl, pLoad); + pLoad->vgId = htonl(pLoad->vgId); + pLoad->totalStorage = htobe64(pLoad->totalStorage); + pLoad->compStorage = htobe64(pLoad->compStorage); + pLoad->pointsWritten = htobe64(pLoad->pointsWritten); + pLoad->tablesNum = htobe64(pLoad->tablesNum); + } + pIter = taosHashIterate(tsVnodes.hash, pIter); + } +} \ No newline at end of file diff --git a/source/dnode/vnode/impl/src/vnodeInt.c b/source/dnode/vnode/impl/src/vnodeInt.c index 6f83542cef..1df3d37749 100644 --- a/source/dnode/vnode/impl/src/vnodeInt.c +++ b/source/dnode/vnode/impl/src/vnodeInt.c @@ -19,9 +19,6 @@ int32_t vnodeInit() { return 0; } void vnodeCleanup() {} -int32_t vnodeGetStatistics(SVnode *pVnode, SVnodeStatisic *pStat) { return 0; } -int32_t vnodeGetStatus(SVnode *pVnode, SVnodeStatus *pStatus) { return 0; } - SVnode *vnodeOpen(int32_t vgId, const char *path) { return NULL; } void vnodeClose(SVnode *pVnode) {} int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; } @@ -31,3 +28,4 @@ int32_t vnodeCompact(SVnode *pVnode) { return 0; } int32_t vnodeSync(SVnode *pVnode) { return 0; } void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg) {} +void vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {}