diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 92b453bc1f..6727dd3289 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -320,7 +320,7 @@ typedef struct SEpSet { typedef struct { int32_t acctId; - uint32_t clusterId; + int64_t clusterId; int32_t connId; int8_t superUser; int8_t reserved[5]; @@ -644,20 +644,23 @@ typedef struct { typedef struct { int32_t sver; int32_t dnodeId; - int32_t clusterId; - int64_t rebootTime; // time stamp for last reboot + int64_t clusterId; + int64_t rebootTime; + int64_t updateTime; int16_t numOfCores; - int16_t numOfSupportMnodes; int16_t numOfSupportVnodes; - int16_t numOfSupportQnodes; char dnodeEp[TSDB_EP_LEN]; SClusterCfg clusterCfg; SVnodeLoads vnodeLoads; } SStatusMsg; +typedef struct { + int32_t reserved; +} STransMsg; + typedef struct { int32_t dnodeId; - int32_t clusterId; + int64_t clusterId; int8_t dropped; char reserved[7]; } SDnodeCfg; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 15b5b9da28..9aa4325d58 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -67,23 +67,21 @@ enum { #endif // Requests handled by DNODE TD_NEW_MSG_SEG(TDMT_DND_MSG) - TD_DEF_MSG_TYPE(TDMT_DND_NETWORK_TEST, "dnode-nettest", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_DND_CREATE_MNODE, "dnode-create-mnode", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_DND_ALTER_MNODE, "dnode-alter-mnode", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_DND_DROP_MNODE, "dnode-drop-mnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CREATE_VNODE, "dnode-create-vnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_ALTER_VNODE, "dnode-alter-vnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_DROP_VNODE, "dnode-drop-vnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_AUTH_VNODE, "dnode-auth-vnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_SYNC_VNODE, "dnode-sync-vnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_COMPACT_VNODE, "dnode-compact-vnode", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_DND_CREATE_MNODE, "dnode-create-mnode", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_DND_ALTER_MNODE, "dnode-alter-mnode", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_DND_DROP_MNODE, "dnode-drop-mnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "dnode-config-dnode", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_DND_NETWORK_TEST, "dnode-nettest", NULL, NULL) // Requests handled by MNODE TD_NEW_MSG_SEG(TDMT_MND_MSG) TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "mnode-connect", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TABLE, "mnode-create-table", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_DROP_TABLE, "mnode-drop-table", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ACCT, "mnode-create-acct", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_ACCT, "mnode-alter-acct", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_ACCT, "mnode-drop-acct", NULL, NULL) @@ -107,6 +105,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STB, "mnode-create-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STB, "mnode-alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_STB, "mnode-drop-stb", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STB_META, "mnode-stb-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "mnode-vgroup-list", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "mnode-kill-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "mnode-kill-conn", NULL, NULL) @@ -114,6 +113,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "mnode-show", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SHOW_RETRIEVE, "mnode-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", NULL, NULL) @@ -125,10 +125,15 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "vnode-submit", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_QUERY, "vnode-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_FETCH, "vnode-fetch", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TABLE, "vnode-create-table", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TABLE, "vnode-alter-table", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_DROP_TABLE, "vnode-drop-table", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_UPDATE_TAG_VAL, "vnode-update-tag-val", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TABLE_META, "vnode-table-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TABLES_META, "vnode-tables-meta", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateTbReq, SVCreateTbRsp) + TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONSUME, "vnode-mq-consume", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL) @@ -138,9 +143,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TASK, "vnode-drop-task", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-super-table", SVCreateTbReq, SVCreateTbRsp) - TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL) diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index fbe447baf9..b7f3cea7fc 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -28,9 +28,7 @@ typedef struct SDnode SDnode; typedef struct { int32_t sver; int16_t numOfCores; - int16_t numOfSupportMnodes; int16_t numOfSupportVnodes; - int16_t numOfSupportQnodes; int8_t enableTelem; int32_t statusInterval; float numOfThreadsPerCore; diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 09d1f8c013..e0619b2133 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -56,7 +56,7 @@ typedef struct SMnodeCfg { typedef struct { int32_t dnodeId; - int32_t clusterId; + int64_t clusterId; int8_t replica; int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index a57597a60a..6d3003eab1 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -60,7 +60,7 @@ typedef struct SAppInstInfo { SCorEpSet mgmtEp; SInstanceSummary summary; SList *pConnList; // STscObj linked list - uint32_t clusterId; + int64_t clusterId; void *pTransporter; } SAppInstInfo; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index f20ef2c42e..0ee99f77aa 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -167,11 +167,11 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest); SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet; - if (pDcl->msgType == TDMT_MND_CREATE_TABLE) { + if (pDcl->msgType == TDMT_VND_CREATE_TABLE) { struct SCatalog* pCatalog = NULL; - char buf[12] = {0}; - sprintf(buf, "%d", pRequest->pTscObj->pAppInfo->clusterId); + char buf[18] = {0}; + sprintf(buf, "%" PRId64, pRequest->pTscObj->pAppInfo->clusterId); int32_t code = catalogGetHandle(buf, &pCatalog); if (code != TSDB_CODE_SUCCESS) { return code; @@ -230,11 +230,13 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { terrno = TSDB_CODE_SUCCESS; CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return); + if (qIsDdlQuery(pQuery)) { CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return); + } else { + CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return); + CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return); } - CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return); - CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return); _return: qDestoryQuery(pQuery); diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index e4ed305efa..bdf54eb21c 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -34,6 +34,8 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { if (code != TSDB_CODE_SUCCESS) { pRequest->code = code; terrno = code; + + sem_post(&pRequest->body.rspSem); return code; } @@ -42,7 +44,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { SConnectRsp *pConnect = (SConnectRsp *)pMsg->pData; pConnect->acctId = htonl(pConnect->acctId); pConnect->connId = htonl(pConnect->connId); - pConnect->clusterId = htonl(pConnect->clusterId); + pConnect->clusterId = htobe64(pConnect->clusterId); assert(pConnect->epSet.numOfEps > 0); for(int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) { @@ -64,8 +66,9 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { pTscObj->pAppInfo->clusterId = pConnect->clusterId; atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); -// pRequest->body.resInfo.pRspMsg = pMsg->pData; - tscDebug("0x%" PRIx64 " clusterId:%d, totalConn:%"PRId64, pRequest->requestId, pConnect->clusterId, pTscObj->pAppInfo->numOfConns); + // pRequest->body.resInfo.pRspMsg = pMsg->pData; + tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, + pTscObj->pAppInfo->numOfConns); sem_post(&pRequest->body.rspSem); return 0; @@ -289,6 +292,6 @@ void initMsgHandleFp() { handleRequestRspFp[TDMT_MND_SHOW_RETRIEVE] = processRetrieveMnodeRsp; handleRequestRspFp[TDMT_MND_CREATE_DB] = processCreateDbRsp; handleRequestRspFp[TDMT_MND_USE_DB] = processUseDbRsp; - handleRequestRspFp[TDMT_MND_CREATE_TABLE] = processCreateTableRsp; + handleRequestRspFp[TDMT_MND_CREATE_STB] = processCreateTableRsp; handleRequestRspFp[TDMT_MND_DROP_DB] = processDropDbRsp; } \ No newline at end of file diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 3f446ceef9..d14719fffb 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -160,6 +160,12 @@ TEST(testCase, create_db_Test) { int32_t numOfFields = taos_num_fields(pRes); ASSERT_EQ(numOfFields, 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database abc1"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + } taos_close(pConn); } diff --git a/source/dnode/mgmt/daemon/src/daemon.c b/source/dnode/mgmt/daemon/src/daemon.c index 083935c706..75c2ff00e0 100644 --- a/source/dnode/mgmt/daemon/src/daemon.c +++ b/source/dnode/mgmt/daemon/src/daemon.c @@ -139,9 +139,7 @@ void dmnWaitSignal() { void dmnInitOption(SDnodeOpt *pOption) { pOption->sver = 30000000; //3.0.0.0 pOption->numOfCores = tsNumOfCores; - pOption->numOfSupportMnodes = 1; pOption->numOfSupportVnodes = 1; - pOption->numOfSupportQnodes = 1; pOption->statusInterval = tsStatusInterval; pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore; pOption->ratioOfQueryCores = tsRatioOfQueryCores; diff --git a/source/dnode/mgmt/impl/inc/dndDnode.h b/source/dnode/mgmt/impl/inc/dndDnode.h index 27cc99c27c..a2015913a7 100644 --- a/source/dnode/mgmt/impl/inc/dndDnode.h +++ b/source/dnode/mgmt/impl/inc/dndDnode.h @@ -23,15 +23,16 @@ extern "C" { int32_t dndInitDnode(SDnode *pDnode); void dndCleanupDnode(SDnode *pDnode); -void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); int32_t dndGetDnodeId(SDnode *pDnode); -int32_t dndGetClusterId(SDnode *pDnode); +int64_t dndGetClusterId(SDnode *pDnode); void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); -void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg); -void dndSendStatusMsg(SDnode *pDnode); + +void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg); +void dndSendStatusMsg(SDnode *pDnode); +void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet); +void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 6f1357e9c1..d6e9a6b4a1 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -22,10 +22,11 @@ extern "C" { #include "cJSON.h" #include "os.h" -#include "tmsg.h" +#include "tep.h" #include "thash.h" #include "tlockfree.h" #include "tlog.h" +#include "tmsg.h" #include "tqueue.h" #include "trpc.h" #include "tthread.h" @@ -51,21 +52,27 @@ typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps); typedef struct { char *dnode; char *mnode; + char *qnode; + char *snode; + char *bnode; char *vnodes; } SDnodeDir; typedef struct { - int32_t dnodeId; - int32_t dropped; - int32_t clusterId; - int64_t rebootTime; - int8_t statusSent; - SEpSet mnodeEpSet; - char *file; - SHashObj *dnodeHash; - SDnodeEps *dnodeEps; - pthread_t *threadId; - SRWLatch latch; + int32_t dnodeId; + int32_t dropped; + int64_t clusterId; + int64_t rebootTime; + int64_t updateTime; + int8_t statusSent; + SEpSet mnodeEpSet; + char *file; + SHashObj *dnodeHash; + SDnodeEps *dnodeEps; + pthread_t *threadId; + SRWLatch latch; + taos_queue pMgmtQ; + SWorkerPool mgmtPool; } SDnodeMgmt; typedef struct { @@ -81,8 +88,6 @@ typedef struct { taos_queue pReadQ; taos_queue pWriteQ; taos_queue pSyncQ; - taos_queue pMgmtQ; - SWorkerPool mgmtPool; SWorkerPool readPool; SWorkerPool writePool; SWorkerPool syncPool; @@ -93,8 +98,6 @@ typedef struct { int32_t openVnodes; int32_t totalVnodes; SRWLatch latch; - taos_queue pMgmtQ; - SWorkerPool mgmtPool; SWorkerPool queryPool; SWorkerPool fetchPool; SMWorkerPool syncPool; diff --git a/source/dnode/mgmt/impl/inc/dndMnode.h b/source/dnode/mgmt/impl/inc/dndMnode.h index 67c51e51a8..0f7cec230e 100644 --- a/source/dnode/mgmt/impl/inc/dndMnode.h +++ b/source/dnode/mgmt/impl/inc/dndMnode.h @@ -23,11 +23,14 @@ extern "C" { int32_t dndInitMnode(SDnode *pDnode); void dndCleanupMnode(SDnode *pDnode); + int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey); -void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); +int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/inc/dndVnodes.h b/source/dnode/mgmt/impl/inc/dndVnodes.h index 35f99ee73b..bf5f0122c1 100644 --- a/source/dnode/mgmt/impl/inc/dndVnodes.h +++ b/source/dnode/mgmt/impl/inc/dndVnodes.h @@ -24,12 +24,18 @@ extern "C" { int32_t dndInitVnodes(SDnode *pDnode); void dndCleanupVnodes(SDnode *pDnode); void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pVloads); -void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); +int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); +int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); +int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); +int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); +int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); +int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index af86e59518..c67e55f048 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -17,7 +17,21 @@ #include "dndDnode.h" #include "dndTransport.h" #include "dndVnodes.h" -#include "tep.h" + +static int32_t dndInitMgmtWorker(SDnode *pDnode); +static void dndCleanupMgmtWorker(SDnode *pDnode); +static int32_t dndAllocMgmtQueue(SDnode *pDnode); +static void dndFreeMgmtQueue(SDnode *pDnode); +static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg); + +static int32_t dndReadDnodes(SDnode *pDnode); +static int32_t dndWriteDnodes(SDnode *pDnode); +static void *dnodeThreadRoutine(void *param); + +static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg); +static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg); +static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg); +static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg); int32_t dndGetDnodeId(SDnode *pDnode) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; @@ -27,10 +41,10 @@ int32_t dndGetDnodeId(SDnode *pDnode) { return dnodeId; } -int32_t dndGetClusterId(SDnode *pDnode) { +int64_t dndGetClusterId(SDnode *pDnode) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; taosRLockLatch(&pMgmt->latch); - int32_t clusterId = pMgmt->clusterId; + int64_t clusterId = pMgmt->clusterId; taosRUnLockLatch(&pMgmt->latch); return clusterId; } @@ -68,7 +82,7 @@ void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) { SEpSet epSet = {0}; dndGetMnodeEpSet(pDnode, &epSet); - dDebug("RPC %p, msg:%s is redirected, num:%d inUse:%d", pMsg->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse); + dDebug("RPC %p, msg:%s is redirected, num:%d use:%d", pMsg->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse); for (int32_t i = 0; i < epSet.numOfEps; ++i) { dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]); if (strcmp(epSet.fqdn[i], pDnode->opt.localFqdn) == 0 && epSet.port[i] == pDnode->opt.serverPort) { @@ -82,7 +96,7 @@ void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) { } static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { - dInfo("mnode is changed, num:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse); + dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); SDnodeMgmt *pMgmt = &pDnode->dmgmt; taosWLockLatch(&pMgmt->latch); @@ -165,7 +179,7 @@ static int32_t dndReadDnodes(SDnode *pDnode) { int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR; int32_t len = 0; - int32_t maxLen = 30000; + int32_t maxLen = 256 * 1024; char *content = calloc(1, maxLen + 1); cJSON *root = NULL; FILE *fp = NULL; @@ -198,11 +212,11 @@ static int32_t dndReadDnodes(SDnode *pDnode) { pMgmt->dnodeId = dnodeId->valueint; cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); - if (!clusterId || clusterId->type != cJSON_Number) { + if (!clusterId || clusterId->type != cJSON_String) { dError("failed to read %s since clusterId not found", pMgmt->file); goto PRASE_DNODE_OVER; } - pMgmt->clusterId = clusterId->valueint; + pMgmt->clusterId = atoll(clusterId->valuestring); cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); if (!dropped || dropped->type != cJSON_Number) { @@ -217,20 +231,20 @@ static int32_t dndReadDnodes(SDnode *pDnode) { goto PRASE_DNODE_OVER; } - int32_t numOfNodes = cJSON_GetArraySize(dnodes); - if (numOfNodes <= 0) { - dError("failed to read %s since numOfNodes:%d invalid", pMgmt->file, numOfNodes); + int32_t numOfDnodes = cJSON_GetArraySize(dnodes); + if (numOfDnodes <= 0) { + dError("failed to read %s since numOfDnodes:%d invalid", pMgmt->file, numOfDnodes); goto PRASE_DNODE_OVER; } - pMgmt->dnodeEps = calloc(1, numOfNodes * sizeof(SDnodeEp) + sizeof(SDnodeEps)); + pMgmt->dnodeEps = calloc(1, numOfDnodes * sizeof(SDnodeEp) + sizeof(SDnodeEps)); if (pMgmt->dnodeEps == NULL) { dError("failed to calloc dnodeEpList since %s", strerror(errno)); goto PRASE_DNODE_OVER; } - pMgmt->dnodeEps->num = numOfNodes; + pMgmt->dnodeEps->num = numOfDnodes; - for (int32_t i = 0; i < numOfNodes; ++i) { + for (int32_t i = 0; i < numOfDnodes; ++i) { cJSON *node = cJSON_GetArrayItem(dnodes, i); if (node == NULL) break; @@ -238,28 +252,28 @@ static int32_t dndReadDnodes(SDnode *pDnode) { cJSON *dnodeId = cJSON_GetObjectItem(node, "id"); if (!dnodeId || dnodeId->type != cJSON_Number) { - dError("failed to read %s, dnodeId not found", pMgmt->file); + dError("failed to read %s since dnodeId not found", pMgmt->file); goto PRASE_DNODE_OVER; } pDnodeEp->id = dnodeId->valueint; cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn"); if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { - dError("failed to read %s, dnodeFqdn not found", pMgmt->file); + dError("failed to read %s since dnodeFqdn not found", pMgmt->file); goto PRASE_DNODE_OVER; } tstrncpy(pDnodeEp->fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); cJSON *dnodePort = cJSON_GetObjectItem(node, "port"); if (!dnodePort || dnodePort->type != cJSON_Number) { - dError("failed to read %s, dnodePort not found", pMgmt->file); + dError("failed to read %s since dnodePort not found", pMgmt->file); goto PRASE_DNODE_OVER; } pDnodeEp->port = dnodePort->valueint; cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode"); if (!isMnode || isMnode->type != cJSON_Number) { - dError("failed to read %s, isMnode not found", pMgmt->file); + dError("failed to read %s since isMnode not found", pMgmt->file); goto PRASE_DNODE_OVER; } pDnodeEp->isMnode = isMnode->valueint; @@ -282,7 +296,7 @@ PRASE_DNODE_OVER: if (pMgmt->dnodeEps == NULL) { pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp)); pMgmt->dnodeEps->num = 1; - pMgmt->dnodeEps->eps[0].isMnode = 1; + pMgmt->dnodeEps->eps[0].isMnode = 1; taosGetFqdnPortFromEp(pDnode->opt.firstEp, pMgmt->dnodeEps->eps[0].fqdn, &pMgmt->dnodeEps->eps[0].port); } @@ -303,12 +317,12 @@ static int32_t dndWriteDnodes(SDnode *pDnode) { } int32_t len = 0; - int32_t maxLen = 30000; + int32_t maxLen = 256 * 1024; char *content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pMgmt->dnodeId); - len += snprintf(content + len, maxLen - len, " \"clusterId\": %d,\n", pMgmt->clusterId); + len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->clusterId); len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped); len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n"); for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) { @@ -331,6 +345,7 @@ static int32_t dndWriteDnodes(SDnode *pDnode) { free(content); terrno = 0; + pMgmt->updateTime = taosGetTimestampMs(); dInfo("successed to write %s", pMgmt->file); return 0; } @@ -348,12 +363,11 @@ void dndSendStatusMsg(SDnode *pDnode) { taosRLockLatch(&pMgmt->latch); pStatus->sver = htonl(pDnode->opt.sver); pStatus->dnodeId = htonl(pMgmt->dnodeId); - pStatus->clusterId = htonl(pMgmt->clusterId); + pStatus->clusterId = htobe64(pMgmt->clusterId); pStatus->rebootTime = htobe64(pMgmt->rebootTime); + pStatus->updateTime = htobe64(pMgmt->updateTime); pStatus->numOfCores = htons(pDnode->opt.numOfCores); - pStatus->numOfSupportMnodes = htons(pDnode->opt.numOfCores); - pStatus->numOfSupportVnodes = htons(pDnode->opt.numOfCores); - pStatus->numOfSupportQnodes = htons(pDnode->opt.numOfCores); + pStatus->numOfSupportVnodes = htons(pDnode->opt.numOfSupportVnodes); tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN); pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval); @@ -379,7 +393,7 @@ void dndSendStatusMsg(SDnode *pDnode) { static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; if (pMgmt->dnodeId == 0 || pMgmt->dropped != pCfg->dropped) { - dInfo("set dnodeId:%d clusterId:%d dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped); + dInfo("set dnodeId:%d clusterId:% " PRId64 " dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped); taosWLockLatch(&pMgmt->latch); pMgmt->dnodeId = pCfg->dnodeId; @@ -410,13 +424,9 @@ static void dndUpdateDnodeEps(SDnode *pDnode, SDnodeEps *pDnodeEps) { taosWUnLockLatch(&pMgmt->latch); } -static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { +static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; - if (pEpSet && pEpSet->numOfEps > 0) { - dndUpdateMnodeEpSet(pDnode, pEpSet); - } - if (pMsg->code != TSDB_CODE_SUCCESS) { pMgmt->statusSent = 0; return; @@ -425,7 +435,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SStatusRsp *pRsp = pMsg->pCont; SDnodeCfg *pCfg = &pRsp->dnodeCfg; pCfg->dnodeId = htonl(pCfg->dnodeId); - pCfg->clusterId = htonl(pCfg->clusterId); + pCfg->clusterId = htobe64(pCfg->clusterId); dndUpdateDnodeCfg(pDnode, pCfg); if (pCfg->dropped) { @@ -444,9 +454,9 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { pMgmt->statusSent = 0; } -static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } +static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg) { assert(1); } -static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } +static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg) { assert(1); } static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) { dError("config msg is received, but not supported yet"); @@ -457,7 +467,7 @@ static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) { rpcSendResponse(&rspMsg); } -static void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) { +void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) { dDebug("startup msg is received"); SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg)); @@ -491,6 +501,7 @@ int32_t dndInitDnode(SDnode *pDnode) { pMgmt->rebootTime = taosGetTimestampMs(); pMgmt->dropped = 0; pMgmt->clusterId = 0; + taosInitRWLatch(&pMgmt->latch); char path[PATH_MAX]; snprintf(path, PATH_MAX, "%s/dnode.json", pDnode->dir.dnode); @@ -512,7 +523,15 @@ int32_t dndInitDnode(SDnode *pDnode) { return -1; } - taosInitRWLatch(&pMgmt->latch); + if (dndInitMgmtWorker(pDnode) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (dndAllocMgmtQueue(pDnode) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode); if (pMgmt->threadId == NULL) { @@ -528,6 +547,9 @@ int32_t dndInitDnode(SDnode *pDnode) { void dndCleanupDnode(SDnode *pDnode) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; + dndCleanupMgmtWorker(pDnode); + dndFreeMgmtQueue(pDnode); + if (pMgmt->threadId != NULL) { taosDestoryThread(pMgmt->threadId); pMgmt->threadId = NULL; @@ -554,39 +576,123 @@ void dndCleanupDnode(SDnode *pDnode) { dInfo("dnode-dnode is cleaned up"); } -void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { +static int32_t dndInitMgmtWorker(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + SWorkerPool *pPool = &pMgmt->mgmtPool; + pPool->name = "dnode-mgmt"; + pPool->min = 1; + pPool->max = 1; + if (tWorkerInit(pPool) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + dDebug("dnode mgmt worker is initialized"); + return 0; +} + +static void dndCleanupMgmtWorker(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + tWorkerCleanup(&pMgmt->mgmtPool); + dDebug("dnode mgmt worker is closed"); +} + +static int32_t dndAllocMgmtQueue(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, pDnode, (FProcessItem)dndProcessMgmtQueue); + if (pMgmt->pMgmtQ == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + return 0; +} + +static void dndFreeMgmtQueue(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ); + pMgmt->pMgmtQ = NULL; +} + +void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + + if (pEpSet && pEpSet->numOfEps > 0 && pRpcMsg->msgType == TDMT_MND_STATUS_RSP) { + dndUpdateMnodeEpSet(pDnode, pEpSet); + } + + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg)); + if (pMsg != NULL) *pMsg = *pRpcMsg; + + if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) { + if (pRpcMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY}; + rpcSendResponse(&rsp); + } + rpcFreeCont(pRpcMsg->pCont); + taosFreeQitem(pMsg); + } +} + +static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { + int32_t code = 0; + switch (pMsg->msgType) { + case TDMT_DND_CREATE_MNODE: + code = dndProcessCreateMnodeReq(pDnode, pMsg); + break; + case TDMT_DND_ALTER_MNODE: + code = dndProcessAlterMnodeReq(pDnode, pMsg); + break; + case TDMT_DND_DROP_MNODE: + code = dndProcessDropMnodeReq(pDnode, pMsg); + break; case TDMT_DND_NETWORK_TEST: dndProcessStartupReq(pDnode, pMsg); break; case TDMT_DND_CONFIG_DNODE: dndProcessConfigDnodeReq(pDnode, pMsg); break; - default: - dError("RPC %p, dnode req:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType)); - SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED}; - rpcSendResponse(&rspMsg); - } - - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; -} - -void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - switch (pMsg->msgType) { case TDMT_MND_STATUS_RSP: - dndProcessStatusRsp(pDnode, pMsg, pEpSet); + dndProcessStatusRsp(pDnode, pMsg); break; case TDMT_MND_AUTH_RSP: - dndProcessAuthRsp(pDnode, pMsg, pEpSet); + dndProcessAuthRsp(pDnode, pMsg); break; case TDMT_MND_GRANT_RSP: - dndProcessGrantRsp(pDnode, pMsg, pEpSet); + dndProcessGrantRsp(pDnode, pMsg); + break; + case TDMT_DND_CREATE_VNODE: + code = dndProcessCreateVnodeReq(pDnode, pMsg); + break; + case TDMT_DND_ALTER_VNODE: + code = dndProcessAlterVnodeReq(pDnode, pMsg); + break; + case TDMT_DND_DROP_VNODE: + code = dndProcessDropVnodeReq(pDnode, pMsg); + break; + case TDMT_DND_AUTH_VNODE: + code = dndProcessAuthVnodeReq(pDnode, pMsg); + break; + case TDMT_DND_SYNC_VNODE: + code = dndProcessSyncVnodeReq(pDnode, pMsg); + break; + case TDMT_DND_COMPACT_VNODE: + code = dndProcessCompactVnodeReq(pDnode, pMsg); break; default: - dError("RPC %p, dnode rsp:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType)); + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + code = -1; + dError("RPC %p, dnode req:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType)); + break; + } + + if (pMsg->msgType & 1u) { + if (code != 0) code = terrno; + SRpcMsg rsp = {.code = code, .handle = pMsg->handle, .ahandle = pMsg->ahandle}; + rpcSendResponse(&rsp); } rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; + taosFreeQitem(pMsg); } diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 8fbb473af1..50da49d325 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -21,7 +21,6 @@ static int32_t dndInitMnodeReadWorker(SDnode *pDnode); static int32_t dndInitMnodeWriteWorker(SDnode *pDnode); static int32_t dndInitMnodeSyncWorker(SDnode *pDnode); -static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode); static void dndCleanupMnodeReadWorker(SDnode *pDnode); static void dndCleanupMnodeWriteWorker(SDnode *pDnode); static void dndCleanupMnodeSyncWorker(SDnode *pDnode); @@ -29,7 +28,6 @@ static void dndCleanupMnodeMgmtWorker(SDnode *pDnode); static int32_t dndAllocMnodeReadQueue(SDnode *pDnode); static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode); static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode); -static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode); static void dndFreeMnodeReadQueue(SDnode *pDnode); static void dndFreeMnodeWriteQueue(SDnode *pDnode); static void dndFreeMnodeSyncQueue(SDnode *pDnode); @@ -38,12 +36,10 @@ static void dndFreeMnodeMgmtQueue(SDnode *pDnode); static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg); -static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg); static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg); void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t dndStartMnodeWorker(SDnode *pDnode); static void dndStopMnodeWorker(SDnode *pDnode); @@ -58,10 +54,6 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption); static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption); static int32_t dndDropMnode(SDnode *pDnode); -static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); - static SMnode *dndAcquireMnode(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = NULL; @@ -488,7 +480,7 @@ static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) { return pMsg; } -static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { +int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { @@ -504,7 +496,7 @@ static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { } } -static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { +int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { @@ -524,7 +516,7 @@ static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { return dndWriteMnodeFile(pDnode); } -static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { +int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { SDropMnodeInMsg *pMsg = pRpcMsg->pCont; pMsg->dnodeId = htonl(pMsg->dnodeId); @@ -536,33 +528,6 @@ static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { } } -static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { - int32_t code = 0; - - switch (pMsg->msgType) { - case TDMT_DND_CREATE_MNODE: - code = dndProcessCreateMnodeReq(pDnode, pMsg); - break; - case TDMT_DND_ALTER_MNODE: - code = dndProcessAlterMnodeReq(pDnode, pMsg); - break; - case TDMT_DND_DROP_MNODE: - code = dndProcessDropMnodeReq(pDnode, pMsg); - break; - default: - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - code = -1; - break; - } - - if (pMsg->msgType & 1u) { - if (code != 0) code = terrno; - SRpcMsg rsp = {.code = code, .handle = pMsg->handle}; - rpcSendResponse(&rsp); - } - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); -} static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; @@ -622,25 +587,6 @@ static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMs return 0; } -void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SMnode *pMnode = dndAcquireMnode(pDnode); - - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg)); - if (pMsg != NULL) *pMsg = *pRpcMsg; - - if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) { - if (pRpcMsg->msgType & 1u) { - SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY}; - rpcSendResponse(&rsp); - } - rpcFreeCont(pRpcMsg->pCont); - taosFreeQitem(pMsg); - } - - dndReleaseMnode(pDnode, pMnode); -} - void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); @@ -686,42 +632,6 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { dndReleaseMnode(pDnode, pMnode); } -static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, pDnode, (FProcessItem)dndProcessMnodeMgmtQueue); - if (pMgmt->pMgmtQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - return 0; -} - -static void dndFreeMnodeMgmtQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ); - pMgmt->pMgmtQ = NULL; -} - -static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SWorkerPool *pPool = &pMgmt->mgmtPool; - pPool->name = "mnode-mgmt"; - pPool->min = 1; - pPool->max = 1; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("mnode mgmt worker is initialized"); - return 0; -} - -static void dndCleanupMnodeMgmtWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - tWorkerCleanup(&pMgmt->mgmtPool); - dDebug("mnode mgmt worker is closed"); -} static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; @@ -842,16 +752,6 @@ int32_t dndInitMnode(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; taosInitRWLatch(&pMgmt->latch); - if (dndInitMnodeMgmtWorker(pDnode) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - if (dndAllocMnodeMgmtQueue(pDnode) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - char path[PATH_MAX]; snprintf(path, PATH_MAX, "%s/mnode.json", pDnode->dir.dnode); pMgmt->file = strdup(path); @@ -894,8 +794,6 @@ void dndCleanupMnode(SDnode *pDnode) { dInfo("dnode-mnode start to clean up"); if (pMgmt->pMnode) dndStopMnodeWorker(pDnode); - dndCleanupMnodeMgmtWorker(pDnode); - dndFreeMnodeMgmtQueue(pDnode); tfree(pMgmt->file); mndClose(pMgmt->pMnode); dInfo("dnode-mnode is cleaned up"); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index b8ebe3f884..8db09ea4cd 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -30,27 +30,30 @@ #define INTERNAL_SECRET "_secret" static void dndInitMsgFp(STransMgmt *pMgmt) { - // msg from client to dnode - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TABLE)] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TABLE)] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLES_META)] = dndProcessVnodeQueryMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_QUERY)] = dndProcessVnodeQueryMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONSUME)] = dndProcessVnodeQueryMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONNECT)] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_DISCONNECT)] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_RES_READY)] = dndProcessVnodeFetchMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TASKS_STATUS)] = dndProcessVnodeFetchMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CANCEL_TASK)] = dndProcessVnodeFetchMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TASK)] = dndProcessVnodeFetchMsg; + // Requests handled by DNODE + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE)] = dndProcessMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE)] = dndProcessMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE)] = dndProcessMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE)] = dndProcessMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE)] = dndProcessMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE)] = dndProcessMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE)] = dndProcessMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE)] = dndProcessMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE)] = dndProcessMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_NETWORK_TEST)] = dndProcessMgmtMsg; - // msg from client to mnode + // Requests handled by MNODE pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONNECT)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_ACCT)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_ACCT)] = dndProcessMnodeWriteMsg; @@ -75,53 +78,51 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_STB)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STB_META)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_VGROUP_LIST)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_QUERY)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_CONN)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH_RSP)] = dndProcessMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TOPIC)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg; - // message from client to dnode - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_NETWORK_TEST)] = dndProcessDnodeReq; - - // message from mnode to vnode + // Requests handled by VNODE + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessVnodeQueryMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLES_META)] = dndProcessVnodeQueryMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONSUME)] = dndProcessVnodeQueryMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_QUERY)] = dndProcessVnodeQueryMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONNECT)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_DISCONNECT)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_RES_READY)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TASKS_STATUS)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CANCEL_TASK)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TASK)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = dndProcessMnodeWriteMsg; - - // message from mnode to dnode - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE)] = dndProcessVnodeMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE)] = dndProcessVnodeMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE)] = dndProcessVnodeMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE)] = dndProcessVnodeMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE)] = dndProcessVnodeMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE)] = dndProcessVnodeMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMnodeMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE)] = dndProcessMnodeMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE)] = dndProcessMnodeMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE)] = dndProcessDnodeReq; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE_RSP)] = dndProcessMnodeWriteMsg; - - // message from dnode to mnode - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessDnodeRsp; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessDnodeRsp; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH_RSP)] = dndProcessDnodeRsp; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_TABLE)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg; } static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { @@ -189,7 +190,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { tmsg_t msgType = pMsg->msgType; if (msgType == TDMT_DND_NETWORK_TEST) { dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pMsg->handle, pMsg->ahandle, pMsg->code); - dndProcessDnodeReq(pDnode, pMsg, pEpSet); + dndProcessStartupReq(pDnode, pMsg); return; } @@ -208,7 +209,8 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { } if (pMsg->pCont == NULL) { - dTrace("RPC %p, req:%s app:%p not processed since content is null", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle); + dTrace("RPC %p, req:%s app:%p not processed since content is null", pMsg->handle, TMSG_INFO(msgType), + pMsg->ahandle); SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN}; rpcSendResponse(&rspMsg); return; diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 351fc20784..1e03bba10b 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -56,11 +56,9 @@ typedef struct { static int32_t dndInitVnodeReadWorker(SDnode *pDnode); static int32_t dndInitVnodeWriteWorker(SDnode *pDnode); static int32_t dndInitVnodeSyncWorker(SDnode *pDnode); -static int32_t dndInitVnodeMgmtWorker(SDnode *pDnode); static void dndCleanupVnodeReadWorker(SDnode *pDnode); static void dndCleanupVnodeWriteWorker(SDnode *pDnode); static void dndCleanupVnodeSyncWorker(SDnode *pDnode); -static void dndCleanupVnodeMgmtWorker(SDnode *pDnode); static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode); static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode); static int32_t dndAllocVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode); @@ -77,12 +75,10 @@ static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); -static void dndProcessVnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg); static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId); @@ -96,13 +92,6 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode); static int32_t dndOpenVnodes(SDnode *pDnode); static void dndCloseVnodes(SDnode *pDnode); -static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -static int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -static int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); - static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodeObj *pVnode = NULL; @@ -600,7 +589,7 @@ static SAuthVnodeMsg *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) { return pAuth; } -static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { +int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { SCreateVnodeMsg *pCreate = dndParseCreateVnodeReq(rpcMsg); dDebug("vgId:%d, create vnode req is received", pCreate->vgId); @@ -641,7 +630,7 @@ static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return 0; } -static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { +int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { SAlterVnodeMsg *pAlter = (SAlterVnodeMsg *)dndParseCreateVnodeReq(rpcMsg); dDebug("vgId:%d, alter vnode req is received", pAlter->vgId); @@ -680,7 +669,7 @@ static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return code; } -static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { +int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { SDropVnodeMsg *pDrop = vnodeParseDropVnodeReq(rpcMsg); int32_t vgId = pDrop->vgId; @@ -707,7 +696,7 @@ static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return 0; } -static int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { +int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg); int32_t code = 0; @@ -725,7 +714,7 @@ static int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return 0; } -static int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { +int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg); int32_t vgId = pAuth->vgId; @@ -747,7 +736,7 @@ static int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return 0; } -static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { +int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { SCompactVnodeMsg *pCompact = (SCompactVnodeMsg *)vnodeParseDropVnodeReq(rpcMsg); int32_t vgId = pCompact->vgId; @@ -769,39 +758,6 @@ static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return 0; } -static void dndProcessVnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { - int32_t code = 0; - - switch (pMsg->msgType) { - case TDMT_DND_CREATE_VNODE: - code = dndProcessCreateVnodeReq(pDnode, pMsg); - break; - case TDMT_DND_ALTER_VNODE: - code = dndProcessAlterVnodeReq(pDnode, pMsg); - break; - case TDMT_DND_DROP_VNODE: - code = dndProcessDropVnodeReq(pDnode, pMsg); - break; - case TDMT_DND_AUTH_VNODE: - code = dndProcessAuthVnodeReq(pDnode, pMsg); - break; - case TDMT_DND_SYNC_VNODE: - code = dndProcessSyncVnodeReq(pDnode, pMsg); - break; - case TDMT_DND_COMPACT_VNODE: - code = dndProcessCompactVnodeReq(pDnode, pMsg); - break; - default: - code = TSDB_CODE_MSG_NOT_PROCESSED; - break; - } - - SRpcMsg rsp = {.code = code, .handle = pMsg->handle, .ahandle = pMsg->ahandle}; - rpcSendResponse(&rsp); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); -} - static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { SRpcMsg *pRsp = NULL; vnodeProcessQueryReq(pVnode->pImpl, pMsg, &pRsp); @@ -909,11 +865,6 @@ static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { return pVnode; } -void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - dndWriteRpcMsgToVnodeQueue(pMgmt->pMgmtQ, pMsg); -} - void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { @@ -957,35 +908,6 @@ static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMs return code; } -static int32_t dndInitVnodeMgmtWorker(SDnode *pDnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SWorkerPool *pPool = &pMgmt->mgmtPool; - pPool->name = "vnode-mgmt"; - pPool->min = 1; - pPool->max = 1; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - pMgmt->pMgmtQ = tWorkerAllocQueue(pPool, pDnode, (FProcessItem)dndProcessVnodeMgmtQueue); - if (pMgmt->pMgmtQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("vnode mgmt worker is initialized"); - return 0; -} - -static void dndCleanupVnodeMgmtWorker(SDnode *pDnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ); - tWorkerCleanup(&pMgmt->mgmtPool); - pMgmt->pMgmtQ = NULL; - dDebug("vnode mgmt worker is closed"); -} - static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue); @@ -1167,11 +1089,6 @@ int32_t dndInitVnodes(SDnode *pDnode) { return -1; } - if (dndInitVnodeMgmtWorker(pDnode) != 0) { - dError("failed to init vnodes mgmt worker since %s", terrstr()); - return -1; - } - if (dndOpenVnodes(pDnode) != 0) { dError("failed to open vnodes since %s", terrstr()); return -1; @@ -1187,7 +1104,6 @@ void dndCleanupVnodes(SDnode *pDnode) { dndCleanupVnodeReadWorker(pDnode); dndCleanupVnodeWriteWorker(pDnode); dndCleanupVnodeSyncWorker(pDnode); - dndCleanupVnodeMgmtWorker(pDnode); dInfo("dnode-vnodes is cleaned up"); } diff --git a/source/dnode/mgmt/impl/test/cluster/cluster.cpp b/source/dnode/mgmt/impl/test/cluster/cluster.cpp index 7230c3eb74..7734826789 100644 --- a/source/dnode/mgmt/impl/test/cluster/cluster.cpp +++ b/source/dnode/mgmt/impl/test/cluster/cluster.cpp @@ -28,14 +28,14 @@ Testbase DndTestCluster::test; TEST_F(DndTestCluster, 01_ShowCluster) { test.SendShowMetaMsg(TSDB_MGMT_TABLE_CLUSTER, ""); CHECK_META( "show cluster", 3); - CHECK_SCHEMA(0, TSDB_DATA_TYPE_INT, 4, "id"); + CHECK_SCHEMA(0, TSDB_DATA_TYPE_BIGINT, 8, "id"); CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, "name"); CHECK_SCHEMA(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time"); test.SendShowRetrieveMsg(); EXPECT_EQ(test.GetShowRows(), 1); - IgnoreInt32(); + IgnoreInt64(); IgnoreBinary(TSDB_CLUSTER_ID_LEN); CheckTimestamp(); } \ No newline at end of file diff --git a/source/dnode/mgmt/impl/test/profile/profile.cpp b/source/dnode/mgmt/impl/test/profile/profile.cpp index 081f9e7ef5..87e6bfde74 100644 --- a/source/dnode/mgmt/impl/test/profile/profile.cpp +++ b/source/dnode/mgmt/impl/test/profile/profile.cpp @@ -42,7 +42,7 @@ TEST_F(DndTestProfile, 01_ConnectMsg) { SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont; ASSERT_NE(pRsp, nullptr); pRsp->acctId = htonl(pRsp->acctId); - pRsp->clusterId = htonl(pRsp->clusterId); + pRsp->clusterId = htobe64(pRsp->clusterId); pRsp->connId = htonl(pRsp->connId); pRsp->epSet.port[0] = htons(pRsp->epSet.port[0]); @@ -174,7 +174,7 @@ TEST_F(DndTestProfile, 05_KillConnMsg) { SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont; ASSERT_NE(pRsp, nullptr); pRsp->acctId = htonl(pRsp->acctId); - pRsp->clusterId = htonl(pRsp->clusterId); + pRsp->clusterId = htobe64(pRsp->clusterId); pRsp->connId = htonl(pRsp->connId); pRsp->epSet.port[0] = htons(pRsp->epSet.port[0]); diff --git a/source/dnode/mgmt/impl/test/stb/stb.cpp b/source/dnode/mgmt/impl/test/stb/stb.cpp index e67a3ea8cf..c55e952a89 100644 --- a/source/dnode/mgmt/impl/test/stb/stb.cpp +++ b/source/dnode/mgmt/impl/test/stb/stb.cpp @@ -133,7 +133,7 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { STableInfoMsg* pReq = (STableInfoMsg*)rpcMallocCont(contLen); strcpy(pReq->tableFname, "1.d1.stb"); - SRpcMsg* pMsg = test.SendMsg(TDMT_VND_TABLE_META, pReq, contLen); + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_STB_META, pReq, contLen); ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); diff --git a/source/dnode/mgmt/impl/test/sut/src/server.cpp b/source/dnode/mgmt/impl/test/sut/src/server.cpp index e96f7deaf5..a001748a12 100644 --- a/source/dnode/mgmt/impl/test/sut/src/server.cpp +++ b/source/dnode/mgmt/impl/test/sut/src/server.cpp @@ -26,9 +26,7 @@ SDnodeOpt TestServer::BuildOption(const char* path, const char* fqdn, uint16_t p SDnodeOpt option = {0}; option.sver = 1; option.numOfCores = 1; - option.numOfSupportMnodes = 1; option.numOfSupportVnodes = 1; - option.numOfSupportQnodes = 1; option.statusInterval = 1; option.numOfThreadsPerCore = 1; option.ratioOfQueryCores = 1; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index c6fb0cce1d..1ac2c3d171 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -62,10 +62,14 @@ typedef enum { typedef enum { TRN_STAGE_PREPARE = 0, - TRN_STAGE_EXECUTE = 1, - TRN_STAGE_ROLLBACK = 2, - TRN_STAGE_COMMIT = 3, - TRN_STAGE_OVER = 4, + TRN_STAGE_REDO_LOG = 1, + TRN_STAGE_REDO_ACTION = 2, + TRN_STAGE_UNDO_LOG = 3, + TRN_STAGE_UNDO_ACTION = 4, + TRN_STAGE_COMMIT_LOG = 5, + TRN_STAGE_COMMIT = 6, + TRN_STAGE_ROLLBACK = 7, + TRN_STAGE_FINISHED = 8 } ETrnStage; typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy; @@ -95,7 +99,8 @@ typedef struct { int32_t id; ETrnStage stage; ETrnPolicy policy; - int32_t retryTimes; + int32_t code; + int32_t failedTimes; void *rpcHandle; void *rpcAHandle; SArray *redoLogs; @@ -106,7 +111,7 @@ typedef struct { } STrans; typedef struct { - int32_t id; + int64_t id; char name[TSDB_CLUSTER_ID_LEN]; int64_t createdTime; int64_t updateTime; @@ -313,12 +318,6 @@ typedef struct SMnodeMsg { void *pCont; } SMnodeMsg; -typedef struct { - int32_t id; - int32_t code; - void *rpcHandle; -} STransMsg; - #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index a9932ce048..6eb82daa11 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -70,11 +70,12 @@ typedef struct { typedef struct SMnode { int32_t dnodeId; - int32_t clusterId; + int64_t clusterId; int8_t replica; int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; tmr_h timer; + tmr_h transTimer; char *path; SMnodeCfg cfg; int64_t checkTime; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 2d57179f1c..201fcde1a9 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -44,11 +44,7 @@ int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); -void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code); -void mndTransHandleActionRsp(SMnodeMsg *pMsg); - -char *mndTransStageStr(ETrnStage stage); -char *mndTransPolicyStr(ETrnPolicy policy); +void mndTransProcessRsp(SMnodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index cf8511c054..f656c27dcd 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -67,7 +67,7 @@ static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { if (pRaw == NULL) return NULL; int32_t dataPos = 0; - SDB_SET_INT32(pRaw, dataPos, pCluster->id); + SDB_SET_INT64(pRaw, dataPos, pCluster->id); SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime) SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime) SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN) @@ -91,7 +91,7 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) { if (pCluster == NULL) return NULL; int32_t dataPos = 0; - SDB_GET_INT32(pRaw, pRow, dataPos, &pCluster->id) + SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->id) SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->createdTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->updateTime) SDB_GET_BINARY(pRaw, pRow, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN) @@ -101,17 +101,17 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) { } static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster) { - mTrace("cluster:%d, perform insert action", pCluster->id); + mTrace("cluster:%" PRId64 ", perform insert action", pCluster->id); return 0; } static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) { - mTrace("cluster:%d, perform delete action", pCluster->id); + mTrace("cluster:%" PRId64 ", perform delete action", pCluster->id); return 0; } static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SClusterObj *pNewCluster) { - mTrace("cluster:%d, perform update action", pOldCluster->id); + mTrace("cluster:%" PRId64 ", perform update action", pOldCluster->id); return 0; } @@ -125,17 +125,17 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { strcpy(clusterObj.name, "tdengine2.0"); mError("failed to get name from system, set to default val %s", clusterObj.name); } else { - mDebug("cluster:%d, name is %s", clusterObj.id, clusterObj.name); + mDebug("cluster:%" PRId64 ", name is %s", clusterObj.id, clusterObj.name); } clusterObj.id = MurmurHash3_32(clusterObj.name, TSDB_CLUSTER_ID_LEN); - clusterObj.id = abs(clusterObj.id); + clusterObj.id = (clusterObj.id >= 0 ? clusterObj.id : -clusterObj.id); pMnode->clusterId = clusterObj.id; SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj); if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mDebug("cluster:%d, will be created while deploy sdb", clusterObj.id); + mDebug("cluster:%" PRId64 ", will be created while deploy sdb", clusterObj.id); return sdbWrite(pMnode->pSdb, pRaw); } @@ -143,8 +143,8 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg int32_t cols = 0; SSchema *pSchema = pMeta->pSchema; - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_INT; + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_BIGINT; strcpy(pSchema[cols].name, "id"); pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; @@ -192,7 +192,7 @@ static int32_t mndRetrieveClusters(SMnodeMsg *pMsg, SShowObj *pShow, char *data, cols = 0; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pCluster->id; + *(int64_t *)pWrite = pCluster->id; cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 80c9f9544e..153d75ffd0 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -276,12 +276,11 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { static void mndParseStatusMsg(SStatusMsg *pStatus) { pStatus->sver = htonl(pStatus->sver); pStatus->dnodeId = htonl(pStatus->dnodeId); - pStatus->clusterId = htonl(pStatus->clusterId); + pStatus->clusterId = htobe64(pStatus->clusterId); pStatus->rebootTime = htobe64(pStatus->rebootTime); + pStatus->updateTime = htobe64(pStatus->updateTime); pStatus->numOfCores = htons(pStatus->numOfCores); - pStatus->numOfSupportMnodes = htons(pStatus->numOfSupportMnodes); pStatus->numOfSupportVnodes = htons(pStatus->numOfSupportVnodes); - pStatus->numOfSupportQnodes = htons(pStatus->numOfSupportQnodes); pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval); pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime); } @@ -324,13 +323,14 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) { } if (pStatus->dnodeId == 0) { - mDebug("dnode:%d %s, first access, set clusterId %d", pDnode->id, pDnode->ep, pMnode->clusterId); + mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId); } else { if (pStatus->clusterId != pMnode->clusterId) { if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH; } - mError("dnode:%d, clusterId %d not match exist %d", pDnode->id, pStatus->clusterId, pMnode->clusterId); + mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId, + pMnode->clusterId); mndReleaseDnode(pMnode, pDnode); terrno != TSDB_CODE_MND_INVALID_CLUSTER_ID; return -1; @@ -356,9 +356,7 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) { pDnode->rebootTime = pStatus->rebootTime; pDnode->numOfCores = pStatus->numOfCores; - pDnode->numOfSupportMnodes = pStatus->numOfSupportMnodes; pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes; - pDnode->numOfSupportQnodes = pStatus->numOfSupportQnodes; pDnode->lastAccessTime = taosGetTimestampMs(); pDnode->status = DND_STATUS_READY; @@ -373,7 +371,7 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) { pRsp->dnodeCfg.dnodeId = htonl(pDnode->id); pRsp->dnodeCfg.dropped = 0; - pRsp->dnodeCfg.clusterId = htonl(pMnode->clusterId); + pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId); mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps); pMsg->contLen = contLen; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 8f57a18a1d..1800fd8e83 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -567,17 +567,17 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) { } static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 833be3884c..548e2f7be6 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -225,7 +225,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) { mndReleaseUser(pMnode, pUser); } - pRsp->clusterId = htonl(pMnode->clusterId); + pRsp->clusterId = htobe64(pMnode->clusterId); pRsp->connId = htonl(pConn->id); mndGetMnodeEpSet(pMnode, &pRsp->epSet); mndReleaseConn(pMnode, pConn); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 24d88068bf..799eb4afa7 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -58,7 +58,7 @@ int32_t mndInitStb(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessCreateStbInRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessAlterStbInRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessDropStbInRsp); - mndSetMsgHandle(pMnode, TDMT_VND_TABLE_META, mndProcessStbMetaMsg); + mndSetMsgHandle(pMnode, TDMT_MND_STB_META, mndProcessStbMetaMsg); mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_STB, mndGetStbMeta); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb); @@ -552,7 +552,7 @@ static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) { } static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } @@ -616,7 +616,7 @@ static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg) { } static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } @@ -728,7 +728,7 @@ static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) { } static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } @@ -886,14 +886,19 @@ static void mndExtractTableName(char *tableId, char *name) { } static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { - SMnode * pMnode = pMsg->pMnode; - SSdb * pSdb = pMnode->pSdb; + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SStbObj *pStb = NULL; int32_t cols = 0; - char * pWrite; + char *pWrite; char prefix[64] = {0}; + SDbObj *pDb = mndAcquireDb(pMnode, pShow->db); + if (pDb == NULL) { + return TSDB_CODE_MND_INVALID_DB; + } + tstrncpy(prefix, pShow->db, 64); strcat(prefix, TS_PATH_DELIMITER); int32_t prefixLen = (int32_t)strlen(prefix); @@ -902,7 +907,7 @@ static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3 pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb); if (pShow->pIter == NULL) break; - if (strncmp(pStb->name, prefix, prefixLen) != 0) { + if (pStb->dbUid != pDb->uid) { sdbRelease(pSdb, pStb); continue; } @@ -931,6 +936,7 @@ static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3 sdbRelease(pSdb, pStb); } + mndReleaseDb(pMnode, pDb); pShow->numOfReads += numOfRows; mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); return numOfRows; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 04f6907918..7106c79588 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -524,7 +524,7 @@ static int32_t mndProcessAlterTopicMsg(SMnodeMsg *pMsg) { } static int32_t mndProcessAlterTopicInRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } @@ -636,7 +636,7 @@ static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) { } static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } @@ -706,7 +706,7 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) { } static int32_t mndProcessCreateTopicInRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 0b714c34ae..9459c5e525 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -27,7 +27,6 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOldTrans); static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans); -static void mndTransSendRpcRsp(STrans *pTrans, int32_t code); static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); static void mndTransDropLogs(SArray *pArray); @@ -36,14 +35,23 @@ static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray); static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray); static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans); -static void mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); +static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans); + static void mndTransExecute(SMnode *pMnode, STrans *pTrans); +static void mndTransSendRpcRsp(STrans *pTrans); +static int32_t mndProcessTransMsg(SMnodeMsg *pMsg); +static int32_t mndProcessTransRsp(SMnodeMsg *pMsg); int32_t mndInitTrans(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_TRANS, @@ -54,6 +62,8 @@ int32_t mndInitTrans(SMnode *pMnode) { .updateFp = (SdbUpdateFp)mndTransActionUpdate, .deleteFp = (SdbDeleteFp)mndTransActionDelete}; + mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransMsg); + mndSetMsgHandle(pMnode, TDMT_MND_TRANS_RSP, mndProcessTransRsp); return sdbSetTable(pMnode->pSdb, table); } @@ -290,12 +300,12 @@ TRANS_DECODE_OVER: static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { pTrans->stage = TRN_STAGE_PREPARE; - mTrace("trans:%d, perform insert action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage)); + mTrace("trans:%d, perform insert action", pTrans->id); return 0; } static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { - mTrace("trans:%d, perform delete action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage)); + mTrace("trans:%d, perform delete action", pTrans->id); mndTransDropLogs(pTrans->redoLogs); mndTransDropLogs(pTrans->undoLogs); @@ -307,7 +317,7 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { } static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOldTrans, STrans *pNewTrans) { - mTrace("trans:%d, perform update action, stage:%s", pOldTrans->id, mndTransStageStr(pNewTrans->stage)); + mTrace("trans:%d, perform update action", pOldTrans->id); pOldTrans->stage = pNewTrans->stage; return 0; } @@ -326,34 +336,6 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) { sdbRelease(pSdb, pTrans); } -char *mndTransStageStr(ETrnStage stage) { - switch (stage) { - case TRN_STAGE_PREPARE: - return "prepare"; - case TRN_STAGE_EXECUTE: - return "execute"; - case TRN_STAGE_COMMIT: - return "commit"; - case TRN_STAGE_ROLLBACK: - return "rollback"; - case TRN_STAGE_OVER: - return "over"; - default: - return "undefined"; - } -} - -char *mndTransPolicyStr(ETrnPolicy policy) { - switch (policy) { - case TRN_POLICY_ROLLBACK: - return "prepare"; - case TRN_POLICY_RETRY: - return "retry"; - default: - return "undefined"; - } -} - STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) { STrans *pTrans = calloc(1, sizeof(STrans)); if (pTrans == NULL) { @@ -428,23 +410,11 @@ static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) { return 0; } -int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { - int32_t code = mndTransAppendLog(pTrans->redoLogs, pRaw); - mTrace("trans:%d, raw:%p append to redo logs, code:0x%x", pTrans->id, pRaw, code); - return code; -} +int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->redoLogs, pRaw); } -int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { - int32_t code = mndTransAppendLog(pTrans->undoLogs, pRaw); - mTrace("trans:%d, raw:%p append to undo logs, code:0x%x", pTrans->id, pRaw, code); - return code; -} +int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->undoLogs, pRaw); } -int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { - int32_t code = mndTransAppendLog(pTrans->commitLogs, pRaw); - mTrace("trans:%d, raw:%p append to commit logs, code:0x%x", pTrans->id, pRaw, code); - return code; -} +int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->commitLogs, pRaw); } static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) { void *ptr = taosArrayPush(pArray, pAction); @@ -457,20 +427,14 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) { } int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) { - int32_t code = mndTransAppendAction(pTrans->redoActions, pAction); - mTrace("trans:%d, msg:%s append to redo actions, code:0x%x", pTrans->id, TMSG_INFO(pAction->msgType), code); - return code; + return mndTransAppendAction(pTrans->redoActions, pAction); } int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) { - int32_t code = mndTransAppendAction(pTrans->undoActions, pAction); - mTrace("trans:%d, msg:%s append to undo actions, code:0x%x", pTrans->id, TMSG_INFO(pAction->msgType), code); - return code; + return mndTransAppendAction(pTrans->undoActions, pAction); } -int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { - mDebug("trans:%d, prepare transaction", pTrans->id); - +static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { SSdbRaw *pRaw = mndTransActionEncode(pTrans); if (pRaw == NULL) { mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr()); @@ -494,6 +458,17 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { return -1; } + return 0; +} + +int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { + mDebug("trans:%d, prepare transaction", pTrans->id); + if (mndTransSync(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + return -1; + } + mDebug("trans:%d, prepare finished", pTrans->id); + STrans *pNewTrans = mndAcquireTrans(pMnode, pTrans->id); if (pNewTrans == NULL) { mError("trans:%d, failed to read from sdb since %s", pTrans->id, terrstr()); @@ -507,84 +482,41 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { return 0; } -int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) { +static int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) { + if (taosArrayGetSize(pTrans->commitLogs) == 0 && taosArrayGetSize(pTrans->redoActions) == 0) return 0; + mDebug("trans:%d, commit transaction", pTrans->id); - - SSdbRaw *pRaw = mndTransActionEncode(pTrans); - if (pRaw == NULL) { - mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr()); + if (mndTransSync(pMnode, pTrans) != 0) { + mError("trans:%d, failed to commit since %s", pTrans->id, terrstr()); return -1; } - sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); - - if (taosArrayGetSize(pTrans->commitLogs) != 0) { - mTrace("trans:%d, sync to other nodes", pTrans->id); - if (mndSyncPropose(pMnode, pRaw) != 0) { - mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); - sdbFreeRaw(pRaw); - return -1; - } - mTrace("trans:%d, sync finished", pTrans->id); - } - - if (sdbWrite(pMnode->pSdb, pRaw) != 0) { - mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); - return -1; - } - mDebug("trans:%d, commit finished", pTrans->id); return 0; } -int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) { +static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) { mDebug("trans:%d, rollback transaction", pTrans->id); - - SSdbRaw *pRaw = mndTransActionEncode(pTrans); - if (pRaw == NULL) { - mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr()); + if (mndTransSync(pMnode, pTrans) != 0) { + mError("trans:%d, failed to rollback since %s", pTrans->id, terrstr()); return -1; } - sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); - - mTrace("trans:%d, sync to other nodes", pTrans->id); - int32_t code = mndSyncPropose(pMnode, pRaw); - if (code != 0) { - mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); - sdbFreeRaw(pRaw); - return -1; - } - - mTrace("trans:%d, sync finished", pTrans->id); - code = sdbWrite(pMnode->pSdb, pRaw); - if (code != 0) { - mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); - return -1; - } - mDebug("trans:%d, rollback finished", pTrans->id); return 0; } -static void mndTransSendRpcRsp(STrans *pTrans, int32_t code) { - if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return; - mDebug("trans:%d, send rpc rsp, RPC:%p ahandle:%p code:0x%x", pTrans->id, pTrans->rpcHandle, pTrans->rpcAHandle, - code & 0xFFFF); - +static void mndTransSendRpcRsp(STrans *pTrans) { if (pTrans->rpcHandle != NULL) { - SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = code, .ahandle = pTrans->rpcAHandle}; + mDebug("trans:%d, send rsp, ahandle:%p code:0x%x", pTrans->id, pTrans->rpcAHandle, pTrans->code & 0xFFFF); + SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = pTrans->code, .ahandle = pTrans->rpcAHandle}; rpcSendResponse(&rspMsg); } } -void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code) { - // todo -} - -void mndTransHandleActionRsp(SMnodeMsg *pMsg) { +void mndTransProcessRsp(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; - int64_t sig = (int64_t)(pMsg->rpcMsg.ahandle); - int32_t transId = (int32_t)(sig >> 32); - int32_t action = (int32_t)((sig << 32) >> 32); + int64_t signature = (int64_t)(pMsg->rpcMsg.ahandle); + int32_t transId = (int32_t)(signature >> 32); + int32_t action = (int32_t)((signature << 32) >> 32); STrans *pTrans = mndAcquireTrans(pMnode, transId); if (pTrans == NULL) { @@ -593,15 +525,17 @@ void mndTransHandleActionRsp(SMnodeMsg *pMsg) { } SArray *pArray = NULL; - if (pTrans->stage == TRN_STAGE_EXECUTE) { + if (pTrans->stage == TRN_STAGE_REDO_ACTION) { pArray = pTrans->redoActions; - } else if (pTrans->stage == TRN_STAGE_ROLLBACK) { + } else if (pTrans->stage == TRN_STAGE_UNDO_ACTION) { pArray = pTrans->undoActions; } else { + mError("trans:%d, invalid trans stage:%d while recv action rsp", pTrans->id, pTrans->stage); + goto HANDLE_ACTION_RSP_OVER; } if (pArray == NULL) { - mError("trans:%d, invalid trans stage:%s", transId, mndTransStageStr(pTrans->stage)); + mError("trans:%d, invalid trans stage:%d", transId, pTrans->stage); goto HANDLE_ACTION_RSP_OVER; } @@ -653,15 +587,27 @@ static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) { return mndTransExecuteLogs(pMnode, pTrans->commitLogs); } -static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { +static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { + int32_t numOfActions = taosArrayGetSize(pArray); + + for (int32_t action = 0; action < numOfActions; ++action) { + STransAction *pAction = taosArrayGet(pArray, action); + if (pAction == NULL) continue; + if (pAction->msgSent && pAction->msgReceived && pAction->errCode == 0) continue; + + pAction->msgSent = 0; + pAction->msgReceived = 0; + pAction->errCode = 0; + mDebug("trans:%d, action:%d is reset and will be re-executed", pTrans->id, action); + } +} + +static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pArray) { int32_t numOfActions = taosArrayGetSize(pArray); - if (numOfActions == 0) return 0; for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); if (pAction == NULL) continue; - // if (pAction->msgSent && !pAction->msgReceived) continue; - // if (pAction->msgSent && pAction->msgReceived && pAction->errCode == 0) continue; if (pAction->msgSent) continue; int64_t signature = pTrans->id; @@ -684,6 +630,17 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA mndSendMsgToDnode(pMnode, &pAction->epSet, &rpcMsg); } + return 0; +} + +static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { + int32_t numOfActions = taosArrayGetSize(pArray); + if (numOfActions == 0) return 0; + + if (mndTransSendActionMsg(pMnode, pTrans, pArray) != 0) { + return -1; + } + int32_t numOfReceived = 0; int32_t errCode = 0; for (int32_t action = 0; action < numOfActions; ++action) { @@ -698,9 +655,15 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA } if (numOfReceived == numOfActions) { - mDebug("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errCode); - terrno = errCode; - return errCode; + if (errCode == 0) { + mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions); + return 0; + } else { + mError("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errCode); + mndTransResetActions(pMnode, pTrans, pArray); + terrno = errCode; + return errCode; + } } else { mDebug("trans:%d, %d of %d actions executed, code:0x%x", pTrans->id, numOfReceived, numOfActions, errCode); return TSDB_CODE_MND_ACTION_IN_PROGRESS; @@ -715,88 +678,231 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { return mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions); } -static void mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { + bool continueExec = true; + pTrans->stage = TRN_STAGE_REDO_LOG; + mDebug("trans:%d, stage from prepare to redoLog", pTrans->id); + return continueExec; +} + +static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) { + bool continueExec = true; int32_t code = mndTransExecuteRedoLogs(pMnode, pTrans); if (code == 0) { - pTrans->stage = TRN_STAGE_EXECUTE; - mDebug("trans:%d, stage from prepare to execute", pTrans->id); + pTrans->code = 0; + pTrans->stage = TRN_STAGE_REDO_ACTION; + mDebug("trans:%d, stage from redoLog to redoAction", pTrans->id); } else { - pTrans->stage = TRN_STAGE_ROLLBACK; - mError("trans:%d, stage from prepare to rollback since %s", pTrans->id, terrstr()); + pTrans->code = terrno; + pTrans->stage = TRN_STAGE_UNDO_LOG; + mError("trans:%d, stage from redoLog to undoLog", pTrans->id); } + + return continueExec; } -static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { + bool continueExec = true; int32_t code = mndTransExecuteRedoActions(pMnode, pTrans); if (code == 0) { + pTrans->code = 0; pTrans->stage = TRN_STAGE_COMMIT; - mDebug("trans:%d, stage from execute to commit", pTrans->id); + mDebug("trans:%d, stage from redoAction to commit", pTrans->id); + continueExec = true; } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { - mDebug("trans:%d, stage keep on execute since %s", pTrans->id, tstrerror(code)); + mDebug("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(code)); + continueExec = false; } else { + pTrans->code = terrno; if (pTrans->policy == TRN_POLICY_ROLLBACK) { - pTrans->stage = TRN_STAGE_ROLLBACK; - mError("trans:%d, stage from execute to rollback since %s", pTrans->id, terrstr()); + pTrans->stage = TRN_STAGE_UNDO_ACTION; + mError("trans:%d, stage from redoAction to undoAction since %s", pTrans->id, terrstr()); + continueExec = true; } else { - pTrans->stage = TRN_STAGE_EXECUTE; - pTrans->retryTimes++; - mError("trans:%d, stage keep on execute since %s", pTrans->id, terrstr()); + pTrans->failedTimes++; + mError("trans:%d, stage keep on redoAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes); + continueExec = false; } } - return code; + return continueExec; } -static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { - mndTransExecuteCommitLogs(pMnode, pTrans); - pTrans->stage = TRN_STAGE_OVER; - return 0; +static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { + bool continueExec = true; + int32_t code = mndTransCommit(pMnode, pTrans); + + if (code == 0) { + pTrans->code = 0; + pTrans->stage = TRN_STAGE_COMMIT_LOG; + mDebug("trans:%d, stage from commit to commitLog", pTrans->id); + continueExec = true; + } else { + pTrans->code = terrno; + if (pTrans->policy == TRN_POLICY_ROLLBACK) { + pTrans->stage = TRN_STAGE_REDO_ACTION; + mError("trans:%d, stage from commit to redoAction since %s, failedTimes:%d", pTrans->id, terrstr(), + pTrans->failedTimes); + continueExec = true; + } else { + pTrans->failedTimes++; + mError("trans:%d, stage keep on commit since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes); + continueExec = false; + } + } + + return continueExec; } -static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans) { + bool continueExec = true; + int32_t code = mndTransExecuteCommitLogs(pMnode, pTrans); + + if (code == 0) { + pTrans->code = 0; + pTrans->stage = TRN_STAGE_FINISHED; + mDebug("trans:%d, stage from commitLog to finished", pTrans->id); + continueExec = true; + } else { + pTrans->code = terrno; + pTrans->failedTimes++; + mError("trans:%d, stage keep on commitLog since %s", pTrans->id, terrstr()); + continueExec = false; + ; + } + + return continueExec; +} + +static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) { + bool continueExec = true; + int32_t code = mndTransExecuteUndoLogs(pMnode, pTrans); + + if (code == 0) { + pTrans->stage = TRN_STAGE_ROLLBACK; + mDebug("trans:%d, stage from undoLog to rollback", pTrans->id); + continueExec = true; + } else { + mDebug("trans:%d, stage keep on undoLog since %s", pTrans->id, terrstr()); + continueExec = false; + } + + return continueExec; +} + +static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { + bool continueExec = true; int32_t code = mndTransExecuteUndoActions(pMnode, pTrans); if (code == 0) { - mDebug("trans:%d, rollbacked", pTrans->id); + pTrans->stage = TRN_STAGE_REDO_LOG; + mDebug("trans:%d, stage from undoAction to undoLog", pTrans->id); + continueExec = true; + } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code)); + continueExec = false; } else { - pTrans->stage = TRN_STAGE_ROLLBACK; - pTrans->retryTimes++; - mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr()); + pTrans->failedTimes++; + mError("trans:%d, stage keep on undoAction since %s", pTrans->id, terrstr()); + continueExec = false; } - return code; + return continueExec; +} + +static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { + bool continueExec = true; + int32_t code = mndTransRollback(pMnode, pTrans); + + if (code == 0) { + pTrans->stage = TRN_STAGE_FINISHED; + mDebug("trans:%d, stage from rollback to finished", pTrans->id); + continueExec = true; + ; + } else { + pTrans->failedTimes++; + mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr()); + continueExec = false; + } + + return continueExec; +} + +static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) { + bool continueExec = false; + + SSdbRaw *pRaw = mndTransActionEncode(pTrans); + if (pRaw == NULL) { + mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr()); + } + sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); + + int32_t code = sdbWrite(pMnode->pSdb, pRaw); + if (code != 0) { + mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); + } + + mDebug("trans:%d, finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes); + return continueExec; } static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { - int32_t code = 0; + bool continueExec = true; - while (code == 0) { + while (continueExec) { switch (pTrans->stage) { case TRN_STAGE_PREPARE: - mndTransPerformPrepareStage(pMnode, pTrans); + continueExec = mndTransPerformPrepareStage(pMnode, pTrans); break; - case TRN_STAGE_EXECUTE: - code = mndTransPerformExecuteStage(pMnode, pTrans); + case TRN_STAGE_REDO_LOG: + continueExec = mndTransPerformRedoLogStage(pMnode, pTrans); + break; + case TRN_STAGE_REDO_ACTION: + continueExec = mndTransPerformRedoActionStage(pMnode, pTrans); + break; + case TRN_STAGE_UNDO_LOG: + continueExec = mndTransPerformUndoLogStage(pMnode, pTrans); + break; + case TRN_STAGE_UNDO_ACTION: + continueExec = mndTransPerformUndoActionStage(pMnode, pTrans); + break; + case TRN_STAGE_COMMIT_LOG: + continueExec = mndTransPerformCommitLogStage(pMnode, pTrans); break; case TRN_STAGE_COMMIT: - code = mndTransCommit(pMnode, pTrans); - if (code == 0) { - mndTransPerformCommitStage(pMnode, pTrans); - } + continueExec = mndTransPerformCommitStage(pMnode, pTrans); break; case TRN_STAGE_ROLLBACK: - code = mndTransRollback(pMnode, pTrans); - if (code == 0) { - mndTransPerformRollbackStage(pMnode, pTrans); - } + continueExec = mndTransPerformRollbackStage(pMnode, pTrans); + break; + case TRN_STAGE_FINISHED: + continueExec = mndTransPerfromFinishedStage(pMnode, pTrans); break; default: - mndTransSendRpcRsp(pTrans, 0); - return; + continueExec = false; + break; } } - mndTransSendRpcRsp(pTrans, code); + if (pTrans->stage == TRN_STAGE_FINISHED) { + mndTransSendRpcRsp(pTrans); + } } + +static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + STrans *pTrans = NULL; + void *pIter = NULL; + + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans); + if (pIter == NULL) break; + + mndTransExecute(pMnode, pTrans); + sdbRelease(pMnode->pSdb, pTrans); + } +} + +static int32_t mndProcessTransRsp(SMnodeMsg *pMsg) { return 0; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 8ff2139314..c9f4401264 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -333,17 +333,17 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) { } static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 27668a585a..64ea85044a 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -50,6 +50,20 @@ void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg) { } } +static void mndTransReExecute(void *param, void *tmrId) { + SMnode *pMnode = param; + if (mndIsMaster(pMnode)) { + STransMsg *pMsg = rpcMallocCont(sizeof(STransMsg)); + SEpSet epSet = {.inUse = 0, .numOfEps = 1}; + epSet.port[0] = pMnode->replicas[pMnode->selfIndex].port; + memcpy(epSet.fqdn[0], pMnode->replicas[pMnode->selfIndex].fqdn, TSDB_FQDN_LEN); + SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pMsg, .contLen = sizeof(STransMsg)}; + mndSendMsgToDnode(pMnode, &epSet, &rpcMsg); + } + + taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer); +} + static int32_t mndInitTimer(SMnode *pMnode) { if (pMnode->timer == NULL) { pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND"); @@ -60,11 +74,18 @@ static int32_t mndInitTimer(SMnode *pMnode) { return -1; } + if (taosTmrReset(mndTransReExecute, 1000, pMnode, pMnode->timer, &pMnode->transTimer)) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + return 0; } static void mndCleanupTimer(SMnode *pMnode) { if (pMnode->timer != NULL) { + taosTmrStop(pMnode->transTimer); + pMnode->transTimer = NULL; taosTmrCleanUp(pMnode->timer); pMnode->timer = NULL; } diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 0c311b86b6..27ff8e697d 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -123,6 +123,10 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * SRWLatch *pLock = &pSdb->locks[pRow->type]; taosWLockLatch(pLock); + // remove attached object such as trans + SdbDeleteFp deleteFp = pSdb->deleteFps[pRow->type]; + if (deleteFp != NULL) (*deleteFp)(pSdb, pRow->pObj); + SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize); if (ppOldRow == NULL || *ppOldRow == NULL) { taosWUnLockLatch(pLock); diff --git a/source/dnode/vnode/impl/src/vnodeRequest.c b/source/dnode/vnode/impl/src/vnodeRequest.c index efbba6352f..afc43602d8 100644 --- a/source/dnode/vnode/impl/src/vnodeRequest.c +++ b/source/dnode/vnode/impl/src/vnodeRequest.c @@ -23,7 +23,7 @@ int vnodeBuildReq(void **buf, const SVnodeReq *pReq, tmsg_t type) { tsize += taosEncodeFixedU64(buf, pReq->ver); switch (type) { - case TDMT_MND_CREATE_TABLE: + case TDMT_VND_CREATE_STB: tsize += vnodeBuildCreateTableReq(buf, &(pReq->ctReq)); break; case TDMT_VND_SUBMIT: @@ -40,7 +40,7 @@ void *vnodeParseReq(void *buf, SVnodeReq *pReq, tmsg_t type) { buf = taosDecodeFixedU64(buf, &(pReq->ver)); switch (type) { - case TDMT_MND_CREATE_TABLE: + case TDMT_VND_CREATE_STB: buf = vnodeParseCreateTableReq(buf, &(pReq->ctReq)); break; diff --git a/source/dnode/vnode/impl/src/vnodeWrite.c b/source/dnode/vnode/impl/src/vnodeWrite.c index 18a808f955..ef35c81e06 100644 --- a/source/dnode/vnode/impl/src/vnodeWrite.c +++ b/source/dnode/vnode/impl/src/vnodeWrite.c @@ -78,7 +78,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // TODO: maybe need to clear the requst struct break; case TDMT_VND_DROP_STB: - case TDMT_MND_DROP_TABLE: + case TDMT_VND_DROP_TABLE: if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) { // TODO: handle error } diff --git a/source/dnode/vnode/impl/test/vnodeApiTests.cpp b/source/dnode/vnode/impl/test/vnodeApiTests.cpp index 13d703f0be..71ffdd1d0f 100644 --- a/source/dnode/vnode/impl/test/vnodeApiTests.cpp +++ b/source/dnode/vnode/impl/test/vnodeApiTests.cpp @@ -84,14 +84,14 @@ static void vtBuildCreateStbReq(tb_uid_t suid, char *tbname, SRpcMsg **ppMsg) { SVnodeReq vCreateSTbReq; vnodeSetCreateStbReq(&vCreateSTbReq, tbname, UINT32_MAX, UINT32_MAX, suid, pSchema, pTagSchema); - zs = vnodeBuildReq(NULL, &vCreateSTbReq, TDMT_MND_CREATE_TABLE); + zs = vnodeBuildReq(NULL, &vCreateSTbReq, TDMT_VND_CREATE_STB); pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + zs); - pMsg->msgType = TDMT_MND_CREATE_TABLE; + pMsg->msgType = TDMT_VND_CREATE_STB; pMsg->contLen = zs; pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(SRpcMsg)); pBuf = pMsg->pCont; - vnodeBuildReq(&pBuf, &vCreateSTbReq, TDMT_MND_CREATE_TABLE); + vnodeBuildReq(&pBuf, &vCreateSTbReq, TDMT_VND_CREATE_STB); META_CLEAR_TB_CFG(&vCreateSTbReq); tdFreeSchema(pSchema); @@ -108,14 +108,14 @@ static void vtBuildCreateCtbReq(tb_uid_t suid, char *tbname, SRpcMsg **ppMsg) { SVnodeReq vCreateCTbReq; vnodeSetCreateCtbReq(&vCreateCTbReq, tbname, UINT32_MAX, UINT32_MAX, suid, pTag); - tz = vnodeBuildReq(NULL, &vCreateCTbReq, TDMT_MND_CREATE_TABLE); + tz = vnodeBuildReq(NULL, &vCreateCTbReq, TDMT_VND_CREATE_TABLE); pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + tz); - pMsg->msgType = TDMT_MND_CREATE_TABLE; + pMsg->msgType = TDMT_VND_CREATE_TABLE; pMsg->contLen = tz; pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(*pMsg)); void *pBuf = pMsg->pCont; - vnodeBuildReq(&pBuf, &vCreateCTbReq, TDMT_MND_CREATE_TABLE); + vnodeBuildReq(&pBuf, &vCreateCTbReq, TDMT_VND_CREATE_TABLE); META_CLEAR_TB_CFG(&vCreateCTbReq); free(pTag); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 2e1c1f3f6e..f119627c69 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -21,7 +21,6 @@ #include "exception.h" #include "executorimpl.h" #include "thash.h" -//#include "queryLog.h" #include "function.h" #include "tcompare.h" #include "tcompression.h" @@ -7685,7 +7684,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp type = s.type; bytes = s.bytes; } else if (pExprs[i].base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX && functionId == FUNCTION_TAGPRJ) { // parse the normal column - SSchema* s = tGetTbnameColumnSchema(); + const SSchema* s = tGetTbnameColumnSchema(); type = s->type; bytes = s->bytes; } else if (pExprs[i].base.pColumns->info.colId <= TSDB_UD_COLUMN_INDEX && pExprs[i].base.pColumns->info.colId > TSDB_RES_COL_ID) { @@ -7716,7 +7715,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp type = pCol->type; bytes = pCol->bytes; } else { - SSchema* s = tGetTbnameColumnSchema(); + const SSchema* s = tGetTbnameColumnSchema(); type = s->type; bytes = s->bytes;