fix:semaphore always wait in tmq commit logic
This commit is contained in:
parent
11e690cb67
commit
68eb1cfc1e
|
@ -586,30 +586,32 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq
|
||||||
if(code != 0){
|
if(code != 0){
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
if (offsetVal->type > 0 && !tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) {
|
if (offsetVal->type <= 0 || tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) {
|
||||||
char offsetBuf[TSDB_OFFSET_LEN] = {0};
|
code = TSDB_CODE_TMQ_INVALID_MSG;
|
||||||
tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);
|
goto end;
|
||||||
|
|
||||||
char commitBuf[TSDB_OFFSET_LEN] = {0};
|
|
||||||
tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
|
|
||||||
|
|
||||||
SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0);
|
|
||||||
if (pParamSet == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s",
|
|
||||||
tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno));
|
|
||||||
taosMemoryFree(pParamSet);
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s",
|
|
||||||
tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf);
|
|
||||||
pVg->offsetInfo.committedOffset = *offsetVal;
|
|
||||||
}
|
}
|
||||||
|
char offsetBuf[TSDB_OFFSET_LEN] = {0};
|
||||||
|
tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);
|
||||||
|
|
||||||
|
char commitBuf[TSDB_OFFSET_LEN] = {0};
|
||||||
|
tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
|
||||||
|
|
||||||
|
SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0);
|
||||||
|
if (pParamSet == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s",
|
||||||
|
tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno));
|
||||||
|
taosMemoryFree(pParamSet);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s",
|
||||||
|
tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf);
|
||||||
|
pVg->offsetInfo.committedOffset = *offsetVal;
|
||||||
|
|
||||||
end:
|
end:
|
||||||
taosRUnLockLatch(&tmq->lock);
|
taosRUnLockLatch(&tmq->lock);
|
||||||
|
|
Loading…
Reference in New Issue