fix(tmq): fix race condition.
This commit is contained in:
parent
9ca77572d9
commit
3513de5ab4
|
@ -1446,15 +1446,16 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
|
||||||
taosArrayPush(newTopics, &topic);
|
taosArrayPush(newTopics, &topic);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosHashCleanup(pVgOffsetHashMap);
|
||||||
|
|
||||||
|
taosThreadMutexLock(&tmq->lock);
|
||||||
// destroy current buffered existed topics info
|
// destroy current buffered existed topics info
|
||||||
if (tmq->clientTopics) {
|
if (tmq->clientTopics) {
|
||||||
taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
|
taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashCleanup(pVgOffsetHashMap);
|
|
||||||
|
|
||||||
taosThreadMutexLock(&tmq->lock);
|
|
||||||
tmq->clientTopics = newTopics;
|
tmq->clientTopics = newTopics;
|
||||||
|
|
||||||
taosThreadMutexUnlock(&tmq->lock);
|
taosThreadMutexUnlock(&tmq->lock);
|
||||||
|
|
||||||
int8_t flag = (topicNumGet == 0)? TMQ_CONSUMER_STATUS__NO_TOPIC:TMQ_CONSUMER_STATUS__READY;
|
int8_t flag = (topicNumGet == 0)? TMQ_CONSUMER_STATUS__NO_TOPIC:TMQ_CONSUMER_STATUS__READY;
|
||||||
|
|
Loading…
Reference in New Issue