fix(stream): fix epset errors
This commit is contained in:
parent
4d3f0500ea
commit
5159bf3e2a
|
@ -1,7 +1,7 @@
|
|||
# cos
|
||||
ExternalProject_Add(mxml
|
||||
GIT_REPOSITORY https://github.com/michaelrsweet/mxml.git
|
||||
GIT_TAG release-2.12
|
||||
GIT_TAG v2.12
|
||||
SOURCE_DIR "${TD_CONTRIB_DIR}/mxml"
|
||||
#BINARY_DIR ""
|
||||
BUILD_IN_SOURCE TRUE
|
||||
|
|
|
@ -757,7 +757,6 @@ void metaHbToMnode(void* param, void* tmrId) {
|
|||
SStreamHbMsg hbMsg = {0};
|
||||
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
|
||||
if (pMeta == NULL) {
|
||||
// taosMemoryFree(param);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -779,6 +778,7 @@ void metaHbToMnode(void* param, void* tmrId) {
|
|||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||
|
||||
SEpSet epset = {0};
|
||||
bool hasValEpset = false;
|
||||
|
||||
hbMsg.vgId = pMeta->vgId;
|
||||
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
|
||||
|
@ -797,51 +797,53 @@ void metaHbToMnode(void* param, void* tmrId) {
|
|||
|
||||
if (i == 0) {
|
||||
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
|
||||
hasValEpset = true;
|
||||
}
|
||||
}
|
||||
|
||||
hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus);
|
||||
taosRUnLockLatch(&pMeta->lock);
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t tlen = 0;
|
||||
if (hasValEpset) {
|
||||
int32_t code = 0;
|
||||
int32_t tlen = 0;
|
||||
|
||||
tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code);
|
||||
if (code < 0) {
|
||||
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
|
||||
taosArrayDestroy(hbMsg.pTaskStatus);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
return;
|
||||
}
|
||||
tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code);
|
||||
if (code < 0) {
|
||||
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
|
||||
taosArrayDestroy(hbMsg.pTaskStatus);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
return;
|
||||
}
|
||||
|
||||
void* buf = rpcMallocCont(tlen);
|
||||
if (buf == NULL) {
|
||||
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
taosArrayDestroy(hbMsg.pTaskStatus);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
return;
|
||||
}
|
||||
void* buf = rpcMallocCont(tlen);
|
||||
if (buf == NULL) {
|
||||
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
taosArrayDestroy(hbMsg.pTaskStatus);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
return;
|
||||
}
|
||||
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, buf, tlen);
|
||||
if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) {
|
||||
rpcFreeCont(buf);
|
||||
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
|
||||
taosArrayDestroy(hbMsg.pTaskStatus);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
return;
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, buf, tlen);
|
||||
if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) {
|
||||
rpcFreeCont(buf);
|
||||
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
|
||||
taosArrayDestroy(hbMsg.pTaskStatus);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
return;
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
SRpcMsg msg = {0};
|
||||
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
|
||||
msg.info.noResp = 1;
|
||||
|
||||
qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId);
|
||||
tmsgSendReq(&epset, &msg);
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
taosArrayDestroy(hbMsg.pTaskStatus);
|
||||
|
||||
SRpcMsg msg = {0};
|
||||
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
|
||||
msg.info.noResp = 1;
|
||||
|
||||
qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId);
|
||||
|
||||
tmsgSendReq(&epset, &msg);
|
||||
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr);
|
||||
taosReleaseRef(streamMetaId, rid);
|
||||
}
|
||||
|
@ -905,4 +907,4 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
|
|||
|
||||
int64_t el = taosGetTimestampMs() - st;
|
||||
qDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue