shm
This commit is contained in:
parent
5bf4dfe8c4
commit
718e399455
|
@ -45,6 +45,7 @@ static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs
|
||||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||||
SNodeMsg *pMsg = NULL;
|
SNodeMsg *pMsg = NULL;
|
||||||
taosGetQitem(qall, (void **)&pMsg);
|
taosGetQitem(qall, (void **)&pMsg);
|
||||||
|
dTrace("msg:%p, will be processed in bnode queue", pMsg);
|
||||||
if (taosArrayPush(pArray, &pMsg) == NULL) {
|
if (taosArrayPush(pArray, &pMsg) == NULL) {
|
||||||
bmSendErrorRsp(pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY);
|
bmSendErrorRsp(pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,7 +77,7 @@ _OVER:
|
||||||
rpcFreeCont(pRpc->pCont);
|
rpcFreeCont(pRpc->pCont);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
dError("msg:%p, failed to process since %s", pMsg, terrstr());
|
dError("msg:%p, failed to process since 0x%04x:%s", pMsg, code & 0XFFFF, terrstr());
|
||||||
if (pRpc->msgType & 1U) {
|
if (pRpc->msgType & 1U) {
|
||||||
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
|
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
|
||||||
dndSendRsp(pWrapper, &rsp);
|
dndSendRsp(pWrapper, &rsp);
|
||||||
|
@ -95,6 +95,7 @@ static int32_t dndProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg
|
||||||
if (pWrapper != NULL) {
|
if (pWrapper != NULL) {
|
||||||
dndReleaseWrapper(pWrapper);
|
dndReleaseWrapper(pWrapper);
|
||||||
terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
|
terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
|
||||||
|
dError("failed to create node since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,6 +122,7 @@ static int32_t dndProcessDropNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *
|
||||||
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
|
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
|
||||||
if (pWrapper == NULL) {
|
if (pWrapper == NULL) {
|
||||||
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
|
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
|
||||||
|
dError("failed to drop node since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -293,7 +293,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
|
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
|
||||||
if (pRsp->code == TSDB_CODE_NODE_NOT_DEPLOYED || pRsp->code == TSDB_CODE_APP_NOT_READY) {
|
if (pRsp->code == TSDB_CODE_APP_NOT_READY) {
|
||||||
SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE);
|
SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE);
|
||||||
dmSendRedirectRsp(pDnodeWrapper->pMgmt, pRsp);
|
dmSendRedirectRsp(pDnodeWrapper->pMgmt, pRsp);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -53,12 +53,12 @@ static void *dmThreadRoutine(void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
int32_t code = -1;
|
|
||||||
tmsg_t msgType = pMsg->rpcMsg.msgType;
|
|
||||||
SDnode *pDnode = pMgmt->pDnode;
|
SDnode *pDnode = pMgmt->pDnode;
|
||||||
|
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||||
|
int32_t code = -1;
|
||||||
dTrace("msg:%p, will be processed in dnode queue", pMsg);
|
dTrace("msg:%p, will be processed in dnode queue", pMsg);
|
||||||
|
|
||||||
switch (msgType) {
|
switch (pRpc->msgType) {
|
||||||
case TDMT_DND_CREATE_MNODE:
|
case TDMT_DND_CREATE_MNODE:
|
||||||
case TDMT_DND_CREATE_QNODE:
|
case TDMT_DND_CREATE_QNODE:
|
||||||
case TDMT_DND_CREATE_SNODE:
|
case TDMT_DND_CREATE_SNODE:
|
||||||
|
@ -84,16 +84,16 @@ static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
default:
|
default:
|
||||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
code = -1;
|
code = -1;
|
||||||
dError("RPC %p, dnode msg:%s not processed in dnode queue", pMsg->rpcMsg.handle, TMSG_INFO(msgType));
|
dError("msg:%p, type:%s not processed in dnode queue", pRpc->handle, TMSG_INFO(pRpc->msgType));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msgType & 1u) {
|
if (pRpc->msgType & 1u) {
|
||||||
if (code != 0) code = terrno;
|
if (code != 0) code = terrno;
|
||||||
SRpcMsg rsp = {.code = code, .handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle};
|
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
|
||||||
rpcSendResponse(&rsp);
|
rpcSendResponse(&rsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("msg:%p, is freed", pMsg);
|
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
|
||||||
rpcFreeCont(pMsg->rpcMsg.pCont);
|
rpcFreeCont(pMsg->rpcMsg.pCont);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,7 @@ static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
if (pRpc->handle == NULL) return;
|
if (pRpc->handle == NULL) return;
|
||||||
if (code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
if (code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||||
if (code != 0) code = terrno;
|
if (code != 0) code = terrno;
|
||||||
SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp, .code = code};
|
SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp};
|
||||||
dndSendRsp(pMgmt->pWrapper, &rsp);
|
dndSendRsp(pMgmt->pWrapper, &rsp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ static void qmProcessQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("msg:%p, is freed", pMsg);
|
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
|
||||||
rpcFreeCont(pRpc->pCont);
|
rpcFreeCont(pRpc->pCont);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,12 +30,12 @@ static void smProcessUniqueQueue(SSnodeMgmt *pMgmt, STaosQall *qall, int32_t num
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void smProcessSharedQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
static void smProcessSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
dTrace("msg:%p, will be processed in snode shared queue", pMsg);
|
dTrace("msg:%p, will be processed in snode shared queue", pMsg);
|
||||||
sndProcessSMsg(pMgmt->pSnode, pMsg);
|
sndProcessSMsg(pMgmt->pSnode, &pMsg->rpcMsg);
|
||||||
|
|
||||||
dTrace("msg:%p, is freed", pMsg);
|
dTrace("msg:%p, is freed", pMsg);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->rpcMsg.pCont);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,7 +40,7 @@ TEST_F(DndTestMnode, 01_Create_Mnode) {
|
||||||
|
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
|
||||||
ASSERT_NE(pRsp, nullptr);
|
ASSERT_NE(pRsp, nullptr);
|
||||||
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_INVALID_OPTION);
|
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_ALREADY_DEPLOYED);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
|
@ -57,7 +57,7 @@ TEST_F(DndTestMnode, 01_Create_Mnode) {
|
||||||
|
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
|
||||||
ASSERT_NE(pRsp, nullptr);
|
ASSERT_NE(pRsp, nullptr);
|
||||||
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_INVALID_OPTION);
|
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_ALREADY_DEPLOYED);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|
|
@ -26,6 +26,7 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO
|
||||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||||
SNodeMsg *pMsg = NULL;
|
SNodeMsg *pMsg = NULL;
|
||||||
taosGetQitem(qall, (void **)&pMsg);
|
taosGetQitem(qall, (void **)&pMsg);
|
||||||
|
dTrace("msg:%p, will be processed in vnode write queue", pMsg);
|
||||||
void *ptr = taosArrayPush(pArray, &pMsg);
|
void *ptr = taosArrayPush(pArray, &pMsg);
|
||||||
assert(ptr != NULL);
|
assert(ptr != NULL);
|
||||||
}
|
}
|
||||||
|
@ -34,24 +35,25 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO
|
||||||
|
|
||||||
for (size_t i = 0; i < numOfMsgs; i++) {
|
for (size_t i = 0; i < numOfMsgs; i++) {
|
||||||
SRpcMsg *pRsp = NULL;
|
SRpcMsg *pRsp = NULL;
|
||||||
SNodeMsg *pNodeMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
|
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
|
||||||
SRpcMsg *pMsg = &pNodeMsg->rpcMsg;
|
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||||
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp);
|
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp);
|
||||||
if (pRsp != NULL) {
|
if (pRsp != NULL) {
|
||||||
pRsp->ahandle = pMsg->ahandle;
|
pRsp->ahandle = pRpc->ahandle;
|
||||||
rpcSendResponse(pRsp);
|
rpcSendResponse(pRsp);
|
||||||
free(pRsp);
|
free(pRsp);
|
||||||
} else {
|
} else {
|
||||||
if (code != 0) code = terrno;
|
if (code != 0) code = terrno;
|
||||||
SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
|
SRpcMsg rpcRsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
|
||||||
rpcSendResponse(&rpcRsp);
|
rpcSendResponse(&rpcRsp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (size_t i = 0; i < numOfMsgs; i++) {
|
for (size_t i = 0; i < numOfMsgs; i++) {
|
||||||
SNodeMsg *pNodeMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
|
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
|
||||||
rpcFreeCont(pNodeMsg->rpcMsg.pCont);
|
dTrace("msg:%p, is freed", pMsg);
|
||||||
taosFreeQitem(pNodeMsg);
|
rpcFreeCont(pMsg->rpcMsg.pCont);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pArray);
|
taosArrayDestroy(pArray);
|
||||||
|
|
Loading…
Reference in New Issue