diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 7e3915f3d1..faa94a335d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -233,6 +233,14 @@ int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { + if (pRpc->contLen < sizeof(SMsgHead)) { + dError("invalid rpc msg since no msg head at pCont. pRpc:%p, type:%s, len:%d", pRpc, TMSG_INFO(pRpc->msgType), + pRpc->contLen); + rpcFreeCont(pRpc->pCont); + pRpc->pCont = NULL; + return -1; + } + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); if (pMsg == NULL) { rpcFreeCont(pRpc->pCont); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 152e16bd2e..b071aec2f4 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2384,6 +2384,13 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand } int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { + if (pEntry->dataLen < sizeof(SMsgHead)) { + sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId, + TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen); + syncEntryDestroy(pEntry); + return -1; + } + // append to log buffer if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) { sError("vgId:%d, failed to enqueue sync log buffer. index:%" PRId64 "", ths->vgId, pEntry->index); @@ -2679,16 +2686,21 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index); } + if (pEntry == NULL) { + sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr()); + return -1; + } + if (ths->state == TAOS_SYNC_STATE_LEADER) { if (pRetIndex) { (*pRetIndex) = index; } int32_t code = syncNodeAppend(ths, pEntry); - if (code < 0 && ths->vgId != 1 && vnodeIsMsgBlock(pEntry->originalRpcType)) { - ASSERTS(false, "failed to append blocking msg"); - } return code; + } else { + syncEntryDestroy(pEntry); + pEntry = NULL; } return -1;