Merge pull request #22693 from taosdata/fix/streamhb

fix(stream): fix epset errors
This commit is contained in:
Haojun Liao 2023-09-01 16:23:33 +08:00 committed by GitHub
commit c8687b7270
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 36 deletions

View File

@ -1,7 +1,7 @@
# cos
ExternalProject_Add(mxml
GIT_REPOSITORY https://github.com/michaelrsweet/mxml.git
GIT_TAG release-2.12
GIT_TAG v2.12
SOURCE_DIR "${TD_CONTRIB_DIR}/mxml"
#BINARY_DIR ""
BUILD_IN_SOURCE TRUE

View File

@ -757,7 +757,6 @@ void metaHbToMnode(void* param, void* tmrId) {
SStreamHbMsg hbMsg = {0};
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
if (pMeta == NULL) {
// taosMemoryFree(param);
return;
}
@ -779,6 +778,7 @@ void metaHbToMnode(void* param, void* tmrId) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
SEpSet epset = {0};
bool hasValEpset = false;
hbMsg.vgId = pMeta->vgId;
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
@ -797,51 +797,53 @@ void metaHbToMnode(void* param, void* tmrId) {
if (i == 0) {
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
hasValEpset = true;
}
}
hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus);
taosRUnLockLatch(&pMeta->lock);
int32_t code = 0;
int32_t tlen = 0;
if (hasValEpset) {
int32_t code = 0;
int32_t tlen = 0;
tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code);
if (code < 0) {
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
taosArrayDestroy(hbMsg.pTaskStatus);
taosReleaseRef(streamMetaId, rid);
return;
}
tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code);
if (code < 0) {
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
taosArrayDestroy(hbMsg.pTaskStatus);
taosReleaseRef(streamMetaId, rid);
return;
}
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
taosArrayDestroy(hbMsg.pTaskStatus);
taosReleaseRef(streamMetaId, rid);
return;
}
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
taosArrayDestroy(hbMsg.pTaskStatus);
taosReleaseRef(streamMetaId, rid);
return;
}
SEncoder encoder;
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) {
rpcFreeCont(buf);
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
taosArrayDestroy(hbMsg.pTaskStatus);
taosReleaseRef(streamMetaId, rid);
return;
SEncoder encoder;
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) {
rpcFreeCont(buf);
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
taosArrayDestroy(hbMsg.pTaskStatus);
taosReleaseRef(streamMetaId, rid);
return;
}
tEncoderClear(&encoder);
SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
msg.info.noResp = 1;
qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId);
tmsgSendReq(&epset, &msg);
}
tEncoderClear(&encoder);
taosArrayDestroy(hbMsg.pTaskStatus);
SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
msg.info.noResp = 1;
qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId);
tmsgSendReq(&epset, &msg);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr);
taosReleaseRef(streamMetaId, rid);
}
@ -905,4 +907,4 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
int64_t el = taosGetTimestampMs() - st;
qDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el);
}
}