This commit is contained in:
Shengliang Guan 2022-03-11 18:18:45 +08:00
parent 712587511c
commit ad1609fe7f
4 changed files with 48 additions and 28 deletions

View File

@ -53,7 +53,7 @@ void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void mmConsumeParentQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
#ifdef __cplusplus
}

View File

@ -130,8 +130,8 @@ int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) {
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.parentQueueSize = 1024 * 1024,
.parentConsumeFp = (ProcConsumeFp)mmConsumeParentQueue,
.parentdMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.parentFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.parentdMallocHeadFp = (ProcMallocFp)malloc,
.parentFreeHeadFp = (ProcFreeFp)free,
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.testFlag = true,

View File

@ -139,6 +139,14 @@ void mmInitMsgFp(SMndMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = mmProcessWriteMsg;
}
static void mmSendRpcRsp(SDnode *pDnode, SRpcMsg *pRpc) {
if (pRpc->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRpc->code == TSDB_CODE_APP_NOT_READY) {
dndSendRedirectRsp(pDnode, pRpc);
} else {
rpcSendResponse(pRpc);
}
}
static int32_t mmBuildMsg(SMndMsg *pMsg, SRpcMsg *pRpc) {
SRpcConnInfo connInfo = {0};
if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) {
@ -183,15 +191,16 @@ void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
_OVER:
if (code != 0) {
if (code == 0) {
if (!pMgmt->singleProc) {
taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
}
} else {
bool isReq = (pRpc->msgType & 1U);
if (isReq) {
if (terrno == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || terrno == TSDB_CODE_APP_NOT_READY) {
dndSendRedirectRsp(pDnode, pRpc);
} else {
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
rpcSendResponse(&rsp);
}
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
mmSendRpcRsp(pDnode, &rsp);
}
taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
@ -245,6 +254,22 @@ static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMs
return code;
}
void mmPutRpcRspToWorker(SDnode *pDnode, SRpcMsg *pRpc) {
SMndMgmt *pMgmt = &pDnode->mmgmt;
int32_t code = -1;
if (pMgmt->singleProc) {
mmSendRpcRsp(pDnode, pRpc);
} else {
do {
code = taosProcPutToParentQueue(pMgmt->pProcess, pRpc, sizeof(SRpcMsg), pRpc->pCont, pRpc->contLen);
if (code != 0) {
taosMsleep(10);
}
} while (code != 0);
}
}
void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {
SMndMgmt *pMgmt = &pDnode->mmgmt;
@ -257,25 +282,23 @@ void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pC
if (code != 0) {
bool isReq = (pRpc->msgType & 1U);
if (isReq) {
if (terrno == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || terrno == TSDB_CODE_APP_NOT_READY) {
dndSendRedirectRsp(pDnode, pRpc);
} else {
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
rpcSendResponse(&rsp);
}
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
mmPutRpcRspToWorker(pDnode, &rsp);
}
taosFreeQitem(pMsg);
rpcFreeCont(pCont);
}
}
void mmConsumeParentQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {}
void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {
pMsg->pCont = pCont;
mmSendRpcRsp(pDnode, pMsg);
free(pMsg);
}
static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) {
SMnode *pMnode = mmAcquire(pDnode);
SRpcMsg *pRpc = &pMsg->rpcMsg;
tmsg_t msgType = pMsg->rpcMsg.msgType;
void *ahandle = pMsg->rpcMsg.ahandle;
bool isReq = (pRpc->msgType & 1U);
int32_t code = -1;
@ -289,18 +312,15 @@ static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) {
if (pMsg->rpcMsg.handle == NULL) return;
if (code == 0) {
SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont};
rpcSendResponse(&rsp);
mmPutRpcRspToWorker(pDnode, &rsp);
} else {
if (terrno == TSDB_CODE_APP_NOT_READY) {
dndSendRedirectRsp(pDnode, pRpc);
} else if (terrno == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
} else {
if (terrno != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont, .code = terrno};
rpcSendResponse(&rsp);
mmPutRpcRspToWorker(pDnode, &rsp);
}
}
}
taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
taosFreeQitem(pMsg);
}

View File

@ -149,8 +149,8 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea
pthread_mutex_unlock(&pQueue->mutex);
tsem_post(&pQueue->sem);
(*pQueue->freeHeadFp)(pHead);
(*pQueue->freeBodyFp)(pBody);
// (*pQueue->freeHeadFp)(pHead);
// (*pQueue->freeBodyFp)(pBody);
return 0;
}