diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index 4d15dc5a86..826f4ff1c1 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -22,7 +22,7 @@ extern "C" { int32_t dnodeInitMgmt(); void dnodeCleanupMgmt(); -void dnodeDispatchToDnodeMgmt(SRpcMsg *rpcMsg); +void dnodeDispatchToMgmtQueue(SRpcMsg *rpcMsg); void* dnodeGetVnode(int32_t vgId); int32_t dnodeGetVnodeStatus(void *pVnode); diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 14c2a725d9..7c457defca 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -22,6 +22,7 @@ #include "ttimer.h" #include "tsdb.h" #include "twal.h" +#include "tqueue.h" #include "tsync.h" #include "ttime.h" #include "ttimer.h" @@ -42,10 +43,12 @@ void * tsDnodeTmr = NULL; static void * tsStatusTimer = NULL; static uint32_t tsRebootTime; -static SRpcIpSet tsDMnodeIpSetForPeer = {0}; -static SRpcIpSet tsDMnodeIpSetForShell = {0}; +static SRpcIpSet tsDMnodeIpSet = {0}; static SDMMnodeInfos tsDMnodeInfos = {0}; static SDMDnodeCfg tsDnodeCfg = {0}; +static taos_qset tsMgmtQset = NULL; +static taos_queue tsMgmtQueue = NULL; +static pthread_t tsQthread; static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes); static bool dnodeReadMnodeInfos(); @@ -55,6 +58,7 @@ static bool dnodeReadDnodeCfg(); static void dnodeSaveDnodeCfg(); static void dnodeProcessStatusRsp(SRpcMsg *pMsg); static void dnodeSendStatusMsg(void *handle, void *tmrId); +static void *dnodeProcessMgmtQueue(void *param); static int32_t dnodeOpenVnodes(); static void dnodeCloseVnodes(); @@ -74,52 +78,64 @@ int32_t dnodeInitMgmt() { dnodeReadDnodeCfg(); tsRebootTime = taosGetTimestampSec(); + if (!dnodeReadMnodeInfos()) { + memset(&tsDMnodeIpSet, 0, sizeof(SRpcIpSet)); + memset(&tsDMnodeInfos, 0, sizeof(SDMMnodeInfos)); + + tsDMnodeIpSet.numOfIps = 1; + taosGetFqdnPortFromEp(tsFirst, tsDMnodeIpSet.fqdn[0], &tsDMnodeIpSet.port[0]); + + if (strcmp(tsSecond, tsFirst) != 0) { + tsDMnodeIpSet.numOfIps = 2; + taosGetFqdnPortFromEp(tsSecond, tsDMnodeIpSet.fqdn[1], &tsDMnodeIpSet.port[1]); + } + } else { + tsDMnodeIpSet.inUse = tsDMnodeInfos.inUse; + tsDMnodeIpSet.numOfIps = tsDMnodeInfos.nodeNum; + for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { + taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSet.fqdn[i], &tsDMnodeIpSet.port[i]); + } + } + + // create the queue and thread to handle the message + tsMgmtQset = taosOpenQset(); + if (tsMgmtQset == NULL) { + dError("failed to create the mgmt queue set"); + dnodeCleanupMgmt(); + return -1; + } + + tsMgmtQueue = taosOpenQueue(); + if (tsMgmtQueue == NULL) { + dError("failed to create the mgmt queue"); + dnodeCleanupMgmt(); + return -1; + } + + taosAddIntoQset(tsMgmtQset, tsMgmtQueue, NULL); + + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + int32_t code = pthread_create(&tsQthread, &thAttr, dnodeProcessMgmtQueue, NULL); + pthread_attr_destroy(&thAttr); + if (code != 0) { + dError("failed to create thread to process mgmt queue, reason:%s", strerror(errno)); + dnodeCleanupMgmt(); + return -1; + } + + code = dnodeOpenVnodes(); + if (code != TSDB_CODE_SUCCESS) { + dnodeCleanupMgmt(); + return -1; + } + tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM"); if (tsDnodeTmr == NULL) { dError("failed to init dnode timer"); - return -1; - } - - if (!dnodeReadMnodeInfos()) { - memset(&tsDMnodeIpSetForPeer, 0, sizeof(SRpcIpSet)); - memset(&tsDMnodeIpSetForShell, 0, sizeof(SRpcIpSet)); - memset(&tsDMnodeInfos, 0, sizeof(SDMMnodeInfos)); - - tsDMnodeIpSetForPeer.numOfIps = 1; - taosGetFqdnPortFromEp(tsFirst, tsDMnodeIpSetForPeer.fqdn[0], &tsDMnodeIpSetForPeer.port[0]); - tsDMnodeIpSetForPeer.port[0] += TSDB_PORT_DNODEDNODE; - - tsDMnodeIpSetForShell.numOfIps = 1; - taosGetFqdnPortFromEp(tsFirst, tsDMnodeIpSetForShell.fqdn[0], &tsDMnodeIpSetForShell.port[0]); - tsDMnodeIpSetForShell.port[0] += TSDB_PORT_DNODESHELL; - - if (strcmp(tsSecond, tsFirst) != 0) { - tsDMnodeIpSetForPeer.numOfIps = 2; - taosGetFqdnPortFromEp(tsSecond, tsDMnodeIpSetForPeer.fqdn[1], &tsDMnodeIpSetForPeer.port[1]); - tsDMnodeIpSetForPeer.port[1] += TSDB_PORT_DNODEDNODE; - - tsDMnodeIpSetForShell.numOfIps = 2; - taosGetFqdnPortFromEp(tsSecond, tsDMnodeIpSetForShell.fqdn[1], &tsDMnodeIpSetForShell.port[1]); - tsDMnodeIpSetForShell.port[1] += TSDB_PORT_DNODESHELL; - } - } else { - tsDMnodeIpSetForPeer.inUse = tsDMnodeInfos.inUse; - tsDMnodeIpSetForPeer.numOfIps = tsDMnodeInfos.nodeNum; - for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { - taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSetForPeer.fqdn[i], &tsDMnodeIpSetForPeer.port[i]); - tsDMnodeIpSetForPeer.port[i] += TSDB_PORT_DNODEDNODE; - } - - tsDMnodeIpSetForShell.inUse = tsDMnodeInfos.inUse; - tsDMnodeIpSetForShell.numOfIps = tsDMnodeInfos.nodeNum; - for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { - taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSetForShell.fqdn[i], &tsDMnodeIpSetForShell.port[i]); - tsDMnodeIpSetForShell.port[i] += TSDB_PORT_DNODESHELL; - } - } - - int32_t code = dnodeOpenVnodes(); - if (code != TSDB_CODE_SUCCESS) { + dnodeCleanupMgmt(); return -1; } @@ -142,22 +158,62 @@ void dnodeCleanupMgmt() { } dnodeCloseVnodes(); + + if (tsMgmtQset) taosQsetThreadResume(tsMgmtQset); + if (tsQthread) pthread_join(tsQthread, NULL); + + if (tsMgmtQueue) taosCloseQueue(tsMgmtQueue); + if (tsMgmtQset) taosCloseQset(tsMgmtQset); + tsMgmtQset = NULL; + tsMgmtQueue = NULL; + } -void dnodeDispatchToDnodeMgmt(SRpcMsg *pMsg) { - SRpcMsg rsp; +void dnodeDispatchToMgmtQueue(SRpcMsg *pMsg) { + void *item; - if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { - rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); + item = taosAllocateQitem(sizeof(SRpcMsg)); + if (item) { + memcpy(item, pMsg, sizeof(SRpcMsg)); + taosWriteQitem(tsMgmtQueue, 1, item); } else { - rsp.code = TSDB_CODE_MSG_NOT_PROCESSED; + SRpcMsg rsp; + rsp.handle = pMsg->handle; + rsp.pCont = NULL; + rsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; + rpcSendResponse(&rsp); + rpcFreeCont(pMsg->pCont); + } +} + +static void *dnodeProcessMgmtQueue(void *param) { + SRpcMsg *pMsg; + SRpcMsg rsp; + int type; + void *handle; + + while (1) { + if (taosReadQitemFromQset(tsMgmtQset, &type, (void **) &pMsg, &handle) == 0) { + dTrace("dnode mgmt got no message from qset, exit ..."); + break; + } + + dTrace("%p, msg:%s will be processed", pMsg->ahandle, taosMsg[pMsg->msgType]); + if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { + rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); + } else { + rsp.code = TSDB_CODE_MSG_NOT_PROCESSED; + } + + rsp.handle = pMsg->handle; + rsp.pCont = NULL; + rpcSendResponse(&rsp); + + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); } - rsp.handle = pMsg->handle; - rsp.pCont = NULL; - rpcSendResponse(&rsp); - - rpcFreeCont(pMsg->pCont); + return NULL; } static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) { @@ -284,22 +340,26 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { } void dnodeUpdateMnodeIpSetForPeer(SRpcIpSet *pIpSet) { - dPrint("mnode IP list for peer is changed, numOfIps:%d inUse:%d", pIpSet->numOfIps, pIpSet->inUse); + dPrint("mnode IP list for is changed, numOfIps:%d inUse:%d", pIpSet->numOfIps, pIpSet->inUse); for (int i = 0; i < pIpSet->numOfIps; ++i) { + pIpSet->port[i] -= TSDB_PORT_DNODEDNODE; dPrint("mnode index:%d %s:%u", i, pIpSet->fqdn[i], pIpSet->port[i]) } - tsDMnodeIpSetForPeer = *pIpSet; + tsDMnodeIpSet = *pIpSet; } void dnodeGetMnodeIpSetForPeer(void *ipSetRaw) { SRpcIpSet *ipSet = ipSetRaw; - *ipSet = tsDMnodeIpSetForPeer; + *ipSet = tsDMnodeIpSet; + + for (int i=0; inumOfIps; ++i) + ipSet->port[i] += TSDB_PORT_DNODEDNODE; } void dnodeGetMnodeIpSetForShell(void *ipSetRaw) { SRpcIpSet *ipSet = ipSetRaw; - *ipSet = tsDMnodeIpSetForShell; + *ipSet = tsDMnodeIpSet; } static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { @@ -349,19 +409,10 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) { dPrint("mnode index:%d, %s", tsDMnodeInfos.nodeInfos[i].nodeId, tsDMnodeInfos.nodeInfos[i].nodeEp); } - tsDMnodeIpSetForPeer.inUse = tsDMnodeInfos.inUse; - tsDMnodeIpSetForPeer.numOfIps = tsDMnodeInfos.nodeNum; + tsDMnodeIpSet.inUse = tsDMnodeInfos.inUse; + tsDMnodeIpSet.numOfIps = tsDMnodeInfos.nodeNum; for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { - taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSetForPeer.fqdn[i], &tsDMnodeIpSetForPeer.port[i]); - tsDMnodeIpSetForPeer.port[i] += TSDB_PORT_DNODEDNODE; - dPrint("mnode index:%d, for peer %s %d", i, tsDMnodeIpSetForPeer.fqdn[i], tsDMnodeIpSetForPeer.port[i]); - } - - tsDMnodeIpSetForShell.inUse = tsDMnodeInfos.inUse; - tsDMnodeIpSetForShell.numOfIps = tsDMnodeInfos.nodeNum; - for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { - taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSetForShell.fqdn[i], &tsDMnodeIpSetForShell.port[i]); - dPrint("mnode index:%d, for shell %s %d", i, tsDMnodeIpSetForShell.fqdn[i], tsDMnodeIpSetForShell.port[i]); + taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSet.fqdn[i], &tsDMnodeIpSet.port[i]); } dnodeSaveMnodeInfos(); @@ -487,7 +538,7 @@ static void dnodeSaveMnodeInfos() { } char *dnodeGetMnodeMasterEp() { - return tsDMnodeInfos.nodeInfos[tsDMnodeIpSetForPeer.inUse].nodeEp; + return tsDMnodeInfos.nodeInfos[tsDMnodeIpSet.inUse].nodeEp; } void* dnodeGetMnodeInfos() { @@ -534,7 +585,9 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { .msgType = TSDB_MSG_TYPE_DM_STATUS }; - dnodeSendMsgToDnode(&tsDMnodeIpSetForPeer, &rpcMsg); + SRpcIpSet ipSet; + dnodeGetMnodeIpSetForPeer(&ipSet); + dnodeSendMsgToDnode(&ipSet, &rpcMsg); } static bool dnodeReadDnodeCfg() { diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index 9a7b0837e8..ea3af08d71 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -43,10 +43,10 @@ int32_t dnodeInitServer() { dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVnodeWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVnodeWriteQueue; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToDnodeMgmt; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToDnodeMgmt; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToDnodeMgmt; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToDnodeMgmt; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToMgmtQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodePeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodePeerQueue; diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 297ff31ed9..fa46c3a0f3 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -420,13 +420,13 @@ void rpcSendResponse(const SRpcMsg *pRsp) { pConn->rspMsgLen = msgLen; if (pMsg->code == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; - rpcUnlockConn(pConn); - taosTmrStopA(&pConn->pTimer); // taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); rpcSendMsgToPeer(pConn, msg, msgLen); pConn->secured = 1; // connection shall be secured + rpcUnlockConn(pConn); + return; } @@ -1095,10 +1095,10 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pConn->reqMsgLen = msgLen; pConn->pContext = pContext; - rpcUnlockConn(pConn); - taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); rpcSendMsgToPeer(pConn, msg, msgLen); + + rpcUnlockConn(pConn); } static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {