diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index e798a0c42a..2ccab41f8f 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -36,7 +36,7 @@ enum { TSDB_MESSAGE_NULL = 0, #endif -// message from client to dnode +// message from client to vnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) @@ -46,25 +46,12 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_TABLE, "alter-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TABLE_META, "table-meta" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TABLES_META, "tables-meta" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY0, "dummy0" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY4, "dummy4" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY8, "dummy8" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" ) - // message from client to mnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" ) @@ -88,6 +75,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_FUNCTION, "create-function" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_FUNCTION, "alter-function" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_FUNCTION, "drop-function" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE, "create-stable" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE, "drop-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE, "alter-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" ) @@ -97,54 +85,55 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW, "show" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE, "retrieve" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC, "retrieve-func" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE, "compact-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY10, "dummy10" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY11, "dummy11" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY15, "dummy15" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY16, "dummy16" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY17, "dummy17" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY18, "dummy18" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY19, "dummy19" ) +// message from client to qnode +// message from client to dnode +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" ) -// message from mnode to dnode +// message from vnode to vnode +// message from vnode to mnode +// message from vnode to qnode +// message from vnode to dnode + +// message from mnode to vnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE_IN, "create-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE_IN, "alter-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE_IN, "drop-stable" ) +// message from mnode to mnode +// message from mnode to qnode +// message from mnode to dnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH_VNODE_IN, "auth-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY20, "dummy20" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY21, "dummy21" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY22, "dummy22" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY23, "dummy23" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY24, "dummy24" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY25, "dummy25" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY26, "dummy26" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY27, "dummy27" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY28, "dummy28" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY29, "dummy29" ) +// message from qnode to vnode +// message from qnode to mnode +// message from qnode to qnode +// message from qnode to dnode + +// message from dnode to vnode // message from dnode to mnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STATUS, "status" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_GRANT, "grant" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH, "auth" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY30, "dummy30" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY31, "dummy31" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY32, "dummy32" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY33, "dummy33" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY34, "dummy34" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY35, "dummy35" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY36, "dummy36" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY37, "dummy37" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY38, "dummy38" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY39, "dummy39" ) +// message from dnode to qnode +// message from dnode to dnode + +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY0, "dummy0" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY4, "dummy4" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY8, "dummy8" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" ) #ifndef TAOS_MESSAGE_C TSDB_MSG_TYPE_MAX // 147 @@ -336,7 +325,7 @@ typedef struct { typedef struct { char tableFname[TSDB_TABLE_FNAME_LEN]; - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char db[TSDB_FULL_DB_NAME_LEN]; int16_t type; /* operation type */ int16_t numOfCols; /* number of schema */ int32_t tagValLen; @@ -428,10 +417,6 @@ typedef struct { char tableFname[TSDB_TABLE_FNAME_LEN]; } SDropSTableMsg; -typedef struct { - int32_t vgId; -} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg; - typedef struct SColIndex { int16_t colId; // column id int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag @@ -585,7 +570,7 @@ typedef struct SRetrieveTableRsp { } SRetrieveTableRsp; typedef struct { - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char db[TSDB_FULL_DB_NAME_LEN]; int32_t cacheBlockSize; //MB int32_t totalBlocks; int32_t maxTables; @@ -661,9 +646,8 @@ typedef struct { typedef struct { int32_t vgId; - int8_t status; int8_t role; - int8_t reserved[2]; + int8_t reserved[3]; int64_t totalStorage; int64_t compStorage; int64_t pointsWritten; @@ -671,8 +655,8 @@ typedef struct { } SVnodeLoad; typedef struct { - int32_t vnodeNum; - SVnodeLoad vnodeLoads[]; + int32_t num; + SVnodeLoad data[]; } SVnodeLoads; typedef struct SStatusMsg { @@ -717,7 +701,7 @@ typedef struct { } SVnodeDesc; typedef struct { - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char db[TSDB_FULL_DB_NAME_LEN]; uint32_t vgId; int32_t cacheBlockSize; int32_t totalBlocks; @@ -737,9 +721,18 @@ typedef struct { int8_t replica; int8_t quorum; int8_t selfIndex; - SVnodeDesc nodes[TSDB_MAX_REPLICA]; + SVnodeDesc replicas[TSDB_MAX_REPLICA]; } SCreateVnodeMsg, SAlterVnodeMsg; +typedef struct { + int32_t vgId; +} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg; + +typedef struct { + int32_t vgId; + int8_t accessState; +} SAuthVnodeMsg; + typedef struct { char tableFname[TSDB_TABLE_FNAME_LEN]; int16_t createFlag; @@ -811,13 +804,13 @@ typedef struct { */ typedef struct { int8_t type; - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char db[TSDB_FULL_DB_NAME_LEN]; uint16_t payloadLen; char payload[]; } SShowMsg; typedef struct { - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char db[TSDB_FULL_DB_NAME_LEN]; int32_t numOfVgroup; int32_t vgid[]; } SCompactMsg; @@ -1002,6 +995,7 @@ typedef struct { /* data */ } SUpdateTagValRsp; + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/server/vnode/vnode.h b/include/server/vnode/vnode.h index c4f8df79c1..e570cf4261 100644 --- a/include/server/vnode/vnode.h +++ b/include/server/vnode/vnode.h @@ -27,7 +27,7 @@ extern "C" { typedef struct SVnode SVnode; typedef struct { - char dbName[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char db[TSDB_FULL_DB_NAME_LEN]; int32_t cacheBlockSize; // MB int32_t totalBlocks; int32_t daysPerFile; @@ -47,40 +47,43 @@ typedef struct { SVnodeDesc replicas[TSDB_MAX_REPLICA]; } SVnodeCfg; -typedef struct { - int64_t totalStorage; - int64_t compStorage; - int64_t pointsWritten; - int64_t tablesNum; -} SVnodeStatisic; +typedef enum { + VN_MSG_TYPE_WRITE = 1, + VN_MSG_TYPE_APPLY, + VN_MSG_TYPE_SYNC, + VN_MSG_TYPE_QUERY, + VN_MSG_TYPE_FETCH +} EVnMsgType; typedef struct { - int8_t syncRole; -} SVnodeStatus; - -typedef struct SVnodeMsg { - int32_t msgType; - int32_t code; - SRpcMsg rpcMsg; // original message from rpc - int32_t contLen; - char pCont[]; + int32_t curNum; + int32_t allocNum; + SRpcMsg rpcMsg[]; } SVnodeMsg; -int32_t vnodeInit(); -void vnodeCleanup(); +typedef struct { + void (*SendMsgToDnode)(SEpSet *pEpSet, SRpcMsg *pMsg); + void (*SendMsgToMnode)(SRpcMsg *pMsg); + int32_t (*PutMsgIntoApplyQueue)(int32_t vgId, SVnodeMsg *pMsg); +} SVnodePara; -int32_t vnodeGetStatistics(SVnode *pVnode, SVnodeStatisic *pStat); -int32_t vnodeGetStatus(SVnode *pVnode, SVnodeStatus *pStatus); +int32_t vnodeInit(SVnodePara); +void vnodeCleanup(); SVnode *vnodeOpen(int32_t vgId, const char *path); void vnodeClose(SVnode *pVnode); int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg); -int32_t vnodeDrop(SVnode *pVnode); +void vnodeDrop(SVnode *pVnode); int32_t vnodeCompact(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode); -void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg); +int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); + +SVnodeMsg *vnodeInitMsg(int32_t msgNum); +int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg); +void vnodeCleanupMsg(SVnodeMsg *pMsg); +void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVnMsgType msgType); #ifdef __cplusplus } diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 0579ae46bc..e2f9a58023 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -223,6 +223,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0406) //"Action in progress") #define TSDB_CODE_DND_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0407) //"Too many vnode directories") #define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0408) //"Dnode is exiting" +#define TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0409) //"Parse vnodes.json error") // vnode #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress") diff --git a/include/util/tdef.h b/include/util/tdef.h index 80cd3cf8b8..0ad0f68f3f 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -153,11 +153,12 @@ do { \ #define TSDB_NODE_NAME_LEN 64 #define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string #define TSDB_DB_NAME_LEN 33 +#define TSDB_FULL_DB_NAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN) #define TSDB_FUNC_NAME_LEN 65 #define TSDB_FUNC_CODE_LEN (65535 - 512) #define TSDB_FUNC_BUF_SIZE 512 #define TSDB_TYPE_STR_MAX_LEN 32 -#define TSDB_TABLE_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN) +#define TSDB_TABLE_FNAME_LEN (TSDB_FULL_DB_NAME_LEN + TSDB_TABLE_NAME_LEN) #define TSDB_COL_NAME_LEN 65 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE @@ -211,7 +212,7 @@ do { \ #define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth #define TSDB_CQ_SQL_SIZE 1024 #define TSDB_MIN_VNODES 64 -#define TSDB_MAX_VNODES 2048 +#define TSDB_MAX_VNODES 512 #define TSDB_MIN_VNODES_PER_DB 2 #define TSDB_MAX_VNODES_PER_DB 64 diff --git a/include/util/tqueue.h b/include/util/tqueue.h index faac1afe70..bcb9aea856 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -40,8 +40,8 @@ shall be used to set up the protection. typedef void *taos_queue; typedef void *taos_qset; typedef void *taos_qall; -typedef void *(*FProcessItem)(void *pItem, void *ahandle); -typedef void *(*FProcessItems)(taos_qall qall, int numOfItems, void *ahandle); +typedef void (*FProcessItem)(void *ahandle, void *pItem); +typedef void (*FProcessItems)(void *ahandle, taos_qall qall, int numOfItems); taos_queue taosOpenQueue(); void taosCloseQueue(taos_queue); @@ -50,6 +50,7 @@ void *taosAllocateQitem(int size); void taosFreeQitem(void *pItem); int taosWriteQitem(taos_queue, void *pItem); int taosReadQitem(taos_queue, void **pItem); +bool taosQueueEmpty(taos_queue); taos_qall taosAllocateQall(); void taosFreeQall(taos_qall); diff --git a/source/common/src/tname.c b/source/common/src/tname.c index c290a04ebc..28f920a6a9 100644 --- a/source/common/src/tname.c +++ b/source/common/src/tname.c @@ -201,7 +201,7 @@ int32_t tNameExtractFullName(const SName* name, char* dst) { return -1; } - int32_t len = snprintf(dst, TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN, "%s.%s", name->acctId, name->dbname); + int32_t len = snprintf(dst, TSDB_FULL_DB_NAME_LEN, "%s.%s", name->acctId, name->dbname); size_t tnameLen = strlen(name->tname); if (tnameLen > 0) { diff --git a/source/dnode/mgmt/inc/dnodeInt.h b/source/dnode/mgmt/inc/dnodeInt.h index 472c467ad6..906455dce4 100644 --- a/source/dnode/mgmt/inc/dnodeInt.h +++ b/source/dnode/mgmt/inc/dnodeInt.h @@ -42,7 +42,10 @@ void dnodeCleanup(); EDnStat dnodeGetRunStat(); void dnodeSetRunStat(); -void dnodeGetStartup(SStartupMsg *); + +void dnodeReportStartup(char *name, char *desc); +void dnodeReportStartupFinished(char *name, char *desc); +void dnodeGetStartup(SStartupMsg *); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/inc/dnodeVnodes.h b/source/dnode/mgmt/inc/dnodeVnodes.h index 6e9bce9ae5..31eae049ab 100644 --- a/source/dnode/mgmt/inc/dnodeVnodes.h +++ b/source/dnode/mgmt/inc/dnodeVnodes.h @@ -23,8 +23,13 @@ extern "C" { int32_t dnodeInitVnodes(); void dnodeCleanupVnodes(); -void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SEpSet *pEpSet); -void dnodeGetVnodes(SVnodeLoads *pVloads); +void dnodeGetVnodeLoads(SVnodeLoads *pVloads); + +void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet); +void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet); +void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet); +void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet); +void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/src/dnodeDnode.c b/source/dnode/mgmt/src/dnodeDnode.c index 5bf5b1d56a..63de2b940d 100644 --- a/source/dnode/mgmt/src/dnodeDnode.c +++ b/source/dnode/mgmt/src/dnodeDnode.c @@ -372,8 +372,8 @@ static void dnodeSendStatusMsg() { char timestr[32] = "1970-01-01 00:00:00.00"; (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); - dnodeGetVnodes(&pStatus->vnodeLoads); - contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.vnodeNum * sizeof(SVnodeLoad); + dnodeGetVnodeLoads(&pStatus->vnodeLoads); + contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad); SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_STATUS}; dnodeSendMsgToMnode(&rpcMsg); diff --git a/source/dnode/mgmt/src/dnodeInt.c b/source/dnode/mgmt/src/dnodeInt.c index e853ba7f14..e7018f4265 100644 --- a/source/dnode/mgmt/src/dnodeInt.c +++ b/source/dnode/mgmt/src/dnodeInt.c @@ -35,14 +35,14 @@ EDnStat dnodeGetRunStat() { return tsInt.runStat; } void dnodeSetRunStat(EDnStat stat) { tsInt.runStat = stat; } -static void dnodeReportStartup(char *name, char *desc) { +void dnodeReportStartup(char *name, char *desc) { SStartupMsg *pStartup = &tsInt.startup; tstrncpy(pStartup->name, name, strlen(pStartup->name)); tstrncpy(pStartup->desc, desc, strlen(pStartup->desc)); pStartup->finished = 0; } -static void dnodeReportStartupFinished(char *name, char *desc) { +void dnodeReportStartupFinished(char *name, char *desc) { SStartupMsg *pStartup = &tsInt.startup; tstrncpy(pStartup->name, name, strlen(pStartup->name)); tstrncpy(pStartup->desc, desc, strlen(pStartup->desc)); diff --git a/source/dnode/mgmt/src/dnodeTransport.c b/source/dnode/mgmt/src/dnodeTransport.c index 18e97e2b9f..265efe3070 100644 --- a/source/dnode/mgmt/src/dnodeTransport.c +++ b/source/dnode/mgmt/src/dnodeTransport.c @@ -34,24 +34,22 @@ static struct { static void dnodeInitMsgFp() { // msg from client to dnode - tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessDnodeMsg; - + tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessVnodeQueryMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = dnodeProcessVnodeFetchMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = dnodeProcessVnodeQueryMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = dnodeProcessVnodeQueryMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dnodeProcessVnodeQueryMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dnodeProcessVnodeQueryMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = dnodeProcessVnodeWriteMsg; + // msg from client to mnode tsTrans.msgFp[TSDB_MSG_TYPE_CONNECT] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = dnodeProcessMnodeMsg; @@ -77,6 +75,7 @@ static void dnodeInitMsgFp() { tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_KILL_CONN] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dnodeProcessMnodeMsg; @@ -85,22 +84,29 @@ static void dnodeInitMsgFp() { tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = dnodeProcessMnodeMsg; - // message from mnode to dnode - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg; + // message from client to dnode + tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessDnodeMsg; + + // message from mnode to vnode + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodeWriteMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN] = dnodeProcessVnodeWriteMsg; tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN] = dnodeProcessVnodeWriteMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN] = dnodeProcessMnodeMsg; + + // message from mnode to dnode + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dnodeProcessVnodeMgmtMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = dnodeProcessVnodeMgmtMsg; tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = dnodeProcessVnodeMgmtMsg; tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = dnodeProcessVnodeMgmtMsg; tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = dnodeProcessMnodeMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN] = dnodeProcessVnodeMgmtMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN_RSP] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = dnodeProcessVnodeMgmtMsg; tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = dnodeProcessMnodeMsg; diff --git a/source/dnode/mgmt/src/dnodeVnodes.c b/source/dnode/mgmt/src/dnodeVnodes.c index 59f5cf0fd0..8bf80ccff8 100644 --- a/source/dnode/mgmt/src/dnodeVnodes.c +++ b/source/dnode/mgmt/src/dnodeVnodes.c @@ -14,13 +14,1016 @@ */ #define _DEFAULT_SOURCE -#include "dnodeDnode.h" +#include "dnodeVnodes.h" +#include "dnodeTransport.h" +#include "cJSON.h" +#include "thash.h" +#include "tlockfree.h" +#include "tqueue.h" +#include "tstep.h" +#include "tthread.h" +#include "tworker.h" #include "vnode.h" -int32_t dnodeInitVnodes() { return vnodeInit(); } +typedef struct { + int32_t vgId; + int32_t refCount; + int8_t dropped; + int8_t accessState; + SVnode *pImpl; + taos_queue pWriteQ; + taos_queue pSyncQ; + taos_queue pApplyQ; + taos_queue pQueryQ; + taos_queue pFetchQ; +} SVnodeObj; -void dnodeCleanupVnodes() { vnodeCleanup(); } +typedef struct { + pthread_t *threadId; + int32_t threadIndex; + int32_t failed; + int32_t opened; + int32_t vnodeNum; + SVnodeObj *pVnodes; +} SVThread; -void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { vnodeProcessMsg(NULL, NULL); } +static struct { + SHashObj *hash; + SWorkerPool mgmtPool; + SWorkerPool queryPool; + SWorkerPool fetchPool; + SMWorkerPool syncPool; + SMWorkerPool writePool; + taos_queue pMgmtQ; + SSteps *pSteps; + int32_t openVnodes; + int32_t totalVnodes; + SRWLatch latch; +} tsVnodes; -void dnodeGetVnodes(SVnodeLoads *pVloads) {} \ No newline at end of file +static int32_t dnodeAllocVnodeQueryQueue(SVnodeObj *pVnode); +static void dnodeFreeVnodeQueryQueue(SVnodeObj *pVnode); +static int32_t dnodeAllocVnodeFetchQueue(SVnodeObj *pVnode); +static void dnodeFreeVnodeFetchQueue(SVnodeObj *pVnode); +static int32_t dnodeAllocVnodeWriteQueue(SVnodeObj *pVnode); +static void dnodeFreeVnodeWriteQueue(SVnodeObj *pVnode); +static int32_t dnodeAllocVnodeApplyQueue(SVnodeObj *pVnode); +static void dnodeFreeVnodeApplyQueue(SVnodeObj *pVnode); +static int32_t dnodeAllocVnodeSyncQueue(SVnodeObj *pVnode); +static void dnodeFreeVnodeSyncQueue(SVnodeObj *pVnode); + +static SVnodeObj *dnodeAcquireVnode(int32_t vgId) { + SVnodeObj *pVnode = NULL; + int32_t refCount = 0; + + taosRLockLatch(&tsVnodes.latch); + taosHashGetClone(tsVnodes.hash, &vgId, sizeof(int32_t), (void *)&pVnode); + if (pVnode == NULL) { + terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; + } else { + refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + } + taosRUnLockLatch(&tsVnodes.latch); + + dTrace("vgId:%d, accquire vnode, refCount:%d", pVnode->vgId, refCount); + return pVnode; +} + +static void dnodeReleaseVnode(SVnodeObj *pVnode) { + int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); + dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); +} + +static int32_t dnodeCreateVnodeWrapper(int32_t vgId, SVnode *pImpl) { + SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj)); + if (pVnode == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + + pVnode->vgId = vgId; + pVnode->refCount = 0; + pVnode->dropped = 0; + pVnode->accessState = TSDB_VN_ALL_ACCCESS; + pVnode->pImpl = pImpl; + + int32_t code = dnodeAllocVnodeQueryQueue(pVnode); + if (code != 0) { + return code; + } + + code = dnodeAllocVnodeFetchQueue(pVnode); + if (code != 0) { + return code; + } + + code = dnodeAllocVnodeWriteQueue(pVnode); + if (code != 0) { + return code; + } + + code = dnodeAllocVnodeApplyQueue(pVnode); + if (code != 0) { + return code; + } + + code = dnodeAllocVnodeSyncQueue(pVnode); + if (code != 0) { + return code; + } + + taosWLockLatch(&tsVnodes.latch); + code = taosHashPut(tsVnodes.hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); + taosWUnLockLatch(&tsVnodes.latch); + + return code; +} + +static void dnodeDropVnodeWrapper(SVnodeObj *pVnode) { + taosWLockLatch(&tsVnodes.latch); + taosHashRemove(tsVnodes.hash, &pVnode->vgId, sizeof(int32_t)); + taosWUnLockLatch(&tsVnodes.latch); + + // wait all queue empty + dnodeReleaseVnode(pVnode); + while (pVnode->refCount > 0) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); + + dnodeFreeVnodeQueryQueue(pVnode); + dnodeFreeVnodeFetchQueue(pVnode); + dnodeFreeVnodeWriteQueue(pVnode); + dnodeFreeVnodeApplyQueue(pVnode); + dnodeFreeVnodeSyncQueue(pVnode); +} + +static SVnodeObj **dnodeGetVnodesFromHash(int32_t *numOfVnodes) { + taosRLockLatch(&tsVnodes.latch); + + int32_t num = 0; + int32_t size = taosHashGetSize(tsVnodes.hash); + SVnodeObj **pVnodes = calloc(size, sizeof(SVnodeObj *)); + + void *pIter = taosHashIterate(tsVnodes.hash, NULL); + while (pIter) { + SVnodeObj **ppVnode = pIter; + SVnodeObj *pVnode = *ppVnode; + if (pVnode) { + num++; + if (num < size) { + int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + dTrace("vgId:%d, accquire vnode, refCount:%d", pVnode->vgId, refCount); + pVnodes[num] = (*ppVnode); + } + } + pIter = taosHashIterate(tsVnodes.hash, pIter); + } + + taosRUnLockLatch(&tsVnodes.latch); + *numOfVnodes = num; + + return pVnodes; +} + +static int32_t dnodeGetVnodesFromFile(SVnodeObj **ppVnodes, int32_t *numOfVnodes) { + int32_t code = TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR; + int32_t len = 0; + int32_t maxLen = 30000; + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + FILE *fp = NULL; + char file[PATH_MAX + 20] = {0}; + SVnodeObj *pVnodes = NULL; + + snprintf(file, PATH_MAX + 20, "%s/vnodes.json", tsVnodeDir); + + fp = fopen(file, "r"); + if (!fp) { + dDebug("file %s not exist", file); + code = 0; + goto PRASE_VNODE_OVER; + } + + len = (int32_t)fread(content, 1, maxLen, fp); + if (len <= 0) { + dError("failed to read %s since content is null", file); + goto PRASE_VNODE_OVER; + } + + content[len] = 0; + root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read %s since invalid json format", file); + goto PRASE_VNODE_OVER; + } + + cJSON *vnodes = cJSON_GetObjectItem(root, "vnodes"); + if (!vnodes || vnodes->type != cJSON_Array) { + dError("failed to read %s since vnodes not found", file); + goto PRASE_VNODE_OVER; + } + + int32_t vnodesNum = cJSON_GetArraySize(vnodes); + if (vnodesNum <= 0) { + dError("failed to read %s since vnodes size:%d invalid", file, vnodesNum); + goto PRASE_VNODE_OVER; + } + + pVnodes = calloc(vnodesNum, sizeof(SVnodeObj)); + if (pVnodes == NULL) { + dError("failed to read %s since out of memory", file); + goto PRASE_VNODE_OVER; + } + + for (int32_t i = 0; i < vnodesNum; ++i) { + cJSON *vnode = cJSON_GetArrayItem(vnodes, i); + SVnodeObj *pVnode = &pVnodes[i]; + + cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId"); + if (!vgId || vgId->type != cJSON_String) { + dError("failed to read %s since vgId not found", file); + goto PRASE_VNODE_OVER; + } + pVnode->vgId = atoi(vgId->valuestring); + + cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped"); + if (!dropped || dropped->type != cJSON_String) { + dError("failed to read %s since dropped not found", file); + goto PRASE_VNODE_OVER; + } + pVnode->dropped = atoi(vnode->valuestring); + } + + code = 0; + dInfo("succcessed to read file %s", file); + +PRASE_VNODE_OVER: + if (content != NULL) free(content); + if (root != NULL) cJSON_Delete(root); + if (fp != NULL) fclose(fp); + + return code; +} + +static int32_t dnodeWriteVnodesToFile() { + char file[PATH_MAX + 20] = {0}; + char realfile[PATH_MAX + 20] = {0}; + snprintf(file, PATH_MAX + 20, "%s/vnodes.json.bak", tsVnodeDir); + snprintf(realfile, PATH_MAX + 20, "%s/vnodes.json", tsVnodeDir); + + FILE *fp = fopen(file, "w"); + if (!fp) { + dError("failed to write %s since %s", file, strerror(errno)); + return -1; + } + + int32_t len = 0; + int32_t maxLen = 30000; + char *content = calloc(1, maxLen + 1); + int32_t numOfVnodes = 0; + SVnodeObj **pVnodes = dnodeGetVnodesFromHash(&numOfVnodes); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"vnodes\": [{\n"); + for (int32_t i = 0; i < numOfVnodes; ++i) { + SVnodeObj *pVnode = pVnodes[i]; + len += snprintf(content + len, maxLen - len, " \"vgId\": \"%d\",\n", pVnode->vgId); + len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\"\n", pVnode->dropped); + if (i < numOfVnodes - 1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }]\n"); + } + } + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); + taosFsyncFile(fileno(fp)); + fclose(fp); + free(content); + terrno = 0; + + for (int32_t i = 0; i < numOfVnodes; ++i) { + SVnodeObj *pVnode = pVnodes[i]; + dnodeReleaseVnode(pVnode); + } + + if (pVnodes != NULL) { + free(pVnodes); + } + + dInfo("successed to write %s", file); + return taosRenameFile(file, realfile); +} + +static int32_t dnodeCreateVnode(int32_t vgId, SVnodeCfg *pCfg) { + int32_t code = 0; + + char path[PATH_MAX + 20] = {0}; + snprintf(path, sizeof(path),"%s/vnode%d", tsVnodeDir, vgId); + SVnode *pImpl = vnodeCreate(vgId, path, pCfg); + + if (pImpl == NULL) { + code = terrno; + return code; + } + + code = dnodeCreateVnodeWrapper(vgId, pImpl); + if (code != 0) { + vnodeDrop(pImpl); + return code; + } + + code = dnodeWriteVnodesToFile(); + if (code != 0) { + vnodeDrop(pImpl); + return code; + } + + return code; +} + +static int32_t dnodeDropVnode(SVnodeObj *pVnode) { + pVnode->dropped = 1; + + int32_t code = dnodeWriteVnodesToFile(); + if (code != 0) { + pVnode->dropped = 0; + return code; + } + + dnodeDropVnodeWrapper(pVnode); + vnodeDrop(pVnode->pImpl); + dnodeWriteVnodesToFile(); + return 0; +} + +static void *dnodeOpenVnodeFunc(void *param) { + SVThread *pThread = param; + + dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); + setThreadName("open-vnodes"); + + for (int32_t v = 0; v < pThread->vnodeNum; ++v) { + SVnodeObj *pVnode = &pThread->pVnodes[v]; + + char stepDesc[TSDB_STEP_DESC_LEN] = {0}; + snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pVnode->vgId, + tsVnodes.openVnodes, tsVnodes.totalVnodes); + dnodeReportStartup("open-vnodes", stepDesc); + + char path[PATH_MAX + 20] = {0}; + snprintf(path, sizeof(path),"%s/vnode%d", tsVnodeDir, pVnode->vgId); + SVnode *pImpl = vnodeOpen(pVnode->vgId, path); + if (pImpl == NULL) { + dError("vgId:%d, failed to open vnode by thread:%d", pVnode->vgId, pThread->threadIndex); + pThread->failed++; + } else { + dnodeCreateVnodeWrapper(pVnode->vgId, pImpl); + dDebug("vgId:%d, is opened by thread:%d", pVnode->vgId, pThread->threadIndex); + pThread->opened++; + } + + atomic_add_fetch_32(&tsVnodes.openVnodes, 1); + } + + dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened, + pThread->failed); + return NULL; +} + +static int32_t dnodeOpenVnodes() { + taosInitRWLatch(&tsVnodes.latch); + + tsVnodes.hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (tsVnodes.hash == NULL) { + dError("failed to init vnode hash"); + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + SVnodeObj *pVnodes = NULL; + int32_t numOfVnodes = 0; + int32_t code = dnodeGetVnodesFromFile(&pVnodes, &numOfVnodes); + if (code != TSDB_CODE_SUCCESS) { + dInfo("failed to get vnode list from disk since %s", tstrerror(code)); + return code; + } + + tsVnodes.totalVnodes = numOfVnodes; + + int32_t threadNum = tsNumOfCores; + int32_t vnodesPerThread = numOfVnodes / threadNum + 1; + + SVThread *threads = calloc(threadNum, sizeof(SVThread)); + for (int32_t t = 0; t < threadNum; ++t) { + threads[t].threadIndex = t; + threads[t].pVnodes = calloc(vnodesPerThread, sizeof(SVnodeObj)); + } + + for (int32_t v = 0; v < numOfVnodes; ++v) { + int32_t t = v % threadNum; + SVThread *pThread = &threads[t]; + pThread->pVnodes[pThread->vnodeNum++] = pVnodes[v]; + } + + dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes); + + for (int32_t t = 0; t < threadNum; ++t) { + SVThread *pThread = &threads[t]; + if (pThread->vnodeNum == 0) continue; + + pThread->threadId = taosCreateThread(dnodeOpenVnodeFunc, pThread); + if (pThread->threadId == NULL) { + dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno)); + } + } + + for (int32_t t = 0; t < threadNum; ++t) { + SVThread *pThread = &threads[t]; + taosDestoryThread(pThread->threadId); + pThread->threadId = NULL; + free(pThread->pVnodes); + } + free(threads); + + if (tsVnodes.openVnodes != tsVnodes.totalVnodes) { + dError("there are total vnodes:%d, opened:%d", tsVnodes.totalVnodes, tsVnodes.openVnodes); + return -1; + } else { + dInfo("total vnodes:%d open successfully", tsVnodes.totalVnodes); + } + + return TSDB_CODE_SUCCESS; +} + +static void dnodeCloseVnodes() { + int32_t numOfVnodes = 0; + SVnodeObj **pVnodes = dnodeGetVnodesFromHash(&numOfVnodes); + + for (int32_t i = 0; i < numOfVnodes; ++i) { + dnodeDropVnodeWrapper(pVnodes[i]); + } + if (pVnodes != NULL) { + free(pVnodes); + } + + if (tsVnodes.hash != NULL) { + taosHashCleanup(tsVnodes.hash); + tsVnodes.hash = NULL; + } + + dInfo("total vnodes:%d are all closed", numOfVnodes); +} + +static int32_t dnodeParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg *pCfg) { + SCreateVnodeMsg *pCreate = rpcMsg->pCont; + *vgId = htonl(pCreate->vgId); + + tstrncpy(pCfg->db, pCreate->db, TSDB_FULL_DB_NAME_LEN); + pCfg->cacheBlockSize = htonl(pCreate->cacheBlockSize); + pCfg->totalBlocks = htonl(pCreate->totalBlocks); + pCfg->daysPerFile = htonl(pCreate->daysPerFile); + pCfg->daysToKeep0 = htonl(pCreate->daysToKeep0); + pCfg->daysToKeep1 = htonl(pCreate->daysToKeep1); + pCfg->daysToKeep2 = htonl(pCreate->daysToKeep2); + pCfg->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock); + pCfg->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock); + pCfg->precision = pCreate->precision; + pCfg->compression = pCreate->compression; + pCfg->cacheLastRow = pCreate->cacheLastRow; + pCfg->update = pCreate->update; + pCfg->quorum = pCreate->quorum; + pCfg->replica = pCreate->replica; + pCfg->walLevel = pCreate->walLevel; + pCfg->fsyncPeriod = htonl(pCreate->fsyncPeriod); + + for (int32_t i = 0; i < pCfg->replica; ++i) { + pCfg->replicas[i].port = htons(pCreate->replicas[i].port); + tstrncpy(pCfg->replicas[i].fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN); + } + + return 0; +} + +static SDropVnodeMsg *vnodeParseDropVnodeReq(SRpcMsg *rpcMsg) { + SDropVnodeMsg *pDrop = rpcMsg->pCont; + pDrop->vgId = htonl(pDrop->vgId); + return pDrop; +} + +static SAuthVnodeMsg *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) { + SAuthVnodeMsg *pAuth = rpcMsg->pCont; + pAuth->vgId = htonl(pAuth->vgId); + return pAuth; +} + +static int32_t vnodeProcessCreateVnodeReq(SRpcMsg *rpcMsg) { + SVnodeCfg vnodeCfg = {0}; + int32_t vgId = 0; + + dnodeParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg); + dDebug("vgId:%d, create vnode req is received", vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode != NULL) { + dDebug("vgId:%d, already exist, return success", vgId); + dnodeReleaseVnode(pVnode); + return 0; + } + + int32_t code = dnodeCreateVnode(vgId, &vnodeCfg); + if (code != 0) { + dError("vgId:%d, failed to create vnode since %s", vgId, tstrerror(code)); + } + + return code; +} + +static int32_t vnodeProcessAlterVnodeReq(SRpcMsg *rpcMsg) { + SVnodeCfg vnodeCfg = {0}; + int32_t vgId = 0; + int32_t code = 0; + + dnodeParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg); + dDebug("vgId:%d, alter vnode req is received", vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode == NULL) { + code = terrno; + dDebug("vgId:%d, failed to alter vnode since %s", vgId, tstrerror(code)); + return code; + } + + code = vnodeAlter(pVnode->pImpl, &vnodeCfg); + if (code != 0) { + dError("vgId:%d, failed to alter vnode since %s", vgId, tstrerror(code)); + } + + dnodeReleaseVnode(pVnode); + return code; +} + +static int32_t vnodeProcessDropVnodeReq(SRpcMsg *rpcMsg) { + SDropVnodeMsg *pDrop = vnodeParseDropVnodeReq(rpcMsg); + + int32_t code = 0; + int32_t vgId = pDrop->vgId; + dDebug("vgId:%d, drop vnode req is received", vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode == NULL) { + code = terrno; + dDebug("vgId:%d, failed to drop since %s", vgId, tstrerror(code)); + return code; + } + + code = dnodeDropVnode(pVnode); + if (code != 0) { + dnodeReleaseVnode(pVnode); + dError("vgId:%d, failed to drop vnode since %s", vgId, tstrerror(code)); + } + + return code; +} + +static int32_t vnodeProcessAuthVnodeReq(SRpcMsg *rpcMsg) { + SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg); + + int32_t code = 0; + int32_t vgId = pAuth->vgId; + dDebug("vgId:%d, auth vnode req is received", vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode == NULL) { + code = terrno; + dDebug("vgId:%d, failed to auth since %s", vgId, tstrerror(code)); + return code; + } + + pVnode->accessState = pAuth->accessState; + dnodeReleaseVnode(pVnode); + return code; +} + +static int32_t vnodeProcessSyncVnodeReq(SRpcMsg *rpcMsg) { + SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg); + + int32_t code = 0; + int32_t vgId = pAuth->vgId; + dDebug("vgId:%d, auth vnode req is received", vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode == NULL) { + code = terrno; + dDebug("vgId:%d, failed to auth since %s", vgId, tstrerror(code)); + return code; + } + + code = vnodeSync(pVnode->pImpl); + if (code != 0) { + dError("vgId:%d, failed to auth vnode since %s", vgId, tstrerror(code)); + } + + dnodeReleaseVnode(pVnode); + return code; +} + +static int32_t vnodeProcessCompactVnodeReq(SRpcMsg *rpcMsg) { + SCompactVnodeMsg *pCompact = (SCompactVnodeMsg *)vnodeParseDropVnodeReq(rpcMsg); + + int32_t code = 0; + int32_t vgId = pCompact->vgId; + dDebug("vgId:%d, compact vnode req is received", vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode == NULL) { + code = terrno; + dDebug("vgId:%d, failed to compact since %s", vgId, tstrerror(code)); + return code; + } + + code = vnodeCompact(pVnode->pImpl); + if (code != 0) { + dError("vgId:%d, failed to compact vnode since %s", vgId, tstrerror(code)); + } + + dnodeReleaseVnode(pVnode); + return code; +} + +static void dnodeProcessVnodeMgmtQueue(void *unused, SRpcMsg *pMsg) { + int32_t code = 0; + + switch (pMsg->msgType) { + case TSDB_MSG_TYPE_CREATE_VNODE_IN: + code = vnodeProcessCreateVnodeReq(pMsg); + break; + case TSDB_MSG_TYPE_ALTER_VNODE_IN: + code = vnodeProcessAlterVnodeReq(pMsg); + break; + case TSDB_MSG_TYPE_DROP_VNODE_IN: + code = vnodeProcessDropVnodeReq(pMsg); + break; + case TSDB_MSG_TYPE_AUTH_VNODE_IN: + code = vnodeProcessAuthVnodeReq(pMsg); + break; + case TSDB_MSG_TYPE_SYNC_VNODE_IN: + code = vnodeProcessSyncVnodeReq(pMsg); + break; + case TSDB_MSG_TYPE_COMPACT_VNODE_IN: + code = vnodeProcessCompactVnodeReq(pMsg); + break; + default: + code = TSDB_CODE_DND_MSG_NOT_PROCESSED; + break; + } + + SRpcMsg rsp = {.code = code, .handle = pMsg->handle}; + rpcSendResponse(&rsp); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); +} + +static void dnodeProcessVnodeQueryQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) { + vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_QUERY); +} + +static void dnodeProcessVnodeFetchQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) { + vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_FETCH); +} + +static void dnodeProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { + SVnodeMsg *pMsg = vnodeInitMsg(numOfMsgs); + SRpcMsg *pRpcMsg = NULL; + + for (int32_t i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pRpcMsg); + vnodeAppendMsg(pMsg, pRpcMsg); + taosFreeQitem(pRpcMsg); + } + + vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_WRITE); +} + +static void dnodeProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { + SVnodeMsg *pMsg = NULL; + for (int32_t i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pMsg); + vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_APPLY); + } +} + +static void dnodeProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { + SVnodeMsg *pMsg = NULL; + for (int32_t i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pMsg); + vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_SYNC); + } +} + +static int32_t dnodeWriteRpcMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { + int32_t code = 0; + + if (pQueue == NULL) { + code = TSDB_CODE_DND_MSG_NOT_PROCESSED; + } else { + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg)); + if (pMsg == NULL) { + code = TSDB_CODE_DND_OUT_OF_MEMORY; + } else { + *pMsg = *pRpcMsg; + code = taosWriteQitem(pQueue, pMsg); + } + } + + if (code != TSDB_CODE_SUCCESS) { + SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; + rpcSendResponse(&rsp); + rpcFreeCont(pRpcMsg->pCont); + } +} + +static int32_t dnodeWriteVnodeMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { + int32_t code = 0; + + if (pQueue == NULL) { + code = TSDB_CODE_DND_MSG_NOT_PROCESSED; + } else { + SVnodeMsg *pMsg = vnodeInitMsg(1); + if (pMsg == NULL) { + code = TSDB_CODE_DND_OUT_OF_MEMORY; + } else { + vnodeAppendMsg(pMsg, pRpcMsg); + code = taosWriteQitem(pQueue, pMsg); + } + } + + if (code != TSDB_CODE_SUCCESS) { + SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; + rpcSendResponse(&rsp); + rpcFreeCont(pRpcMsg->pCont); + } +} + +static SVnodeObj *dnodeAcquireVnodeFromMsg(SRpcMsg *pMsg) { + SMsgHead *pHead = (SMsgHead *)pMsg->pCont; + pHead->vgId = htonl(pHead->vgId); + + SVnodeObj *pVnode = dnodeAcquireVnode(pHead->vgId); + if (pVnode == NULL) { + SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; + rpcSendResponse(&rsp); + rpcFreeCont(pMsg->pCont); + } + + return pVnode; +} + +void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { dnodeWriteRpcMsgToVnodeQueue(tsVnodes.pMgmtQ, pMsg); } + +void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { + SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); + if (pVnode != NULL) { + dnodeWriteRpcMsgToVnodeQueue(pVnode->pWriteQ, pMsg); + dnodeReleaseVnode(pVnode); + } +} + +void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { + SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); + if (pVnode != NULL) { + dnodeWriteVnodeMsgToVnodeQueue(pVnode->pSyncQ, pMsg); + dnodeReleaseVnode(pVnode); + } +} + +void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { + SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); + if (pVnode != NULL) { + dnodeWriteVnodeMsgToVnodeQueue(pVnode->pQueryQ, pMsg); + dnodeReleaseVnode(pVnode); + } +} + +void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { + SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); + if (pVnode != NULL) { + dnodeWriteVnodeMsgToVnodeQueue(pVnode->pFetchQ, pMsg); + dnodeReleaseVnode(pVnode); + } +} + +static int32_t dnodePutMsgIntoVnodeApplyQueue(int32_t vgId, SVnodeMsg *pMsg) { + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode == NULL) { + return terrno; + } + + int32_t code = taosWriteQitem(pVnode->pApplyQ, pMsg); + dnodeReleaseVnode(pVnode); + return code; +} + +static int32_t dnodeInitVnodeMgmtWorker() { + SWorkerPool *pPool = &tsVnodes.mgmtPool; + pPool->name = "vnode-mgmt"; + pPool->min = 1; + pPool->max = 1; + if (tWorkerInit(pPool) != 0) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + tsVnodes.pMgmtQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)dnodeProcessVnodeMgmtQueue); + if (tsVnodes.pMgmtQ == NULL) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + return 0; +} + +static void dnodeCleanupVnodeMgmtWorker() { + tWorkerFreeQueue(&tsVnodes.mgmtPool, tsVnodes.pMgmtQ); + tWorkerCleanup(&tsVnodes.mgmtPool); + tsVnodes.pMgmtQ = NULL; +} + +static int32_t dnodeAllocVnodeQueryQueue(SVnodeObj *pVnode) { + pVnode->pQueryQ = tWorkerAllocQueue(&tsVnodes.queryPool, pVnode, (FProcessItem)dnodeProcessVnodeQueryQueue); + if (pVnode->pQueryQ == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + return 0; +} + +static void dnodeFreeVnodeQueryQueue(SVnodeObj *pVnode) { + tWorkerFreeQueue(&tsVnodes.queryPool, pVnode->pQueryQ); + pVnode->pQueryQ = NULL; +} + +static int32_t dnodeAllocVnodeFetchQueue(SVnodeObj *pVnode) { + pVnode->pFetchQ = tWorkerAllocQueue(&tsVnodes.fetchPool, pVnode, (FProcessItem)dnodeProcessVnodeFetchQueue); + if (pVnode->pFetchQ == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + return 0; +} + +static void dnodeFreeVnodeFetchQueue(SVnodeObj *pVnode) { + tWorkerFreeQueue(&tsVnodes.fetchPool, pVnode->pFetchQ); + pVnode->pFetchQ = NULL; +} + +static int32_t dnodeInitVnodeReadWorker() { + int32_t maxFetchThreads = 4; + float threadsForQuery = MAX(tsNumOfCores * tsRatioOfQueryCores, 1); + + SWorkerPool *pPool = &tsVnodes.queryPool; + pPool->name = "vnode-query"; + pPool->min = (int32_t)threadsForQuery; + pPool->max = pPool->min; + if (tWorkerInit(pPool) != 0) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + pPool = &tsVnodes.fetchPool; + pPool->name = "vnode-fetch"; + pPool->min = MIN(maxFetchThreads, tsNumOfCores); + pPool->max = pPool->min; + if (tWorkerInit(pPool) != 0) { + TSDB_CODE_VND_OUT_OF_MEMORY; + } + + return 0; +} + +static void dnodeCleanupVnodeReadWorker() { + tWorkerCleanup(&tsVnodes.fetchPool); + tWorkerCleanup(&tsVnodes.queryPool); +} + +static int32_t dnodeAllocVnodeWriteQueue(SVnodeObj *pVnode) { + pVnode->pWriteQ = tMWorkerAllocQueue(&tsVnodes.writePool, pVnode, (FProcessItems)dnodeProcessVnodeWriteQueue); + if (pVnode->pWriteQ == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + return 0; +} + +static void dnodeFreeVnodeWriteQueue(SVnodeObj *pVnode) { + tMWorkerFreeQueue(&tsVnodes.writePool, pVnode->pWriteQ); + pVnode->pWriteQ = NULL; +} + +static int32_t dnodeAllocVnodeApplyQueue(SVnodeObj *pVnode) { + pVnode->pApplyQ = tMWorkerAllocQueue(&tsVnodes.writePool, pVnode, (FProcessItems)dnodeProcessVnodeApplyQueue); + if (pVnode->pApplyQ == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + return 0; +} + +static void dnodeFreeVnodeApplyQueue(SVnodeObj *pVnode) { + tMWorkerFreeQueue(&tsVnodes.writePool, pVnode->pApplyQ); + pVnode->pApplyQ = NULL; +} + +static int32_t dnodeInitVnodeWriteWorker() { + SMWorkerPool *pPool = &tsVnodes.writePool; + pPool->name = "vnode-write"; + pPool->max = tsNumOfCores; + if (tMWorkerInit(pPool) != 0) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + return 0; +} + +static void dnodeCleanupVnodeWriteWorker() { tMWorkerCleanup(&tsVnodes.writePool); } + +static int32_t dnodeAllocVnodeSyncQueue(SVnodeObj *pVnode) { + pVnode->pSyncQ = tMWorkerAllocQueue(&tsVnodes.writePool, pVnode, (FProcessItems)dnodeProcessVnodeSyncQueue); + if (pVnode->pSyncQ == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + return 0; +} + +static void dnodeFreeVnodeSyncQueue(SVnodeObj *pVnode) { + tMWorkerFreeQueue(&tsVnodes.writePool, pVnode->pSyncQ); + pVnode->pSyncQ = NULL; +} + +static int32_t dnodeInitVnodeSyncWorker() { + int32_t maxThreads = tsNumOfCores / 2; + if (maxThreads < 1) maxThreads = 1; + + SMWorkerPool *pPool = &tsVnodes.writePool; + pPool->name = "vnode-sync"; + pPool->max = maxThreads; + if (tMWorkerInit(pPool) != 0) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + return 0; +} + +static void dnodeCleanupVnodeSyncWorker() { tMWorkerCleanup(&tsVnodes.syncPool); } + +static int32_t dnodeInitVnodeModule() { + SVnodePara para; + para.SendMsgToDnode = dnodeSendMsgToDnode; + para.SendMsgToMnode = dnodeSendMsgToMnode; + para.PutMsgIntoApplyQueue = dnodePutMsgIntoVnodeApplyQueue; + + return vnodeInit(para); +} + +int32_t dnodeInitVnodes() { + dInfo("dnode-vnodes start to init"); + + SSteps *pSteps = taosStepInit(3, dnodeReportStartup); + taosStepAdd(pSteps, "dnode-vnode-env", dnodeInitVnodeModule, vnodeCleanup); + taosStepAdd(pSteps, "dnode-vnode-mgmt", dnodeInitVnodeMgmtWorker, dnodeCleanupVnodeMgmtWorker); + taosStepAdd(pSteps, "dnode-vnode-read", dnodeInitVnodeReadWorker, dnodeCleanupVnodeReadWorker); + taosStepAdd(pSteps, "dnode-vnode-write", dnodeInitVnodeWriteWorker, dnodeCleanupVnodeWriteWorker); + taosStepAdd(pSteps, "dnode-vnode-sync", dnodeInitVnodeSyncWorker, dnodeCleanupVnodeSyncWorker); + taosStepAdd(pSteps, "dnode-vnodes", dnodeOpenVnodes, dnodeCleanupVnodes); + + tsVnodes.pSteps = pSteps; + return taosStepExec(pSteps); +} + +void dnodeCleanupVnodes() { + if (tsVnodes.pSteps != NULL) { + dInfo("dnode-vnodes start to clean up"); + taosStepCleanup(tsVnodes.pSteps); + tsVnodes.pSteps = NULL; + dInfo("dnode-vnodes is cleaned up"); + } +} + +void dnodeGetVnodeLoads(SVnodeLoads *pLoads) { + pLoads->num = taosHashGetSize(tsVnodes.hash); + + int32_t v = 0; + void *pIter = taosHashIterate(tsVnodes.hash, NULL); + while (pIter) { + SVnodeObj **ppVnode = pIter; + if (ppVnode == NULL) continue; + + SVnodeObj *pVnode = *ppVnode; + if (pVnode == NULL) continue; + + SVnodeLoad *pLoad = &pLoads->data[v++]; + vnodeGetLoad(pVnode->pImpl, pLoad); + pLoad->vgId = htonl(pLoad->vgId); + pLoad->totalStorage = htobe64(pLoad->totalStorage); + pLoad->compStorage = htobe64(pLoad->compStorage); + pLoad->pointsWritten = htobe64(pLoad->pointsWritten); + pLoad->tablesNum = htobe64(pLoad->tablesNum); + + pIter = taosHashIterate(tsVnodes.hash, pIter); + } +} diff --git a/source/dnode/mnode/inc/mnodeDef.h b/source/dnode/mnode/inc/mnodeDef.h index 33606e8ee2..0825815bc7 100644 --- a/source/dnode/mnode/inc/mnodeDef.h +++ b/source/dnode/mnode/inc/mnodeDef.h @@ -208,7 +208,7 @@ typedef struct { typedef struct SDbObj { SdbHead head; - char name[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char name[TSDB_FULL_DB_NAME_LEN]; char acct[TSDB_USER_LEN]; int64_t createdTime; int64_t updateTime; @@ -236,7 +236,7 @@ typedef struct SVgObj { int64_t updateTime; int32_t lbDnodeId; int32_t lbTime; - char dbName[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char dbName[TSDB_FULL_DB_NAME_LEN]; int8_t inUse; int8_t accessState; int8_t status; @@ -288,7 +288,7 @@ typedef struct { void *pIter; void *pVgIter; void **ppShow; - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + char db[TSDB_FULL_DB_NAME_LEN]; int16_t offset[TSDB_MAX_COLUMNS]; int32_t bytes[TSDB_MAX_COLUMNS]; char payload[]; diff --git a/source/dnode/vnode/impl/src/vnodeInt.c b/source/dnode/vnode/impl/src/vnodeInt.c index 6f83542cef..427a6dae4d 100644 --- a/source/dnode/vnode/impl/src/vnodeInt.c +++ b/source/dnode/vnode/impl/src/vnodeInt.c @@ -15,19 +15,58 @@ #define _DEFAULT_SOURCE #include "vnodeInt.h" +#include "tqueue.h" -int32_t vnodeInit() { return 0; } +int32_t vnodeInit(SVnodePara para) { return 0; } void vnodeCleanup() {} -int32_t vnodeGetStatistics(SVnode *pVnode, SVnodeStatisic *pStat) { return 0; } -int32_t vnodeGetStatus(SVnode *pVnode, SVnodeStatus *pStatus) { return 0; } - SVnode *vnodeOpen(int32_t vgId, const char *path) { return NULL; } void vnodeClose(SVnode *pVnode) {} int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; } SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg) { return NULL; } -int32_t vnodeDrop(SVnode *pVnode) { return 0; } +void vnodeDrop(SVnode *pVnode) {} int32_t vnodeCompact(SVnode *pVnode) { return 0; } int32_t vnodeSync(SVnode *pVnode) { return 0; } -void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg) {} +int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; } + +SVnodeMsg *vnodeInitMsg(int32_t msgNum) { + SVnodeMsg *pMsg = taosAllocateQitem(msgNum * sizeof(SRpcMsg *) + sizeof(SVnodeMsg)); + if (pMsg == NULL) { + terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + return NULL; + } else { + pMsg->allocNum = msgNum; + return pMsg; + } +} + +int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg) { + if (pMsg->curNum >= pMsg->allocNum) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + pMsg->rpcMsg[pMsg->curNum++] = *pRpcMsg; +} + +void vnodeCleanupMsg(SVnodeMsg *pMsg) { + for (int32_t i = 0; i < pMsg->curNum; ++i) { + rpcFreeCont(pMsg->rpcMsg[i].pCont); + } + taosFreeQitem(pMsg); +} + +void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVnMsgType msgType) { + switch (msgType) { + case VN_MSG_TYPE_WRITE: + break; + case VN_MSG_TYPE_APPLY: + break; + case VN_MSG_TYPE_SYNC: + break; + case VN_MSG_TYPE_QUERY: + break; + case VN_MSG_TYPE_FETCH: + break; + } +} diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c index 0bef796026..5aab21bdae 100644 --- a/source/libs/parser/src/parserUtil.c +++ b/source/libs/parser/src/parserUtil.c @@ -1934,7 +1934,7 @@ char* cloneCurrentDBName(SSqlObj* pSql) { case TAOS_REQ_FROM_HTTP: pCtx = pSql->param; if (pCtx && pCtx->db[0] != '\0') { - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN] = {0}; + char db[TSDB_FULL_DB_NAME_LEN] = {0}; int32_t len = sprintf(db, "%s%s%s", pTscObj->acctId, TS_PATH_DELIMITER, pCtx->db); assert(len <= sizeof(db)); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 22fbeb1883..6838bab403 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -235,6 +235,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, "Invalid message lengt TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_DND_TOO_MANY_VNODES, "Too many vnode directories") TAOS_DEFINE_ERROR(TSDB_CODE_DND_EXITING, "Dnode is exiting") +TAOS_DEFINE_ERROR(TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR, "Parse vnodes.json error") // vnode TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, "Action in progress") diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 2813a55fea..5d6a507172 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -98,6 +98,20 @@ void taosCloseQueue(taos_queue param) { uTrace("queue:%p is closed", queue); } +bool taosQueueEmpty(taos_queue param) { + if (param == NULL) return true; + STaosQueue *queue = (STaosQueue *)param; + + bool empty = false; + pthread_mutex_lock(&queue->mutex); + if (queue->head == NULL && queue->tail == NULL) { + empty = true; + } + pthread_mutex_destroy(&queue->mutex); + + return empty; +} + void *taosAllocateQitem(int size) { STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1); diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 7df12089b7..136bc40482 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -76,7 +76,7 @@ static void *tWorkerThreadFp(SWorker *worker) { } if (fp) { - (*fp)(msg, ahandle); + (*fp)(ahandle, msg); } } @@ -186,7 +186,7 @@ static void *tWriteWorkerThreadFp(SMWorker *worker) { } if (fp) { - (*fp)(worker->qall, numOfMsgs, ahandle); + (*fp)(ahandle, worker->qall, numOfMsgs); } }