fix:semaphore always wait in tmq commit logic

This commit is contained in:
wangmm0220 2023-07-27 09:57:52 +08:00
parent a346518af6
commit 11e690cb67
1 changed files with 9 additions and 8 deletions

View File

@ -650,7 +650,7 @@ static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_c
code = asyncCommitOffset(tmq, pTopicName, vgId, &offsetVal, pCommitFp, userParam);
end:
if(code != TSDB_CODE_SUCCESS){
if(code != TSDB_CODE_SUCCESS && pCommitFp != NULL){
pCommitFp(tmq, code, userParam);
}
}
@ -2348,7 +2348,7 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
tsem_destroy(&pInfo->sem);
taosMemoryFree(pInfo);
tscDebug("consumer:0x%" PRIx64 " sync commit done, code:%s", tmq->consumerId, tstrerror(code));
tscInfo("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code));
return code;
}
@ -2404,15 +2404,16 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId,
tsem_init(&pInfo->sem, 0, 0);
pInfo->code = 0;
asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo);
tsem_wait(&pInfo->sem);
code = pInfo->code;
code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo);
if(code == 0){
tsem_wait(&pInfo->sem);
code = pInfo->code;
}
tsem_destroy(&pInfo->sem);
taosMemoryFree(pInfo);
tscInfo("consumer:0x%" PRIx64 " sync send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
tscInfo("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
return code;
}
@ -2449,7 +2450,7 @@ void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, i
code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);
tscInfo("consumer:0x%" PRIx64 " async send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
tscInfo("consumer:0x%" PRIx64 " async send commit to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
end:
if(code != 0 && cb != NULL){