Merge pull request #12744 from taosdata/fix/mnode

fix: wrong ref count
This commit is contained in:
Shengliang Guan 2022-05-20 15:54:52 +08:00 committed by GitHub
commit 292735a1af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 40 deletions

View File

@ -22,21 +22,19 @@
static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) { static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rsp = { SRpcMsg rsp = {
.code = code, .code = code,
.info = pMsg->info,
.pCont = pMsg->info.rsp, .pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen, .contLen = pMsg->info.rspLen,
.info = pMsg->info,
}; };
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SVnodeMgmt *pMgmt = pInfo->ahandle; SVnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
dTrace("msg:%p, get from vnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
int32_t code = -1; switch (pMsg->msgType) {
tmsg_t msgType = pMsg->msgType;
dTrace("msg:%p, get from vnode queue, type:%s", pMsg, TMSG_INFO(msgType));
switch (msgType) {
case TDMT_MON_VM_INFO: case TDMT_MON_VM_INFO:
code = vmProcessGetMonitorInfoReq(pMgmt, pMsg); code = vmProcessGetMonitorInfoReq(pMgmt, pMsg);
break; break;
@ -54,7 +52,7 @@ static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
dError("msg:%p, not processed in vnode queue", pMsg); dError("msg:%p, not processed in vnode queue", pMsg);
} }
if (msgType & 1u) { if (IsReq(pMsg)) {
if (code != 0 && terrno != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
vmSendRsp(pMsg, code); vmSendRsp(pMsg, code);
} }
@ -96,7 +94,6 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
if (pArray == NULL) { if (pArray == NULL) {
dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr()); dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
@ -116,7 +113,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
for (int i = 0; i < taosArrayGetSize(pArray); i++) { for (int i = 0; i < taosArrayGetSize(pArray); i++) {
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
SRpcMsg rsp = {.info = pMsg->info, .pCont = NULL, .contLen = 0}; SRpcMsg rsp = {.info = pMsg->info};
int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pMsg, false); int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pMsg, false);
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
@ -130,7 +127,6 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR; rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR;
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} else if (ret == TAOS_SYNC_PROPOSE_SUCCESS) { } else if (ret == TAOS_SYNC_PROPOSE_SUCCESS) {
// ok
// send response in applyQ // send response in applyQ
} else { } else {
assert(0); assert(0);
@ -149,16 +145,13 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg *pMsg = NULL;
SRpcMsg rsp;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
SRpcMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
// init response rpc msg // init response rpc msg
rsp.code = 0; SRpcMsg rsp = {0};
rsp.pCont = NULL;
rsp.contLen = 0;
// get original rpc msg // get original rpc msg
assert(pMsg->msgType == TDMT_VND_SYNC_APPLY_MSG); assert(pMsg->msgType == TDMT_VND_SYNC_APPLY_MSG);
@ -177,7 +170,6 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
rpcFreeCont(originalRpcMsg.pCont); rpcFreeCont(originalRpcMsg.pCont);
// if leader, send response // if leader, send response
// if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) {
if (pMsg->info.handle != NULL) { if (pMsg->info.handle != NULL) {
rsp.info = pMsg->info; rsp.info = pMsg->info;
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
@ -190,21 +182,19 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
SRpcMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
// todo int32_t code = vnodeProcessSyncReq(pVnode->pImpl, pMsg, NULL);
SRpcMsg *pRsp = NULL; if (code != 0) {
int32_t ret = vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp);
if (ret != 0) {
// if leader, send response
if (pMsg->info.handle != NULL) { if (pMsg->info.handle != NULL) {
SRpcMsg rsp = {0}; SRpcMsg rsp = {
rsp.code = terrno; .code = (terrno < 0) ? terrno : code,
rsp.info = pMsg->info; .info = pMsg->info,
dTrace("msg:%p, process sync queue error since code:%s", pMsg, terrstr()); };
dTrace("msg:%p, failed to process sync queue since %s", pMsg, terrstr());
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
} }
@ -216,9 +206,9 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
SRpcMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
dTrace("msg:%p, get from vnode-merge queue", pMsg); dTrace("msg:%p, get from vnode-merge queue", pMsg);
@ -308,22 +298,24 @@ int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SSingleWorker *pWorker = &pMgmt->monitorWorker; SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into vnode-monitor worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); dTrace("msg:%p, put into vnode-monitor worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pWorker->queue, pMsg); taosWriteQitem(pWorker->queue, pMsg);
return 0; return 0;
} }
static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType qtype) { static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType qtype) {
SMsgHead *pHead = pRpc->pCont; SMsgHead *pHead = pRpc->pCont;
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) return -1; if (pVnode == NULL) return -1;
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
int32_t code = 0; int32_t code = 0;
if (pMsg != NULL) { if (pMsg == NULL) {
rpcFreeCont(pRpc->pCont);
pRpc->pCont = NULL;
code = -1;
} else {
memcpy(pMsg, pRpc, sizeof(SRpcMsg)); memcpy(pMsg, pRpc, sizeof(SRpcMsg));
switch (qtype) { switch (qtype) {
case WRITE_QUEUE: case WRITE_QUEUE:
@ -428,7 +420,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
return -1; return -1;
} }
dDebug("vgId:%d, vnode queue is alloced", pVnode->vgId); dDebug("vgId:%d, queue is alloced", pVnode->vgId);
return 0; return 0;
} }
@ -445,7 +437,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pQueryQ = NULL; pVnode->pQueryQ = NULL;
pVnode->pFetchQ = NULL; pVnode->pFetchQ = NULL;
pVnode->pMergeQ = NULL; pVnode->pMergeQ = NULL;
dDebug("vgId:%d, vnode queue is freed", pVnode->vgId); dDebug("vgId:%d, queue is freed", pVnode->vgId);
} }
int32_t vmStartWorker(SVnodeMgmt *pMgmt) { int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
@ -496,7 +488,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
.param = pMgmt, .param = pMgmt,
}; };
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
dError("failed to start mnode vnode-monitor worker since %s", terrstr()); dError("failed to start vnode-monitor worker since %s", terrstr());
return -1; return -1;
} }

View File

@ -161,10 +161,12 @@ void dmGetVnodeLoads(SMonVloadInfo *pInfo) {
void dmGetMnodeLoads(SMonMloadInfo *pInfo) { void dmGetMnodeLoads(SMonMloadInfo *pInfo) {
SDnode *pDnode = dmInstance(); SDnode *pDnode = dmInstance();
SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE]; SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE];
if (tsMultiProcess) { if (dmMarkWrapper(pWrapper) == 0) {
dmSendLocalRecv(pDnode, TDMT_MON_MM_LOAD, tDeserializeSMonMloadInfo, pInfo); if (tsMultiProcess) {
} else if (pWrapper->pMgmt != NULL) { dmSendLocalRecv(pDnode, TDMT_MON_MM_LOAD, tDeserializeSMonMloadInfo, pInfo);
mmGetMnodeLoads(pWrapper->pMgmt, pInfo); } else if (pWrapper->pMgmt != NULL) {
mmGetMnodeLoads(pWrapper->pMgmt, pInfo);
}
dmReleaseWrapper(pWrapper);
} }
dmReleaseWrapper(pWrapper);
} }

View File

@ -162,7 +162,7 @@ void *taosAllocateQitem(int32_t size, EQItype itype) {
uTrace("item:%p, node:%p is allocated", pNode->item, pNode); uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
} }
return (void *)pNode->item; return pNode->item;
} }
void taosFreeQitem(void *pItem) { void taosFreeQitem(void *pItem) {