Merge pull request #2081 from taosdata/hotfix/dnodemgmt
Hotfix/dnodemgmt
This commit is contained in:
commit
760f673a4e
|
@ -22,7 +22,7 @@ extern "C" {
|
||||||
|
|
||||||
int32_t dnodeInitMgmt();
|
int32_t dnodeInitMgmt();
|
||||||
void dnodeCleanupMgmt();
|
void dnodeCleanupMgmt();
|
||||||
void dnodeDispatchToDnodeMgmt(SRpcMsg *rpcMsg);
|
void dnodeDispatchToMgmtQueue(SRpcMsg *rpcMsg);
|
||||||
|
|
||||||
void* dnodeGetVnode(int32_t vgId);
|
void* dnodeGetVnode(int32_t vgId);
|
||||||
int32_t dnodeGetVnodeStatus(void *pVnode);
|
int32_t dnodeGetVnodeStatus(void *pVnode);
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "twal.h"
|
#include "twal.h"
|
||||||
|
#include "tqueue.h"
|
||||||
#include "tsync.h"
|
#include "tsync.h"
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
@ -42,10 +43,12 @@ void * tsDnodeTmr = NULL;
|
||||||
static void * tsStatusTimer = NULL;
|
static void * tsStatusTimer = NULL;
|
||||||
static uint32_t tsRebootTime;
|
static uint32_t tsRebootTime;
|
||||||
|
|
||||||
static SRpcIpSet tsDMnodeIpSetForPeer = {0};
|
static SRpcIpSet tsDMnodeIpSet = {0};
|
||||||
static SRpcIpSet tsDMnodeIpSetForShell = {0};
|
|
||||||
static SDMMnodeInfos tsDMnodeInfos = {0};
|
static SDMMnodeInfos tsDMnodeInfos = {0};
|
||||||
static SDMDnodeCfg tsDnodeCfg = {0};
|
static SDMDnodeCfg tsDnodeCfg = {0};
|
||||||
|
static taos_qset tsMgmtQset = NULL;
|
||||||
|
static taos_queue tsMgmtQueue = NULL;
|
||||||
|
static pthread_t tsQthread;
|
||||||
|
|
||||||
static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes);
|
static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes);
|
||||||
static bool dnodeReadMnodeInfos();
|
static bool dnodeReadMnodeInfos();
|
||||||
|
@ -55,6 +58,7 @@ static bool dnodeReadDnodeCfg();
|
||||||
static void dnodeSaveDnodeCfg();
|
static void dnodeSaveDnodeCfg();
|
||||||
static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
|
static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
|
||||||
static void dnodeSendStatusMsg(void *handle, void *tmrId);
|
static void dnodeSendStatusMsg(void *handle, void *tmrId);
|
||||||
|
static void *dnodeProcessMgmtQueue(void *param);
|
||||||
|
|
||||||
static int32_t dnodeOpenVnodes();
|
static int32_t dnodeOpenVnodes();
|
||||||
static void dnodeCloseVnodes();
|
static void dnodeCloseVnodes();
|
||||||
|
@ -74,52 +78,64 @@ int32_t dnodeInitMgmt() {
|
||||||
dnodeReadDnodeCfg();
|
dnodeReadDnodeCfg();
|
||||||
tsRebootTime = taosGetTimestampSec();
|
tsRebootTime = taosGetTimestampSec();
|
||||||
|
|
||||||
|
if (!dnodeReadMnodeInfos()) {
|
||||||
|
memset(&tsDMnodeIpSet, 0, sizeof(SRpcIpSet));
|
||||||
|
memset(&tsDMnodeInfos, 0, sizeof(SDMMnodeInfos));
|
||||||
|
|
||||||
|
tsDMnodeIpSet.numOfIps = 1;
|
||||||
|
taosGetFqdnPortFromEp(tsFirst, tsDMnodeIpSet.fqdn[0], &tsDMnodeIpSet.port[0]);
|
||||||
|
|
||||||
|
if (strcmp(tsSecond, tsFirst) != 0) {
|
||||||
|
tsDMnodeIpSet.numOfIps = 2;
|
||||||
|
taosGetFqdnPortFromEp(tsSecond, tsDMnodeIpSet.fqdn[1], &tsDMnodeIpSet.port[1]);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tsDMnodeIpSet.inUse = tsDMnodeInfos.inUse;
|
||||||
|
tsDMnodeIpSet.numOfIps = tsDMnodeInfos.nodeNum;
|
||||||
|
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
|
||||||
|
taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSet.fqdn[i], &tsDMnodeIpSet.port[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// create the queue and thread to handle the message
|
||||||
|
tsMgmtQset = taosOpenQset();
|
||||||
|
if (tsMgmtQset == NULL) {
|
||||||
|
dError("failed to create the mgmt queue set");
|
||||||
|
dnodeCleanupMgmt();
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsMgmtQueue = taosOpenQueue();
|
||||||
|
if (tsMgmtQueue == NULL) {
|
||||||
|
dError("failed to create the mgmt queue");
|
||||||
|
dnodeCleanupMgmt();
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosAddIntoQset(tsMgmtQset, tsMgmtQueue, NULL);
|
||||||
|
|
||||||
|
pthread_attr_t thAttr;
|
||||||
|
pthread_attr_init(&thAttr);
|
||||||
|
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
|
int32_t code = pthread_create(&tsQthread, &thAttr, dnodeProcessMgmtQueue, NULL);
|
||||||
|
pthread_attr_destroy(&thAttr);
|
||||||
|
if (code != 0) {
|
||||||
|
dError("failed to create thread to process mgmt queue, reason:%s", strerror(errno));
|
||||||
|
dnodeCleanupMgmt();
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = dnodeOpenVnodes();
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
dnodeCleanupMgmt();
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
|
tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
|
||||||
if (tsDnodeTmr == NULL) {
|
if (tsDnodeTmr == NULL) {
|
||||||
dError("failed to init dnode timer");
|
dError("failed to init dnode timer");
|
||||||
return -1;
|
dnodeCleanupMgmt();
|
||||||
}
|
|
||||||
|
|
||||||
if (!dnodeReadMnodeInfos()) {
|
|
||||||
memset(&tsDMnodeIpSetForPeer, 0, sizeof(SRpcIpSet));
|
|
||||||
memset(&tsDMnodeIpSetForShell, 0, sizeof(SRpcIpSet));
|
|
||||||
memset(&tsDMnodeInfos, 0, sizeof(SDMMnodeInfos));
|
|
||||||
|
|
||||||
tsDMnodeIpSetForPeer.numOfIps = 1;
|
|
||||||
taosGetFqdnPortFromEp(tsFirst, tsDMnodeIpSetForPeer.fqdn[0], &tsDMnodeIpSetForPeer.port[0]);
|
|
||||||
tsDMnodeIpSetForPeer.port[0] += TSDB_PORT_DNODEDNODE;
|
|
||||||
|
|
||||||
tsDMnodeIpSetForShell.numOfIps = 1;
|
|
||||||
taosGetFqdnPortFromEp(tsFirst, tsDMnodeIpSetForShell.fqdn[0], &tsDMnodeIpSetForShell.port[0]);
|
|
||||||
tsDMnodeIpSetForShell.port[0] += TSDB_PORT_DNODESHELL;
|
|
||||||
|
|
||||||
if (strcmp(tsSecond, tsFirst) != 0) {
|
|
||||||
tsDMnodeIpSetForPeer.numOfIps = 2;
|
|
||||||
taosGetFqdnPortFromEp(tsSecond, tsDMnodeIpSetForPeer.fqdn[1], &tsDMnodeIpSetForPeer.port[1]);
|
|
||||||
tsDMnodeIpSetForPeer.port[1] += TSDB_PORT_DNODEDNODE;
|
|
||||||
|
|
||||||
tsDMnodeIpSetForShell.numOfIps = 2;
|
|
||||||
taosGetFqdnPortFromEp(tsSecond, tsDMnodeIpSetForShell.fqdn[1], &tsDMnodeIpSetForShell.port[1]);
|
|
||||||
tsDMnodeIpSetForShell.port[1] += TSDB_PORT_DNODESHELL;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
tsDMnodeIpSetForPeer.inUse = tsDMnodeInfos.inUse;
|
|
||||||
tsDMnodeIpSetForPeer.numOfIps = tsDMnodeInfos.nodeNum;
|
|
||||||
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
|
|
||||||
taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSetForPeer.fqdn[i], &tsDMnodeIpSetForPeer.port[i]);
|
|
||||||
tsDMnodeIpSetForPeer.port[i] += TSDB_PORT_DNODEDNODE;
|
|
||||||
}
|
|
||||||
|
|
||||||
tsDMnodeIpSetForShell.inUse = tsDMnodeInfos.inUse;
|
|
||||||
tsDMnodeIpSetForShell.numOfIps = tsDMnodeInfos.nodeNum;
|
|
||||||
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
|
|
||||||
taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSetForShell.fqdn[i], &tsDMnodeIpSetForShell.port[i]);
|
|
||||||
tsDMnodeIpSetForShell.port[i] += TSDB_PORT_DNODESHELL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = dnodeOpenVnodes();
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -142,22 +158,62 @@ void dnodeCleanupMgmt() {
|
||||||
}
|
}
|
||||||
|
|
||||||
dnodeCloseVnodes();
|
dnodeCloseVnodes();
|
||||||
|
|
||||||
|
if (tsMgmtQset) taosQsetThreadResume(tsMgmtQset);
|
||||||
|
if (tsQthread) pthread_join(tsQthread, NULL);
|
||||||
|
|
||||||
|
if (tsMgmtQueue) taosCloseQueue(tsMgmtQueue);
|
||||||
|
if (tsMgmtQset) taosCloseQset(tsMgmtQset);
|
||||||
|
tsMgmtQset = NULL;
|
||||||
|
tsMgmtQueue = NULL;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeDispatchToDnodeMgmt(SRpcMsg *pMsg) {
|
void dnodeDispatchToMgmtQueue(SRpcMsg *pMsg) {
|
||||||
SRpcMsg rsp;
|
void *item;
|
||||||
|
|
||||||
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
|
item = taosAllocateQitem(sizeof(SRpcMsg));
|
||||||
rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
|
if (item) {
|
||||||
|
memcpy(item, pMsg, sizeof(SRpcMsg));
|
||||||
|
taosWriteQitem(tsMgmtQueue, 1, item);
|
||||||
} else {
|
} else {
|
||||||
rsp.code = TSDB_CODE_MSG_NOT_PROCESSED;
|
SRpcMsg rsp;
|
||||||
|
rsp.handle = pMsg->handle;
|
||||||
|
rsp.pCont = NULL;
|
||||||
|
rsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||||
|
rpcSendResponse(&rsp);
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *dnodeProcessMgmtQueue(void *param) {
|
||||||
|
SRpcMsg *pMsg;
|
||||||
|
SRpcMsg rsp;
|
||||||
|
int type;
|
||||||
|
void *handle;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
if (taosReadQitemFromQset(tsMgmtQset, &type, (void **) &pMsg, &handle) == 0) {
|
||||||
|
dTrace("dnode mgmt got no message from qset, exit ...");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
dTrace("%p, msg:%s will be processed", pMsg->ahandle, taosMsg[pMsg->msgType]);
|
||||||
|
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
|
||||||
|
rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
|
||||||
|
} else {
|
||||||
|
rsp.code = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
|
}
|
||||||
|
|
||||||
|
rsp.handle = pMsg->handle;
|
||||||
|
rsp.pCont = NULL;
|
||||||
|
rpcSendResponse(&rsp);
|
||||||
|
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
rsp.handle = pMsg->handle;
|
return NULL;
|
||||||
rsp.pCont = NULL;
|
|
||||||
rpcSendResponse(&rsp);
|
|
||||||
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
|
static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
|
||||||
|
@ -284,22 +340,26 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeUpdateMnodeIpSetForPeer(SRpcIpSet *pIpSet) {
|
void dnodeUpdateMnodeIpSetForPeer(SRpcIpSet *pIpSet) {
|
||||||
dPrint("mnode IP list for peer is changed, numOfIps:%d inUse:%d", pIpSet->numOfIps, pIpSet->inUse);
|
dPrint("mnode IP list for is changed, numOfIps:%d inUse:%d", pIpSet->numOfIps, pIpSet->inUse);
|
||||||
for (int i = 0; i < pIpSet->numOfIps; ++i) {
|
for (int i = 0; i < pIpSet->numOfIps; ++i) {
|
||||||
|
pIpSet->port[i] -= TSDB_PORT_DNODEDNODE;
|
||||||
dPrint("mnode index:%d %s:%u", i, pIpSet->fqdn[i], pIpSet->port[i])
|
dPrint("mnode index:%d %s:%u", i, pIpSet->fqdn[i], pIpSet->port[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
tsDMnodeIpSetForPeer = *pIpSet;
|
tsDMnodeIpSet = *pIpSet;
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeGetMnodeIpSetForPeer(void *ipSetRaw) {
|
void dnodeGetMnodeIpSetForPeer(void *ipSetRaw) {
|
||||||
SRpcIpSet *ipSet = ipSetRaw;
|
SRpcIpSet *ipSet = ipSetRaw;
|
||||||
*ipSet = tsDMnodeIpSetForPeer;
|
*ipSet = tsDMnodeIpSet;
|
||||||
|
|
||||||
|
for (int i=0; i<ipSet->numOfIps; ++i)
|
||||||
|
ipSet->port[i] += TSDB_PORT_DNODEDNODE;
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeGetMnodeIpSetForShell(void *ipSetRaw) {
|
void dnodeGetMnodeIpSetForShell(void *ipSetRaw) {
|
||||||
SRpcIpSet *ipSet = ipSetRaw;
|
SRpcIpSet *ipSet = ipSetRaw;
|
||||||
*ipSet = tsDMnodeIpSetForShell;
|
*ipSet = tsDMnodeIpSet;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
|
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
|
||||||
|
@ -349,19 +409,10 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) {
|
||||||
dPrint("mnode index:%d, %s", tsDMnodeInfos.nodeInfos[i].nodeId, tsDMnodeInfos.nodeInfos[i].nodeEp);
|
dPrint("mnode index:%d, %s", tsDMnodeInfos.nodeInfos[i].nodeId, tsDMnodeInfos.nodeInfos[i].nodeEp);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsDMnodeIpSetForPeer.inUse = tsDMnodeInfos.inUse;
|
tsDMnodeIpSet.inUse = tsDMnodeInfos.inUse;
|
||||||
tsDMnodeIpSetForPeer.numOfIps = tsDMnodeInfos.nodeNum;
|
tsDMnodeIpSet.numOfIps = tsDMnodeInfos.nodeNum;
|
||||||
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
|
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
|
||||||
taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSetForPeer.fqdn[i], &tsDMnodeIpSetForPeer.port[i]);
|
taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSet.fqdn[i], &tsDMnodeIpSet.port[i]);
|
||||||
tsDMnodeIpSetForPeer.port[i] += TSDB_PORT_DNODEDNODE;
|
|
||||||
dPrint("mnode index:%d, for peer %s %d", i, tsDMnodeIpSetForPeer.fqdn[i], tsDMnodeIpSetForPeer.port[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
tsDMnodeIpSetForShell.inUse = tsDMnodeInfos.inUse;
|
|
||||||
tsDMnodeIpSetForShell.numOfIps = tsDMnodeInfos.nodeNum;
|
|
||||||
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
|
|
||||||
taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSetForShell.fqdn[i], &tsDMnodeIpSetForShell.port[i]);
|
|
||||||
dPrint("mnode index:%d, for shell %s %d", i, tsDMnodeIpSetForShell.fqdn[i], tsDMnodeIpSetForShell.port[i]);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dnodeSaveMnodeInfos();
|
dnodeSaveMnodeInfos();
|
||||||
|
@ -487,7 +538,7 @@ static void dnodeSaveMnodeInfos() {
|
||||||
}
|
}
|
||||||
|
|
||||||
char *dnodeGetMnodeMasterEp() {
|
char *dnodeGetMnodeMasterEp() {
|
||||||
return tsDMnodeInfos.nodeInfos[tsDMnodeIpSetForPeer.inUse].nodeEp;
|
return tsDMnodeInfos.nodeInfos[tsDMnodeIpSet.inUse].nodeEp;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* dnodeGetMnodeInfos() {
|
void* dnodeGetMnodeInfos() {
|
||||||
|
@ -534,7 +585,9 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
|
||||||
.msgType = TSDB_MSG_TYPE_DM_STATUS
|
.msgType = TSDB_MSG_TYPE_DM_STATUS
|
||||||
};
|
};
|
||||||
|
|
||||||
dnodeSendMsgToDnode(&tsDMnodeIpSetForPeer, &rpcMsg);
|
SRpcIpSet ipSet;
|
||||||
|
dnodeGetMnodeIpSetForPeer(&ipSet);
|
||||||
|
dnodeSendMsgToDnode(&ipSet, &rpcMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool dnodeReadDnodeCfg() {
|
static bool dnodeReadDnodeCfg() {
|
||||||
|
|
|
@ -43,10 +43,10 @@ int32_t dnodeInitServer() {
|
||||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVnodeWriteQueue;
|
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVnodeWriteQueue;
|
||||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVnodeWriteQueue;
|
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVnodeWriteQueue;
|
||||||
|
|
||||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToDnodeMgmt;
|
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToMgmtQueue;
|
||||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToDnodeMgmt;
|
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToMgmtQueue;
|
||||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToDnodeMgmt;
|
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToMgmtQueue;
|
||||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToDnodeMgmt;
|
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToMgmtQueue;
|
||||||
|
|
||||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodePeerQueue;
|
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodePeerQueue;
|
||||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodePeerQueue;
|
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodePeerQueue;
|
||||||
|
|
|
@ -420,13 +420,13 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
|
||||||
pConn->rspMsgLen = msgLen;
|
pConn->rspMsgLen = msgLen;
|
||||||
if (pMsg->code == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
|
if (pMsg->code == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
|
||||||
|
|
||||||
rpcUnlockConn(pConn);
|
|
||||||
|
|
||||||
taosTmrStopA(&pConn->pTimer);
|
taosTmrStopA(&pConn->pTimer);
|
||||||
// taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
|
// taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
|
||||||
rpcSendMsgToPeer(pConn, msg, msgLen);
|
rpcSendMsgToPeer(pConn, msg, msgLen);
|
||||||
pConn->secured = 1; // connection shall be secured
|
pConn->secured = 1; // connection shall be secured
|
||||||
|
|
||||||
|
rpcUnlockConn(pConn);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1095,10 +1095,10 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
|
||||||
pConn->reqMsgLen = msgLen;
|
pConn->reqMsgLen = msgLen;
|
||||||
pConn->pContext = pContext;
|
pConn->pContext = pContext;
|
||||||
|
|
||||||
rpcUnlockConn(pConn);
|
|
||||||
|
|
||||||
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
|
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
|
||||||
rpcSendMsgToPeer(pConn, msg, msgLen);
|
rpcSendMsgToPeer(pConn, msg, msgLen);
|
||||||
|
|
||||||
|
rpcUnlockConn(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
|
static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
|
||||||
|
|
Loading…
Reference in New Issue