fix:[TD-31899] remove void(func)

This commit is contained in:
wangmm0220 2024-09-11 09:36:19 +08:00
parent 23507b0eac
commit a322de0114
1 changed files with 138 additions and 84 deletions

View File

@ -800,6 +800,8 @@ static void generateTimedTask(int64_t refId, int32_t type) {
if (tsem2_post(&tmq->rspSem) != 0){ if (tsem2_post(&tmq->rspSem) != 0){
tscError("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type); tscError("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type);
} }
}else{
taosFreeQitem(pTaskType);
} }
} }
@ -977,7 +979,10 @@ void tmqSendHbReq(void* param, void* tmrId) {
OVER: OVER:
tDestroySMqHbReq(&req); tDestroySMqHbReq(&req);
if (tmrId != NULL) { if (tmrId != NULL) {
(void)taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer); int32_t ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
if (!ret){
tscError("failed to reset timer fo tmq hb");
}
} }
int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId); int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId);
if (ret != 0){ if (ret != 0){
@ -1001,9 +1006,7 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) {
return; return;
} }
(void)taosReadAllQitems(pTmq->delayedTask, qall); int32_t numOfItems = taosReadAllQitems(pTmq->delayedTask, qall);
int32_t numOfItems = taosQallItemSize(qall);
if (numOfItems == 0) { if (numOfItems == 0) {
taosFreeQall(qall); taosFreeQall(qall);
return; return;
@ -1011,9 +1014,7 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) {
tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems); tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems);
int8_t* pTaskType = NULL; int8_t* pTaskType = NULL;
(void)taosGetQitem(qall, (void**)&pTaskType); while (taosGetQitem(qall, (void**)&pTaskType) != 0) {
while (pTaskType != NULL) {
if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
code = askEp(pTmq, NULL, false, false); code = askEp(pTmq, NULL, false, false);
if (code != 0) { if (code != 0) {
@ -1021,21 +1022,26 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) {
continue; continue;
} }
tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId); tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
(void)taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, code = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer,
&pTmq->epTimer); &pTmq->epTimer);
if (!code){
tscError("failed to reset timer fo tmq ask ep");
}
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn; tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn;
asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam); asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);
tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId, tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
pTmq->autoCommitInterval / 1000.0); pTmq->autoCommitInterval / 1000.0);
(void)taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, code = taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer,
&pTmq->commitTimer); &pTmq->commitTimer);
if (!code){
tscError("failed to reset timer fo commit");
}
} else { } else {
tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType); tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType);
} }
taosFreeQitem(pTaskType); taosFreeQitem(pTaskType);
(void)taosGetQitem(qall, (void**)&pTaskType);
} }
taosFreeQall(qall); taosFreeQall(qall);
@ -1067,26 +1073,18 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
void tmqClearUnhandleMsg(tmq_t* tmq) { void tmqClearUnhandleMsg(tmq_t* tmq) {
SMqRspWrapper* rspWrapper = NULL; SMqRspWrapper* rspWrapper = NULL;
while (1) { while (taosGetQitem(tmq->qall, (void**)&rspWrapper) != 0) {
(void)taosGetQitem(tmq->qall, (void**)&rspWrapper);
if (rspWrapper) {
tmqFreeRspWrapper(rspWrapper); tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(rspWrapper); taosFreeQitem(rspWrapper);
} else {
break;
}
} }
rspWrapper = NULL; rspWrapper = NULL;
(void)taosReadAllQitems(tmq->mqueue, tmq->qall); if (taosReadAllQitems(tmq->mqueue, tmq->qall) == 0){
while (1) { return;
(void)taosGetQitem(tmq->qall, (void**)&rspWrapper); }
if (rspWrapper) { while (taosGetQitem(tmq->qall, (void**)&rspWrapper) != 0) {
tmqFreeRspWrapper(rspWrapper); tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(rspWrapper); taosFreeQitem(rspWrapper);
} else {
break;
}
} }
} }
@ -1162,19 +1160,27 @@ void tmqFreeImpl(void* handle) {
} }
taosFreeQall(tmq->qall); taosFreeQall(tmq->qall);
(void)tsem2_destroy(&tmq->rspSem); if(tsem2_destroy(&tmq->rspSem) != 0) {
tscError("failed to destroy sem in free tmq");
}
taosArrayDestroyEx(tmq->clientTopics, freeClientTopic); taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
taos_close_internal(tmq->pTscObj); taos_close_internal(tmq->pTscObj);
if (tmq->commitTimer) { if (tmq->commitTimer) {
(void)taosTmrStopA(&tmq->commitTimer); if (!taosTmrStopA(&tmq->commitTimer)) {
tscError("failed to stop commit timer");
}
} }
if (tmq->epTimer) { if (tmq->epTimer) {
(void)taosTmrStopA(&tmq->epTimer); if (!taosTmrStopA(&tmq->epTimer)) {
tscError("failed to stop ep timer");
}
} }
if (tmq->hbLiveTimer) { if (tmq->hbLiveTimer) {
(void)taosTmrStopA(&tmq->hbLiveTimer); if (!taosTmrStopA(&tmq->hbLiveTimer)) {
tscError("failed to stop hb timer");
}
} }
taosMemoryFree(tmq); taosMemoryFree(tmq);
@ -1320,7 +1326,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if (code) { if (code) {
terrno = code; terrno = code;
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
(void)tsem2_destroy(&pTmq->rspSem);
SET_ERROR_MSG_TMQ("init tscObj failed") SET_ERROR_MSG_TMQ("init tscObj failed")
goto _failed; goto _failed;
} }
@ -1427,7 +1432,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
} }
void* abuf = buf; void* abuf = buf;
(void)tSerializeSCMSubscribeReq(&abuf, &req); tlen = tSerializeSCMSubscribeReq(&abuf, &req);
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) { if (sendInfo == NULL) {
@ -1459,8 +1464,12 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
goto FAIL; goto FAIL;
} }
(void)tsem2_wait(&param.rspSem); if (tsem2_wait(&param.rspSem) != 0){
(void)tsem2_destroy(&param.rspSem); tscError("consumer:0x%" PRIx64 ", failed to wait semaphore in subscribe", tmq->consumerId);
}
if(tsem2_destroy(&param.rspSem) != 0) {
tscError("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId);
}
if (param.rspErr != 0) { if (param.rspErr != 0) {
code = param.rspErr; code = param.rspErr;
@ -1665,6 +1674,8 @@ END:
(void)strcpy(pRspWrapper->topicName, pParam->topicName); (void)strcpy(pRspWrapper->topicName, pParam->topicName);
code = taosWriteQitem(tmq->mqueue, pRspWrapper); code = taosWriteQitem(tmq->mqueue, pRspWrapper);
if (code != 0) { if (code != 0) {
tmqFreeRspWrapper((SMqRspWrapper*)pRspWrapper);
taosFreeQitem(pRspWrapper);
tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code); tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
} }
} }
@ -1672,7 +1683,11 @@ END:
tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d,QID:0x%" PRIx64, tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d,QID:0x%" PRIx64,
tmq ? tmq->consumerId : 0, rspType, vgId, total, requestId); tmq ? tmq->consumerId : 0, rspType, vgId, total, requestId);
if (tmq) (void)tsem2_post(&tmq->rspSem); if (tmq) {
if (tsem2_post(&tmq->rspSem) != 0){
tscError("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId);
}
}
if (pMsg) taosMemoryFreeClear(pMsg->pData); if (pMsg) taosMemoryFreeClear(pMsg->pData);
if (pMsg) taosMemoryFreeClear(pMsg->pEpSet); if (pMsg) taosMemoryFreeClear(pMsg->pEpSet);
ret = taosReleaseRef(tmqMgmt.rsetId, refId); ret = taosReleaseRef(tmqMgmt.rsetId, refId);
@ -1775,7 +1790,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
if (pVgOffsetHashMap == NULL) { if (pVgOffsetHashMap == NULL) {
(void)taosArrayDestroy(newTopics); taosArrayDestroy(newTopics);
return false; return false;
} }
@ -1860,10 +1875,10 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl
pReq->enableBatchMeta = tmq->enableBatchMeta; pReq->enableBatchMeta = tmq->enableBatchMeta;
} }
int32_t tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqMetaRspObj** ppRspObj) { void tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqMetaRspObj** ppRspObj) {
SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj)); SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
if (pRspObj == NULL) { if (pRspObj == NULL) {
return terrno; return;
} }
pRspObj->resType = RES_TYPE__TMQ_META; pRspObj->resType = RES_TYPE__TMQ_META;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
@ -1872,13 +1887,12 @@ int32_t tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqMetaRspObj**
(void)memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp)); (void)memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
*ppRspObj = pRspObj; *ppRspObj = pRspObj;
return 0;
} }
int32_t tmqBuildBatchMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqBatchMetaRspObj** ppRspObj) { void tmqBuildBatchMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqBatchMetaRspObj** ppRspObj) {
SMqBatchMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqBatchMetaRspObj)); SMqBatchMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqBatchMetaRspObj));
if (pRspObj == NULL) { if (pRspObj == NULL) {
return terrno; return;
} }
pRspObj->common.resType = RES_TYPE__TMQ_BATCH_META; pRspObj->common.resType = RES_TYPE__TMQ_BATCH_META;
tstrncpy(pRspObj->common.topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->common.topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
@ -1888,7 +1902,6 @@ int32_t tmqBuildBatchMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqBatchMet
(void)memcpy(&pRspObj->rsp, &pWrapper->batchMetaRsp, sizeof(SMqBatchMetaRsp)); (void)memcpy(&pRspObj->rsp, &pWrapper->batchMetaRsp, sizeof(SMqBatchMetaRsp));
tscDebug("build batchmeta Rsp from wrapper"); tscDebug("build batchmeta Rsp from wrapper");
*ppRspObj = pRspObj; *ppRspObj = pRspObj;
return 0;
} }
void changeByteEndian(char* pData) { void changeByteEndian(char* pData) {
@ -1985,31 +1998,29 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg
} }
} }
int32_t tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, void tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
SMqRspObj** ppRspObj) { SMqRspObj** ppRspObj) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
if (pRspObj == NULL) { if (pRspObj == NULL) {
return terrno; return;
} }
pRspObj->common.resType = RES_TYPE__TMQ; pRspObj->common.resType = RES_TYPE__TMQ;
(void)memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp)); (void)memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, &pRspObj->common, &pRspObj->rsp.common); tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, &pRspObj->common, &pRspObj->rsp.common);
*ppRspObj = pRspObj; *ppRspObj = pRspObj;
return 0;
} }
int32_t tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, void tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
SMqTaosxRspObj** ppRspObj) { SMqTaosxRspObj** ppRspObj) {
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj)); SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
if (pRspObj == NULL) { if (pRspObj == NULL) {
return terrno; return;
} }
pRspObj->common.resType = RES_TYPE__TMQ_METADATA; pRspObj->common.resType = RES_TYPE__TMQ_METADATA;
(void)memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp)); (void)memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));
tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, &pRspObj->common, &pRspObj->rsp.common); tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, &pRspObj->common, &pRspObj->rsp.common);
*ppRspObj = pRspObj; *ppRspObj = pRspObj;
return 0;
} }
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) { static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
@ -2168,12 +2179,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
while (1) { while (1) {
SMqRspWrapper* pRspWrapper = NULL; SMqRspWrapper* pRspWrapper = NULL;
(void)taosGetQitem(tmq->qall, (void**)&pRspWrapper); if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) {
if (taosReadAllQitems(tmq->mqueue, tmq->qall) == 0){
if (pRspWrapper == NULL) { return NULL;
(void)taosReadAllQitems(tmq->mqueue, tmq->qall); }
(void)taosGetQitem(tmq->qall, (void**)&pRspWrapper); if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) {
if (pRspWrapper == NULL) {
return NULL; return NULL;
} }
} }
@ -2251,7 +2261,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
} else { // build rsp } else { // build rsp
int64_t numOfRows = 0; int64_t numOfRows = 0;
SMqRspObj* pRsp = NULL; SMqRspObj* pRsp = NULL;
(void)tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows, &pRsp); tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows, &pRsp);
tmq->totalRows += numOfRows; tmq->totalRows += numOfRows;
pVg->emptyBlockReceiveTs = 0; pVg->emptyBlockReceiveTs = 0;
if (pRsp && tmq->replayEnable) { if (pRsp && tmq->replayEnable) {
@ -2305,7 +2315,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true); pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true);
// build rsp // build rsp
SMqMetaRspObj* pRsp = NULL; SMqMetaRspObj* pRsp = NULL;
(void)tmqBuildMetaRspFromWrapper(pollRspWrapper, &pRsp); tmqBuildMetaRspFromWrapper(pollRspWrapper, &pRsp);
taosMemoryFreeClear(pollRspWrapper->pEpset); taosMemoryFreeClear(pollRspWrapper->pEpset);
taosFreeQitem(pRspWrapper); taosFreeQitem(pRspWrapper);
taosWUnLockLatch(&tmq->lock); taosWUnLockLatch(&tmq->lock);
@ -2343,7 +2353,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
pollRspWrapper->batchMetaRsp.head.walsver, pollRspWrapper->batchMetaRsp.head.walever, pollRspWrapper->batchMetaRsp.head.walsver, pollRspWrapper->batchMetaRsp.head.walever,
tmq->consumerId, true); tmq->consumerId, true);
SMqBatchMetaRspObj* pRsp = NULL; SMqBatchMetaRspObj* pRsp = NULL;
(void)tmqBuildBatchMetaRspFromWrapper(pollRspWrapper, &pRsp); tmqBuildBatchMetaRspFromWrapper(pollRspWrapper, &pRsp);
taosMemoryFreeClear(pollRspWrapper->pEpset); taosMemoryFreeClear(pollRspWrapper->pEpset);
taosFreeQitem(pRspWrapper); taosFreeQitem(pRspWrapper);
taosWUnLockLatch(&tmq->lock); taosWUnLockLatch(&tmq->lock);
@ -2393,9 +2403,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
// build rsp // build rsp
int64_t numOfRows = 0; int64_t numOfRows = 0;
SMqTaosxRspObj* pRsp = NULL; SMqTaosxRspObj* pRsp = NULL;
if (tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows, &pRsp) != 0) { tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows, &pRsp);
tscError("consumer:0x%" PRIx64 " build taosx rsp failed, vgId:%d", tmq->consumerId, pVg->vgId);
}
tmq->totalRows += numOfRows; tmq->totalRows += numOfRows;
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
@ -2563,7 +2571,10 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
} }
if (code == 0) { if (code == 0) {
(void)taosRemoveRef(tmqMgmt.rsetId, tmq->refId); code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
if (code != 0){
tscError("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code);
}
} }
return code; return code;
} }
@ -2721,7 +2732,9 @@ void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void*
static void commitCallBackFn(tmq_t* UNUSED_PARAM(tmq), int32_t code, void* param) { static void commitCallBackFn(tmq_t* UNUSED_PARAM(tmq), int32_t code, void* param) {
SSyncCommitInfo* pInfo = (SSyncCommitInfo*)param; SSyncCommitInfo* pInfo = (SSyncCommitInfo*)param;
pInfo->code = code; pInfo->code = code;
(void)tsem2_post(&pInfo->sem); if (tsem2_post(&pInfo->sem) != 0){
tscError("failed to post rsp sem in commit cb");
}
} }
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
@ -2750,10 +2763,14 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
asyncCommitFromResult(tmq, pRes, commitCallBackFn, pInfo); asyncCommitFromResult(tmq, pRes, commitCallBackFn, pInfo);
} }
(void)tsem2_wait(&pInfo->sem); if (tsem2_wait(&pInfo->sem) != 0){
tscError("failed to wait sem for sync commit");
}
code = pInfo->code; code = pInfo->code;
(void)tsem2_destroy(&pInfo->sem); if(tsem2_destroy(&pInfo->sem) != 0) {
tscError("failed to destroy sem for sync commit");
}
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
tscInfo("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code)); tscInfo("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code));
@ -2818,12 +2835,16 @@ int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId,
code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo); code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo);
if (code == 0) { if (code == 0) {
(void)tsem2_wait(&pInfo->sem); if (tsem2_wait(&pInfo->sem) != 0){
tscError("failed to wait sem for sync commit offset");
}
code = pInfo->code; code = pInfo->code;
} }
if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS; if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
(void)tsem2_destroy(&pInfo->sem); if(tsem2_destroy(&pInfo->sem) != 0) {
tscError("failed to destroy sem for sync commit offset");
}
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
tscInfo("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId, tscInfo("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
@ -2876,11 +2897,11 @@ end:
} }
int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
if (param == NULL) { SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
if (pParam == NULL) {
goto FAIL; goto FAIL;
} }
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
if (tmq == NULL) { if (tmq == NULL) {
code = TSDB_CODE_TMQ_CONSUMER_CLOSED; code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
@ -2918,7 +2939,12 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
tmqFreeRspWrapper((SMqRspWrapper*)pWrapper); tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
taosFreeQitem(pWrapper); taosFreeQitem(pWrapper);
} else { } else {
(void)taosWriteQitem(tmq->mqueue, pWrapper); code = taosWriteQitem(tmq->mqueue, pWrapper);
if (code != 0) {
tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
taosFreeQitem(pWrapper);
tscError("consumer:0x%" PRIx64 " put ep res into mqueue failed, code:%d", tmq->consumerId, code);
}
} }
} }
@ -2930,11 +2956,13 @@ END:
} }
} }
FAIL: FAIL:
if (pParam->sync) { if (pParam && pParam->sync) {
SAskEpInfo* pInfo = pParam->pParam; SAskEpInfo* pInfo = pParam->pParam;
if (pInfo) { if (pInfo) {
pInfo->code = code; pInfo->code = code;
(void)tsem2_post(&pInfo->sem); if (tsem2_post(&pInfo->sem) != 0){
tscError("failed to post rsp sem askep cb");
}
} }
} }
@ -2956,11 +2984,15 @@ int32_t syncAskEp(tmq_t* pTmq) {
int32_t code = askEp(pTmq, pInfo, true, false); int32_t code = askEp(pTmq, pInfo, true, false);
if (code == 0) { if (code == 0) {
(void)tsem2_wait(&pInfo->sem); if (tsem2_wait(&pInfo->sem) != 0){
tscError("consumer:0x%" PRIx64 ", failed to wait for sem", pTmq->consumerId);
}
code = pInfo->code; code = pInfo->code;
} }
(void)tsem2_destroy(&pInfo->sem); if(tsem2_destroy(&pInfo->sem) != 0) {
tscError("failed to destroy sem sync ask ep");
}
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
return code; return code;
} }
@ -3131,7 +3163,9 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
END: END:
pCommon->code = code; pCommon->code = code;
if (total == pParam->totalReq) { if (total == pParam->totalReq) {
(void)tsem2_post(&pCommon->rsp); if (tsem2_post(&pCommon->rsp) != 0) {
tscError("failed to post semaphore in get wal cb");
}
} }
if (pMsg) { if (pMsg) {
@ -3146,8 +3180,10 @@ static void destroyCommonInfo(SMqVgCommon* pCommon) {
if (pCommon == NULL) { if (pCommon == NULL) {
return; return;
} }
(void)taosArrayDestroy(pCommon->pList); taosArrayDestroy(pCommon->pList);
(void)tsem2_destroy(&pCommon->rsp); if(tsem2_destroy(&pCommon->rsp) != 0) {
tscError("failed to destroy semaphore for topic:%s", pCommon->pTopicName);
}
(void)taosThreadMutexDestroy(&pCommon->mutex); (void)taosThreadMutexDestroy(&pCommon->mutex);
taosMemoryFree(pCommon->pTopicName); taosMemoryFree(pCommon->pTopicName);
taosMemoryFree(pCommon); taosMemoryFree(pCommon);
@ -3183,7 +3219,9 @@ end:
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
} }
pParam->code = code; pParam->code = code;
(void)tsem2_post(&pParam->sem); if (tsem2_post(&pParam->sem) != 0){
tscError("failed to post semaphore in tmCommittedCb");
}
return code; return code;
} }
@ -3248,12 +3286,16 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
int64_t transporterId = 0; int64_t transporterId = 0;
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo); code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo);
if (code != 0) { if (code != 0) {
(void)tsem2_destroy(&pParam->sem); if(tsem2_destroy(&pParam->sem) != 0) {
tscError("failed to destroy semaphore in get committed from server1");
}
taosMemoryFree(pParam); taosMemoryFree(pParam);
return code; return code;
} }
(void)tsem2_wait(&pParam->sem); if (tsem2_wait(&pParam->sem) != 0){
tscError("failed to wait semaphore in get committed from server");
}
code = pParam->code; code = pParam->code;
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
if (pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG) { if (pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG) {
@ -3263,7 +3305,9 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
code = TSDB_CODE_TMQ_SNAPSHOT_ERROR; code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
} }
} }
(void)tsem2_destroy(&pParam->sem); if(tsem2_destroy(&pParam->sem) != 0) {
tscError("failed to destroy semaphore in get committed from server2");
}
taosMemoryFree(pParam); taosMemoryFree(pParam);
return code; return code;
@ -3534,7 +3578,9 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
} }
} }
(void)tsem2_wait(&pCommon->rsp); if (tsem2_wait(&pCommon->rsp) != 0){
tscError("consumer:0x%" PRIx64 " failed to wait sem in get assignment", tmq->consumerId);
}
code = pCommon->code; code = pCommon->code;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -3597,7 +3643,9 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
} }
SMqSeekParam* pParam = param; SMqSeekParam* pParam = param;
pParam->code = code; pParam->code = code;
(void)tsem2_post(&pParam->sem); if (tsem2_post(&pParam->sem) != 0){
tscError("failed to post sem in tmqSeekCb");
}
return 0; return 0;
} }
@ -3695,14 +3743,20 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
int64_t transporterId = 0; int64_t transporterId = 0;
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (code != 0) { if (code != 0) {
(void)tsem2_destroy(&pParam->sem); if(tsem2_destroy(&pParam->sem) != 0) {
tscError("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
}
taosMemoryFree(pParam); taosMemoryFree(pParam);
return code; return code;
} }
(void)tsem2_wait(&pParam->sem); if (tsem2_wait(&pParam->sem) != 0){
tscError("consumer:0x%" PRIx64 "wait rsp sem failed in seek offset", tmq->consumerId);
}
code = pParam->code; code = pParam->code;
(void)tsem2_destroy(&pParam->sem); if(tsem2_destroy(&pParam->sem) != 0) {
tscError("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
}
taosMemoryFree(pParam); taosMemoryFree(pParam);
tscInfo("consumer:0x%" PRIx64 "send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code)); tscInfo("consumer:0x%" PRIx64 "send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code));