diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 91912bb764..939d6f52f3 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -41,6 +41,7 @@ static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) { } static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) { + pMsg->info.hasEpSet = 1; SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index ff237c5a0c..c39c9847a9 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -58,7 +58,7 @@ static void *mndBuildTimerMsg(int32_t *pContLen) { static void mndPullupTrans(SMnode *pMnode) { int32_t contLen = 0; - void *pReq = mndBuildTimerMsg(&contLen); + void * pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); @@ -67,14 +67,14 @@ static void mndPullupTrans(SMnode *pMnode) { static void mndTtlTimer(SMnode *pMnode) { int32_t contLen = 0; - void *pReq = mndBuildTimerMsg(&contLen); + void * pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } static void mndCalMqRebalance(SMnode *pMnode) { int32_t contLen = 0; - void *pReq = mndBuildTimerMsg(&contLen); + void * pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); @@ -83,7 +83,7 @@ static void mndCalMqRebalance(SMnode *pMnode) { static void mndPullupTelem(SMnode *pMnode) { int32_t contLen = 0; - void *pReq = mndBuildTimerMsg(&contLen); + void * pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); @@ -395,7 +395,7 @@ void mndStop(SMnode *pMnode) { } int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { - SMnode *pMnode = pMsg->info.node; + SMnode * pMnode = pMsg->info.node; SSyncMgmt *pMgmt = &pMnode->syncMgmt; int32_t code = 0; @@ -413,7 +413,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { } do { - char *syncNodeStr = sync2SimpleStr(pMgmt->sync); + char * syncNodeStr = sync2SimpleStr(pMgmt->sync); static int64_t mndTick = 0; if (++mndTick % 10 == 1) { mTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pMgmt->sync), TMSG_INFO(pMsg->msgType), syncNodeStr); @@ -527,8 +527,8 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { if (!IsReq(pMsg)) return 0; - if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY || - pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT || + if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY || + pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT || pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) { return 0; } @@ -552,6 +552,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet); pMsg->info.rsp = rpcMallocCont(contLen); + pMsg->info.hasEpSet = 1; if (pMsg->info.rsp != NULL) { tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet); pMsg->info.rspLen = contLen; @@ -578,7 +579,7 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) { } int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { - SMnode *pMnode = pMsg->info.node; + SMnode * pMnode = pMsg->info.node; const STraceId *trace = &pMsg->info.traceId; MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; @@ -631,7 +632,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) { if (mndAcquireRpcRef(pMnode) != 0) return -1; - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; int64_t ms = taosGetTimestampMs(); pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc)); @@ -712,7 +713,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries; tstrncpy(desc.status, "unsynced", sizeof(desc.status)); for (int32_t i = 0; i < pVgroup->replica; ++i) { - SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; + SVnodeGid * pVgid = &pVgroup->vnodeGid[i]; SMonVnodeDesc *pVnDesc = &desc.vnodes[i]; pVnDesc->dnode_id = pVgid->dnodeId; tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role)); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index d324a76438..67e507858f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -119,7 +119,7 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { } void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { - SVnode *pVnode = pInfo->ahandle; + SVnode * pVnode = pInfo->ahandle; int32_t vgId = pVnode->config.vgId; int32_t code = 0; SRpcMsg *pMsg = NULL; @@ -178,7 +178,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { for (int32_t i = 0; i < newEpSet.numOfEps; ++i) { vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port); } - + pMsg->info.hasEpSet = 1; SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; tmsgSendRedirectRsp(&rsp, &newEpSet); } else { @@ -199,7 +199,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { } void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { - SVnode *pVnode = pInfo->ahandle; + SVnode * pVnode = pInfo->ahandle; int32_t vgId = pVnode->config.vgId; int32_t code = 0; SRpcMsg *pMsg = NULL; @@ -240,7 +240,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { STraceId *trace = &pMsg->info.traceId; do { - char *syncNodeStr = sync2SimpleStr(pVnode->sync); + char * syncNodeStr = sync2SimpleStr(pVnode->sync); static int64_t vndTick = 0; if (++vndTick % 10 == 1) { vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr); @@ -375,7 +375,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon } static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - SVnode *pVnode = pFsm->data; + SVnode * pVnode = pFsm->data; SSnapshot snapshot = {0}; SyncIndex beginIndex = SYNC_INDEX_INVALID; char logBuf[256] = {0}; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 5de907f8cb..b48939f9f9 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1014,19 +1014,28 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { } bool cliTryToExtractEpSet(STransMsg* pResp, SEpSet* dst) { - if (pResp == NULL || pResp->info.hasEpSet == 0) { + if ((pResp == NULL || pResp->info.hasEpSet == 0)) { + return false; + } + // rebuild resp msg + SEpSet epset; + if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epset) < 0) { return false; } - tDeserializeSEpSet(pResp->pCont, pResp->contLen, dst); int32_t tlen = tSerializeSEpSet(NULL, 0, dst); - int32_t bufLen = pResp->contLen - tlen; - char* buf = rpcMallocCont(bufLen); - - memcpy(buf, (char*)pResp->pCont + tlen, bufLen); + char* buf = NULL; + int32_t len = pResp->contLen - tlen; + if (len != 0) { + buf = rpcMallocCont(len); + memcpy(buf, (char*)pResp->pCont + tlen, len); + } + rpcFreeCont(pResp->pCont); pResp->pCont = buf; - pResp->contLen = bufLen; + pResp->contLen = len; + + *dst = epset; return true; } int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {