diff --git a/cmake/mxml_CMakeLists.txt.in b/cmake/mxml_CMakeLists.txt.in index 1ac90ebdd4..762df40e10 100644 --- a/cmake/mxml_CMakeLists.txt.in +++ b/cmake/mxml_CMakeLists.txt.in @@ -1,7 +1,7 @@ # cos ExternalProject_Add(mxml GIT_REPOSITORY https://github.com/michaelrsweet/mxml.git - GIT_TAG release-2.12 + GIT_TAG v2.12 SOURCE_DIR "${TD_CONTRIB_DIR}/mxml" #BINARY_DIR "" BUILD_IN_SOURCE TRUE diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ff5e9adaee..ce62552478 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -757,7 +757,6 @@ void metaHbToMnode(void* param, void* tmrId) { SStreamHbMsg hbMsg = {0}; SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); if (pMeta == NULL) { - // taosMemoryFree(param); return; } @@ -779,6 +778,7 @@ void metaHbToMnode(void* param, void* tmrId) { int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); SEpSet epset = {0}; + bool hasValEpset = false; hbMsg.vgId = pMeta->vgId; hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); @@ -797,51 +797,53 @@ void metaHbToMnode(void* param, void* tmrId) { if (i == 0) { epsetAssign(&epset, &(*pTask)->info.mnodeEpset); + hasValEpset = true; } } hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus); taosRUnLockLatch(&pMeta->lock); - int32_t code = 0; - int32_t tlen = 0; + if (hasValEpset) { + int32_t code = 0; + int32_t tlen = 0; - tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code); - if (code < 0) { - qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); - taosArrayDestroy(hbMsg.pTaskStatus); - taosReleaseRef(streamMetaId, rid); - return; - } + tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code); + if (code < 0) { + qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); + taosArrayDestroy(hbMsg.pTaskStatus); + taosReleaseRef(streamMetaId, rid); + return; + } - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - taosArrayDestroy(hbMsg.pTaskStatus); - taosReleaseRef(streamMetaId, rid); - return; - } + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + taosArrayDestroy(hbMsg.pTaskStatus); + taosReleaseRef(streamMetaId, rid); + return; + } - SEncoder encoder; - tEncoderInit(&encoder, buf, tlen); - if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) { - rpcFreeCont(buf); - qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); - taosArrayDestroy(hbMsg.pTaskStatus); - taosReleaseRef(streamMetaId, rid); - return; + SEncoder encoder; + tEncoderInit(&encoder, buf, tlen); + if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) { + rpcFreeCont(buf); + qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); + taosArrayDestroy(hbMsg.pTaskStatus); + taosReleaseRef(streamMetaId, rid); + return; + } + tEncoderClear(&encoder); + + SRpcMsg msg = {0}; + initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); + msg.info.noResp = 1; + + qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId); + tmsgSendReq(&epset, &msg); } - tEncoderClear(&encoder); taosArrayDestroy(hbMsg.pTaskStatus); - - SRpcMsg msg = {0}; - initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); - msg.info.noResp = 1; - - qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId); - - tmsgSendReq(&epset, &msg); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr); taosReleaseRef(streamMetaId, rid); } @@ -905,4 +907,4 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { int64_t el = taosGetTimestampMs() - st; qDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el); -} \ No newline at end of file +}