feat:[TD-30270] opti close logic in tmq

This commit is contained in:
wangmm0220 2024-09-05 14:32:11 +08:00
parent bd83023b09
commit 0cd9e274ab
4 changed files with 202 additions and 183 deletions

View File

@ -4158,6 +4158,7 @@ typedef struct {
typedef struct {
SArray* topicPrivileges; // SArray<STopicPrivilege>
int32_t debugFlag;
} SMqHbRsp;
typedef struct {

File diff suppressed because it is too large Load Diff

View File

@ -7050,6 +7050,7 @@ int32_t tSerializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) {
if (tEncodeI8(&encoder, privilege->noPrivilege) < 0) return -1;
}
if (tEncodeI32(&encoder, pRsp->debugFlag) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -7075,6 +7076,10 @@ int32_t tDeserializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) {
if (tDecodeI8(&decoder, &data->noPrivilege) < 0) return -1;
}
}
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI32(&decoder, &pRsp->debugFlag) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);

View File

@ -244,6 +244,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
}
storeOffsetRows(pMnode, &req, pConsumer);
rsp.debugFlag = tqDebugFlag;
code = buildMqHbRsp(pMsg, &rsp);
END:
@ -609,7 +610,7 @@ END:
tDeleteSMqConsumerObj(pConsumerNew);
taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
return code == TSDB_CODE_TMQ_NO_NEED_REBALANCE ? 0 : code;
return (code == TSDB_CODE_TMQ_NO_NEED_REBALANCE || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) ? 0 : code;
}
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {