fix: invalid read in multi-process mode
This commit is contained in:
parent
3bff1f0984
commit
27b298bcfa
|
@ -87,6 +87,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0113)
|
#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0113)
|
||||||
#define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0114)
|
#define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0114)
|
||||||
#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0115)
|
#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0115)
|
||||||
|
#define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0116)
|
||||||
|
|
||||||
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0140)
|
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0140)
|
||||||
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0141)
|
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0141)
|
||||||
|
|
|
@ -72,6 +72,7 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
|
||||||
NodeMsgFp msgFp = NULL;
|
NodeMsgFp msgFp = NULL;
|
||||||
uint16_t msgType = pRpc->msgType;
|
uint16_t msgType = pRpc->msgType;
|
||||||
bool needRelease = false;
|
bool needRelease = false;
|
||||||
|
bool isReq = msgType & 1U;
|
||||||
|
|
||||||
if (pEpSet && pEpSet->numOfEps > 0 && msgType == TDMT_MND_STATUS_RSP) {
|
if (pEpSet && pEpSet->numOfEps > 0 && msgType == TDMT_MND_STATUS_RSP) {
|
||||||
dmSetMnodeEpSet(pWrapper->pDnode, pEpSet);
|
dmSetMnodeEpSet(pWrapper->pDnode, pEpSet);
|
||||||
|
@ -85,13 +86,13 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
|
||||||
if (dmBuildMsg(pMsg, pRpc) != 0) goto _OVER;
|
if (dmBuildMsg(pMsg, pRpc) != 0) goto _OVER;
|
||||||
|
|
||||||
if (pWrapper->procType == DND_PROC_SINGLE) {
|
if (pWrapper->procType == DND_PROC_SINGLE) {
|
||||||
dTrace("msg:%p, is created, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType), pRpc->handle, pMsg->user);
|
dTrace("msg:%p, created, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType), pRpc->handle, pMsg->user);
|
||||||
code = (*msgFp)(pWrapper, pMsg);
|
code = (*msgFp)(pWrapper, pMsg);
|
||||||
} else if (pWrapper->procType == DND_PROC_PARENT) {
|
} else if (pWrapper->procType == DND_PROC_PARENT) {
|
||||||
dTrace("msg:%p, is created and put into child queue, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType),
|
dTrace("msg:%p, created and put into child queue, type:%s handle:%p code:0x%04x user:%s contLen:%d", pMsg,
|
||||||
pRpc->handle, pMsg->user);
|
TMSG_INFO(msgType), pRpc->handle, pMsg->rpcMsg.code & 0XFFFF, pMsg->user, pRpc->contLen);
|
||||||
code = taosProcPutToChildQ(pWrapper->procObj, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, pRpc->handle,
|
code = taosProcPutToChildQ(pWrapper->procObj, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen,
|
||||||
pRpc->refId, PROC_FUNC_REQ);
|
(isReq && (pMsg->rpcMsg.code == 0)) ? pRpc->handle : NULL, pRpc->refId, PROC_FUNC_REQ);
|
||||||
} else {
|
} else {
|
||||||
dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
|
dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
|
||||||
ASSERT(1);
|
ASSERT(1);
|
||||||
|
@ -100,12 +101,13 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
if (pWrapper->procType == DND_PROC_PARENT) {
|
if (pWrapper->procType == DND_PROC_PARENT) {
|
||||||
dTrace("msg:%p, is freed in parent process", pMsg);
|
dTrace("msg:%p, freed in parent process", pMsg);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
rpcFreeCont(pRpc->pCont);
|
rpcFreeCont(pRpc->pCont);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
dError("msg:%p, type:%s failed to process since 0x%04x:%s", pMsg, TMSG_INFO(msgType), code & 0XFFFF, terrstr());
|
dError("msg:%p, type:%s handle:%p failed to process since 0x%04x:%s", pMsg, TMSG_INFO(msgType), pRpc->handle,
|
||||||
|
code & 0XFFFF, terrstr());
|
||||||
if (msgType & 1U) {
|
if (msgType & 1U) {
|
||||||
if (terrno != 0) code = terrno;
|
if (terrno != 0) code = terrno;
|
||||||
if (code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_NODE_OFFLINE) {
|
if (code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_NODE_OFFLINE) {
|
||||||
|
@ -350,29 +352,32 @@ static void dmConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t
|
||||||
|
|
||||||
static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
|
static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
|
||||||
EProcFuncType ftype) {
|
EProcFuncType ftype) {
|
||||||
|
int32_t code = pMsg->code & 0xFFFF;
|
||||||
pMsg->pCont = pCont;
|
pMsg->pCont = pCont;
|
||||||
dTrace("msg:%p, get from parent queue, ftype:%d handle:%p code:0x%04x mtype:%d, app:%p", pMsg, ftype, pMsg->handle,
|
|
||||||
pMsg->code & 0xFFFF, pMsg->msgType, pMsg->ahandle);
|
|
||||||
|
|
||||||
switch (ftype) {
|
if (ftype == PROC_FUNC_REQ) {
|
||||||
case PROC_FUNC_REGIST:
|
dTrace("msg:%p, get from parent queue, send req:%s handle:%p code:0x%04x, app:%p", pMsg, TMSG_INFO(pMsg->msgType),
|
||||||
|
pMsg->handle, code, pMsg->ahandle);
|
||||||
|
dmSendRpcReq(pWrapper->pDnode, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
|
||||||
|
} else if (ftype == PROC_FUNC_RSP) {
|
||||||
|
dTrace("msg:%p, get from parent queue, send rsp:%s handle:%p code:0x%04x, app:%p", pMsg, TMSG_INFO(pMsg->msgType),
|
||||||
|
pMsg->handle, code, pMsg->ahandle);
|
||||||
|
pMsg->refId = taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
|
||||||
|
dmSendRpcRsp(pWrapper->pDnode, pMsg);
|
||||||
|
} else if (ftype == PROC_FUNC_REGIST) {
|
||||||
|
dTrace("msg:%p, get from parent queue, regist handle:%p code:0x%04x, app:%p", pMsg, pMsg->handle, code,
|
||||||
|
pMsg->ahandle);
|
||||||
rpcRegisterBrokenLinkArg(pMsg);
|
rpcRegisterBrokenLinkArg(pMsg);
|
||||||
break;
|
} else if (ftype == PROC_FUNC_RELEASE) {
|
||||||
case PROC_FUNC_RELEASE:
|
dTrace("msg:%p, get from parent queue, release handle:%p code:0x%04x, app:%p", pMsg, pMsg->handle, code,
|
||||||
|
pMsg->ahandle);
|
||||||
taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
|
taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
|
||||||
rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
|
rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
|
||||||
rpcFreeCont(pCont);
|
rpcFreeCont(pCont);
|
||||||
break;
|
} else {
|
||||||
case PROC_FUNC_REQ:
|
dError("msg:%p, invalid ftype:%d while get from parent queue, handle:%p", pMsg, ftype, pMsg->handle);
|
||||||
dmSendRpcReq(pWrapper->pDnode, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
|
|
||||||
break;
|
|
||||||
case PROC_FUNC_RSP:
|
|
||||||
pMsg->refId = taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
|
|
||||||
dmSendRpcRsp(pWrapper->pDnode, pMsg);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pMsg);
|
taosMemoryFree(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -262,23 +262,23 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
switch (qtype) {
|
switch (qtype) {
|
||||||
case QUERY_QUEUE:
|
case QUERY_QUEUE:
|
||||||
dTrace("msg:%p, will be written into vnode-query queue", pMsg);
|
dTrace("msg:%p, type:%s will be written into vnode-query queue", pMsg, TMSG_INFO(pRpc->msgType));
|
||||||
taosWriteQitem(pVnode->pQueryQ, pMsg);
|
taosWriteQitem(pVnode->pQueryQ, pMsg);
|
||||||
break;
|
break;
|
||||||
case FETCH_QUEUE:
|
case FETCH_QUEUE:
|
||||||
dTrace("msg:%p, will be written into vnode-fetch queue", pMsg);
|
dTrace("msg:%p, type:%s will be written into vnode-fetch queue", pMsg, TMSG_INFO(pRpc->msgType));
|
||||||
taosWriteQitem(pVnode->pFetchQ, pMsg);
|
taosWriteQitem(pVnode->pFetchQ, pMsg);
|
||||||
break;
|
break;
|
||||||
case WRITE_QUEUE:
|
case WRITE_QUEUE:
|
||||||
dTrace("msg:%p, will be written into vnode-write queue", pMsg);
|
dTrace("msg:%p, type:%s will be written into vnode-write queue", pMsg, TMSG_INFO(pRpc->msgType));
|
||||||
taosWriteQitem(pVnode->pWriteQ, pMsg);
|
taosWriteQitem(pVnode->pWriteQ, pMsg);
|
||||||
break;
|
break;
|
||||||
case SYNC_QUEUE:
|
case SYNC_QUEUE:
|
||||||
dTrace("msg:%p, will be written into vnode-sync queue", pMsg);
|
dTrace("msg:%p, type:%s will be written into vnode-sync queue", pMsg, TMSG_INFO(pRpc->msgType));
|
||||||
taosWriteQitem(pVnode->pSyncQ, pMsg);
|
taosWriteQitem(pVnode->pSyncQ, pMsg);
|
||||||
break;
|
break;
|
||||||
case MERGE_QUEUE:
|
case MERGE_QUEUE:
|
||||||
dTrace("msg:%p, will be written into vnode-merge queue", pMsg);
|
dTrace("msg:%p, type:%s will be written into vnode-merge queue", pMsg, TMSG_INFO(pRpc->msgType));
|
||||||
taosWriteQitem(pVnode->pMergeQ, pMsg);
|
taosWriteQitem(pVnode->pMergeQ, pMsg);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -93,6 +93,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not support
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, "Message not processed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, "Message not processed")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_CFG_NOT_FOUND, "Config not found")
|
TAOS_DEFINE_ERROR(TSDB_CODE_CFG_NOT_FOUND, "Config not found")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization")
|
TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_DUP_KEY, "Cannot add duplicate keys to hash")
|
||||||
|
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
|
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
|
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
|
||||||
|
|
|
@ -310,6 +310,7 @@ int32_t taosHashGetSize(const SHashObj *pHashObj) {
|
||||||
|
|
||||||
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t size) {
|
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t size) {
|
||||||
if (pHashObj == NULL || key == NULL || keyLen == 0) {
|
if (pHashObj == NULL || key == NULL || keyLen == 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_PTR;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -378,6 +379,8 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
|
||||||
}
|
}
|
||||||
|
|
||||||
doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode);
|
doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode);
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_DUP_KEY;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashEntryWUnlock(pHashObj, pe);
|
taosHashEntryWUnlock(pHashObj, pe);
|
||||||
|
|
|
@ -175,7 +175,6 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char
|
||||||
if (handle != 0 && ftype == PROC_FUNC_REQ) {
|
if (handle != 0 && ftype == PROC_FUNC_REQ) {
|
||||||
if (taosHashPut(pProc->hash, &handle, sizeof(int64_t), &handleRef, sizeof(int64_t)) != 0) {
|
if (taosHashPut(pProc->hash, &handle, sizeof(int64_t), &handleRef, sizeof(int64_t)) != 0) {
|
||||||
taosThreadMutexUnlock(&pQueue->mutex);
|
taosThreadMutexUnlock(&pQueue->mutex);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -227,8 +226,8 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char
|
||||||
taosThreadMutexUnlock(&pQueue->mutex);
|
taosThreadMutexUnlock(&pQueue->mutex);
|
||||||
tsem_post(&pQueue->sem);
|
tsem_post(&pQueue->sem);
|
||||||
|
|
||||||
uTrace("proc:%s, push msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype,
|
uTrace("proc:%s, push msg at pos:%d ftype:%d remain:%d handle:%p ref:%" PRId64 ", head:%d %p body:%d %p",
|
||||||
pQueue->items, headLen, pHead, bodyLen, pBody);
|
pQueue->name, pos, ftype, pQueue->items, (void *)handle, handleRef, headLen, pHead, bodyLen, pBody);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue