Merge pull request #20795 from taosdata/fix/TD-23495

fix:time_wait in doAskEp
This commit is contained in:
Haojun Liao 2023-04-07 09:34:10 +08:00 committed by GitHub
commit 8a21888f98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 4 additions and 1 deletions

View File

@ -1426,6 +1426,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
tscDebug("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d", tscDebug("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur); tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur);
if (epoch <= tmq->epoch) {
return false;
}
SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic)); SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
if (newTopics == NULL) { if (newTopics == NULL) {
@ -1529,8 +1532,8 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
} else { } else {
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId, tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
head->epoch, epoch); head->epoch, epoch);
pParam->pUserFn(tmq, code, pMsg, pParam->pParam);
} }
pParam->pUserFn(tmq, code, pMsg, pParam->pParam);
taosReleaseRef(tmqMgmt.rsetId, pParam->refId); taosReleaseRef(tmqMgmt.rsetId, pParam->refId);