shm
This commit is contained in:
parent
2a62d56845
commit
5bf4dfe8c4
|
@ -61,7 +61,7 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER;
|
if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER;
|
||||||
if (dndBuildMsg(pMsg, pRpc) != 0) goto _OVER;
|
if (dndBuildMsg(pMsg, pRpc) != 0) goto _OVER;
|
||||||
|
|
||||||
dTrace("msg:%p, is created, app:%p user:%s", pMsg, pRpc->ahandle, pMsg->user);
|
dTrace("msg:%p, is created, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, pMsg->user);
|
||||||
if (pWrapper->procType == PROC_SINGLE) {
|
if (pWrapper->procType == PROC_SINGLE) {
|
||||||
code = (*msgFp)(pWrapper->pMgmt, pMsg);
|
code = (*msgFp)(pWrapper->pMgmt, pMsg);
|
||||||
} else if (pWrapper->procType == PROC_PARENT) {
|
} else if (pWrapper->procType == PROC_PARENT) {
|
||||||
|
|
|
@ -27,18 +27,18 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
|
||||||
|
|
||||||
if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
|
if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
|
||||||
if (pRsp == NULL || pRsp->pCont == NULL) return;
|
if (pRsp == NULL || pRsp->pCont == NULL) return;
|
||||||
dTrace("rsp:%s ignored since dnode exiting, app:%p", TMSG_INFO(msgType), pRsp->ahandle);
|
dTrace("rsp:%s ignored since dnode exiting, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle);
|
||||||
rpcFreeCont(pRsp->pCont);
|
rpcFreeCont(pRsp->pCont);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
|
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
|
||||||
if (pHandle->msgFp != NULL) {
|
if (pHandle->msgFp != NULL) {
|
||||||
dTrace("rsp:%s will be processed by %s, app:%p code:0x%x:%s", TMSG_INFO(msgType), pHandle->pWrapper->name,
|
dTrace("rsp:%s will be processed by %s, handle:%p app:%p code:0x%04x:%s", TMSG_INFO(msgType),
|
||||||
pRsp->ahandle, pRsp->code & 0XFFFF, tstrerror(pRsp->code));
|
pHandle->pWrapper->name, pRsp->handle, pRsp->ahandle, pRsp->code & 0XFFFF, tstrerror(pRsp->code));
|
||||||
dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet);
|
dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet);
|
||||||
} else {
|
} else {
|
||||||
dError("rsp:%s not processed since no handle, app:%p", TMSG_INFO(msgType), pRsp->ahandle);
|
dError("rsp:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle);
|
||||||
rpcFreeCont(pRsp->pCont);
|
rpcFreeCont(pRsp->pCont);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -88,13 +88,13 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
|
||||||
tmsg_t msgType = pReq->msgType;
|
tmsg_t msgType = pReq->msgType;
|
||||||
|
|
||||||
if (msgType == TDMT_DND_NETWORK_TEST) {
|
if (msgType == TDMT_DND_NETWORK_TEST) {
|
||||||
dTrace("RPC %p, network test req will be processed, app:%p", pReq->handle, pReq->ahandle);
|
dTrace("network test req will be processed, handle:%p, app:%p", pReq->handle, pReq->ahandle);
|
||||||
dndProcessStartupReq(pDnode, pReq);
|
dndProcessStartupReq(pDnode, pReq);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
|
if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
|
||||||
dError("RPC %p, req:%s ignored since dnode not running, app:%p", pReq->handle, TMSG_INFO(msgType), pReq->ahandle);
|
dError("req:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle);
|
||||||
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_APP_NOT_READY, .ahandle = pReq->ahandle};
|
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_APP_NOT_READY, .ahandle = pReq->ahandle};
|
||||||
rpcSendResponse(&rspMsg);
|
rpcSendResponse(&rspMsg);
|
||||||
rpcFreeCont(pReq->pCont);
|
rpcFreeCont(pReq->pCont);
|
||||||
|
@ -102,7 +102,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReq->pCont == NULL) {
|
if (pReq->pCont == NULL) {
|
||||||
dTrace("RPC %p, req:%s not processed since its empty, app:%p", pReq->handle, TMSG_INFO(msgType), pReq->ahandle);
|
dTrace("req:%s not processed since its empty, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle);
|
||||||
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN, .ahandle = pReq->ahandle};
|
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN, .ahandle = pReq->ahandle};
|
||||||
rpcSendResponse(&rspMsg);
|
rpcSendResponse(&rspMsg);
|
||||||
return;
|
return;
|
||||||
|
@ -110,11 +110,11 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
|
||||||
|
|
||||||
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
|
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
|
||||||
if (pHandle->msgFp != NULL) {
|
if (pHandle->msgFp != NULL) {
|
||||||
dTrace("RPC %p, req:%s will be processed by %s, app:%p", pReq->handle, TMSG_INFO(msgType), pHandle->pWrapper->name,
|
dTrace("req:%s will be processed by %s, handle:%p app:%p", TMSG_INFO(msgType), pHandle->pWrapper->name,
|
||||||
pReq->ahandle);
|
pReq->handle, pReq->ahandle);
|
||||||
dndProcessRpcMsg(pHandle->pWrapper, pReq, pEpSet);
|
dndProcessRpcMsg(pHandle->pWrapper, pReq, pEpSet);
|
||||||
} else {
|
} else {
|
||||||
dError("RPC %p, req:%s not processed since no handle, app:%p", pReq->handle, TMSG_INFO(msgType), pReq->ahandle);
|
dError("req:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle);
|
||||||
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pReq->ahandle};
|
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pReq->ahandle};
|
||||||
rpcSendResponse(&rspMsg);
|
rpcSendResponse(&rspMsg);
|
||||||
rpcFreeCont(pReq->pCont);
|
rpcFreeCont(pReq->pCont);
|
||||||
|
|
|
@ -32,12 +32,12 @@ static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
if (pRpc->handle == NULL) return;
|
if (pRpc->handle == NULL) return;
|
||||||
if (code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
if (code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||||
if (code != 0) code = terrno;
|
if (code != 0) code = terrno;
|
||||||
SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp};
|
SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp, .code = code};
|
||||||
dndSendRsp(pMgmt->pWrapper, &rsp);
|
dndSendRsp(pMgmt->pWrapper, &rsp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("msg:%p, is freed", pMsg);
|
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
|
||||||
rpcFreeCont(pRpc->pCont);
|
rpcFreeCont(pRpc->pCont);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
@ -90,7 +90,7 @@ static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRp
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("msg:%p, is created", pMsg);
|
dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
|
||||||
pMsg->rpcMsg = *pRpc;
|
pMsg->rpcMsg = *pRpc;
|
||||||
|
|
||||||
int32_t code = mmPutMsgToWorker(pMgmt, pWorker, pMsg);
|
int32_t code = mmPutMsgToWorker(pMgmt, pWorker, pMsg);
|
||||||
|
|
|
@ -21,9 +21,9 @@ void Testbase::InitLog(const char* path) {
|
||||||
mDebugFlag = 143;
|
mDebugFlag = 143;
|
||||||
cDebugFlag = 0;
|
cDebugFlag = 0;
|
||||||
jniDebugFlag = 0;
|
jniDebugFlag = 0;
|
||||||
tmrDebugFlag = 143;
|
tmrDebugFlag = 135;
|
||||||
uDebugFlag = 143;
|
uDebugFlag = 135;
|
||||||
rpcDebugFlag = 143;
|
rpcDebugFlag = 135;
|
||||||
qDebugFlag = 0;
|
qDebugFlag = 0;
|
||||||
wDebugFlag = 0;
|
wDebugFlag = 0;
|
||||||
sDebugFlag = 0;
|
sDebugFlag = 0;
|
||||||
|
|
|
@ -760,7 +760,7 @@ static void mndTransSendRpcRsp(STrans *pTrans) {
|
||||||
}
|
}
|
||||||
free(pTrans->rpcRsp);
|
free(pTrans->rpcRsp);
|
||||||
|
|
||||||
mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage,
|
mDebug("trans:%d, send rsp, code:0x%04x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage,
|
||||||
pTrans->rpcAHandle);
|
pTrans->rpcAHandle);
|
||||||
SRpcMsg rspMsg = {.handle = pTrans->rpcHandle,
|
SRpcMsg rspMsg = {.handle = pTrans->rpcHandle,
|
||||||
.code = pTrans->code,
|
.code = pTrans->code,
|
||||||
|
@ -816,7 +816,7 @@ void mndTransProcessRsp(SNodeMsg *pRsp) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%x", transId, action, pRsp->rpcMsg.code,
|
mDebug("trans:%d, action:%d response is received, code:0x%04x, accept:0x%04x", transId, action, pRsp->rpcMsg.code,
|
||||||
pAction->acceptableCode);
|
pAction->acceptableCode);
|
||||||
mndTransExecute(pMnode, pTrans);
|
mndTransExecute(pMnode, pTrans);
|
||||||
|
|
||||||
|
@ -928,13 +928,13 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
|
||||||
mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions);
|
mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
mError("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errCode);
|
mError("trans:%d, all %d actions executed, code:0x%04x", pTrans->id, numOfActions, errCode);
|
||||||
mndTransResetActions(pMnode, pTrans, pArray);
|
mndTransResetActions(pMnode, pTrans, pArray);
|
||||||
terrno = errCode;
|
terrno = errCode;
|
||||||
return errCode;
|
return errCode;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
mDebug("trans:%d, %d of %d actions executed, code:0x%x", pTrans->id, numOfReceived, numOfActions, errCode);
|
mDebug("trans:%d, %d of %d actions executed, code:0x%04x", pTrans->id, numOfReceived, numOfActions, errCode);
|
||||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1111,7 +1111,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("trans:%d, finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes);
|
mDebug("trans:%d, finished, code:0x%04x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes);
|
||||||
return continueExec;
|
return continueExec;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -392,7 +392,7 @@ int32_t mndProcessMsg(SNodeMsg *pMsg) {
|
||||||
void *ahandle = pMsg->rpcMsg.ahandle;
|
void *ahandle = pMsg->rpcMsg.ahandle;
|
||||||
bool isReq = (pRpc->msgType & 1U);
|
bool isReq = (pRpc->msgType & 1U);
|
||||||
|
|
||||||
mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle);
|
mTrace("msg:%p, will be processed, type:%s app:%p", pMsg, TMSG_INFO(msgType), ahandle);
|
||||||
|
|
||||||
if (isReq && !mndIsMaster(pMnode)) {
|
if (isReq && !mndIsMaster(pMnode)) {
|
||||||
terrno = TSDB_CODE_APP_NOT_READY;
|
terrno = TSDB_CODE_APP_NOT_READY;
|
||||||
|
|
Loading…
Reference in New Issue