diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 529392ebfc..8d053defdb 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -800,6 +800,8 @@ static void generateTimedTask(int64_t refId, int32_t type) { if (tsem2_post(&tmq->rspSem) != 0){ tscError("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type); } + }else{ + taosFreeQitem(pTaskType); } } @@ -977,7 +979,10 @@ void tmqSendHbReq(void* param, void* tmrId) { OVER: tDestroySMqHbReq(&req); if (tmrId != NULL) { - (void)taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer); + int32_t ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer); + if (!ret){ + tscError("failed to reset timer fo tmq hb"); + } } int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId); if (ret != 0){ @@ -1001,9 +1006,7 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) { return; } - (void)taosReadAllQitems(pTmq->delayedTask, qall); - - int32_t numOfItems = taosQallItemSize(qall); + int32_t numOfItems = taosReadAllQitems(pTmq->delayedTask, qall); if (numOfItems == 0) { taosFreeQall(qall); return; @@ -1011,9 +1014,7 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) { tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems); int8_t* pTaskType = NULL; - (void)taosGetQitem(qall, (void**)&pTaskType); - - while (pTaskType != NULL) { + while (taosGetQitem(qall, (void**)&pTaskType) != 0) { if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { code = askEp(pTmq, NULL, false, false); if (code != 0) { @@ -1021,21 +1022,26 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) { continue; } tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId); - (void)taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, + code = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->epTimer); + if (!code){ + tscError("failed to reset timer fo tmq ask ep"); + } } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn; asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam); tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval / 1000.0); - (void)taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, + code = taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->commitTimer); + if (!code){ + tscError("failed to reset timer fo commit"); + } } else { tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType); } taosFreeQitem(pTaskType); - (void)taosGetQitem(qall, (void**)&pTaskType); } taosFreeQall(qall); @@ -1067,26 +1073,18 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { void tmqClearUnhandleMsg(tmq_t* tmq) { SMqRspWrapper* rspWrapper = NULL; - while (1) { - (void)taosGetQitem(tmq->qall, (void**)&rspWrapper); - if (rspWrapper) { + while (taosGetQitem(tmq->qall, (void**)&rspWrapper) != 0) { tmqFreeRspWrapper(rspWrapper); taosFreeQitem(rspWrapper); - } else { - break; - } } rspWrapper = NULL; - (void)taosReadAllQitems(tmq->mqueue, tmq->qall); - while (1) { - (void)taosGetQitem(tmq->qall, (void**)&rspWrapper); - if (rspWrapper) { - tmqFreeRspWrapper(rspWrapper); - taosFreeQitem(rspWrapper); - } else { - break; - } + if (taosReadAllQitems(tmq->mqueue, tmq->qall) == 0){ + return; + } + while (taosGetQitem(tmq->qall, (void**)&rspWrapper) != 0) { + tmqFreeRspWrapper(rspWrapper); + taosFreeQitem(rspWrapper); } } @@ -1162,19 +1160,27 @@ void tmqFreeImpl(void* handle) { } taosFreeQall(tmq->qall); - (void)tsem2_destroy(&tmq->rspSem); + if(tsem2_destroy(&tmq->rspSem) != 0) { + tscError("failed to destroy sem in free tmq"); + } taosArrayDestroyEx(tmq->clientTopics, freeClientTopic); taos_close_internal(tmq->pTscObj); if (tmq->commitTimer) { - (void)taosTmrStopA(&tmq->commitTimer); + if (!taosTmrStopA(&tmq->commitTimer)) { + tscError("failed to stop commit timer"); + } } if (tmq->epTimer) { - (void)taosTmrStopA(&tmq->epTimer); + if (!taosTmrStopA(&tmq->epTimer)) { + tscError("failed to stop ep timer"); + } } if (tmq->hbLiveTimer) { - (void)taosTmrStopA(&tmq->hbLiveTimer); + if (!taosTmrStopA(&tmq->hbLiveTimer)) { + tscError("failed to stop hb timer"); + } } taosMemoryFree(tmq); @@ -1320,7 +1326,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { if (code) { terrno = code; tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); - (void)tsem2_destroy(&pTmq->rspSem); SET_ERROR_MSG_TMQ("init tscObj failed") goto _failed; } @@ -1427,7 +1432,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } void* abuf = buf; - (void)tSerializeSCMSubscribeReq(&abuf, &req); + tlen = tSerializeSCMSubscribeReq(&abuf, &req); sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { @@ -1459,8 +1464,12 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { goto FAIL; } - (void)tsem2_wait(¶m.rspSem); - (void)tsem2_destroy(¶m.rspSem); + if (tsem2_wait(¶m.rspSem) != 0){ + tscError("consumer:0x%" PRIx64 ", failed to wait semaphore in subscribe", tmq->consumerId); + } + if(tsem2_destroy(¶m.rspSem) != 0) { + tscError("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId); + } if (param.rspErr != 0) { code = param.rspErr; @@ -1665,6 +1674,8 @@ END: (void)strcpy(pRspWrapper->topicName, pParam->topicName); code = taosWriteQitem(tmq->mqueue, pRspWrapper); if (code != 0) { + tmqFreeRspWrapper((SMqRspWrapper*)pRspWrapper); + taosFreeQitem(pRspWrapper); tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code); } } @@ -1672,7 +1683,11 @@ END: tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d,QID:0x%" PRIx64, tmq ? tmq->consumerId : 0, rspType, vgId, total, requestId); - if (tmq) (void)tsem2_post(&tmq->rspSem); + if (tmq) { + if (tsem2_post(&tmq->rspSem) != 0){ + tscError("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId); + } + } if (pMsg) taosMemoryFreeClear(pMsg->pData); if (pMsg) taosMemoryFreeClear(pMsg->pEpSet); ret = taosReleaseRef(tmqMgmt.rsetId, refId); @@ -1775,7 +1790,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); if (pVgOffsetHashMap == NULL) { - (void)taosArrayDestroy(newTopics); + taosArrayDestroy(newTopics); return false; } @@ -1860,10 +1875,10 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl pReq->enableBatchMeta = tmq->enableBatchMeta; } -int32_t tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqMetaRspObj** ppRspObj) { +void tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqMetaRspObj** ppRspObj) { SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj)); if (pRspObj == NULL) { - return terrno; + return; } pRspObj->resType = RES_TYPE__TMQ_META; tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); @@ -1872,13 +1887,12 @@ int32_t tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqMetaRspObj** (void)memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp)); *ppRspObj = pRspObj; - return 0; } -int32_t tmqBuildBatchMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqBatchMetaRspObj** ppRspObj) { +void tmqBuildBatchMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqBatchMetaRspObj** ppRspObj) { SMqBatchMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqBatchMetaRspObj)); if (pRspObj == NULL) { - return terrno; + return; } pRspObj->common.resType = RES_TYPE__TMQ_BATCH_META; tstrncpy(pRspObj->common.topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); @@ -1888,7 +1902,6 @@ int32_t tmqBuildBatchMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqBatchMet (void)memcpy(&pRspObj->rsp, &pWrapper->batchMetaRsp, sizeof(SMqBatchMetaRsp)); tscDebug("build batchmeta Rsp from wrapper"); *ppRspObj = pRspObj; - return 0; } void changeByteEndian(char* pData) { @@ -1985,31 +1998,29 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg } } -int32_t tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, +void tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObj** ppRspObj) { SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); if (pRspObj == NULL) { - return terrno; + return; } pRspObj->common.resType = RES_TYPE__TMQ; (void)memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp)); tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, &pRspObj->common, &pRspObj->rsp.common); *ppRspObj = pRspObj; - return 0; } -int32_t tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, +void tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqTaosxRspObj** ppRspObj) { SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj)); if (pRspObj == NULL) { - return terrno; + return; } pRspObj->common.resType = RES_TYPE__TMQ_METADATA; (void)memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp)); tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, &pRspObj->common, &pRspObj->rsp.common); *ppRspObj = pRspObj; - return 0; } static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) { @@ -2168,12 +2179,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { while (1) { SMqRspWrapper* pRspWrapper = NULL; - (void)taosGetQitem(tmq->qall, (void**)&pRspWrapper); - - if (pRspWrapper == NULL) { - (void)taosReadAllQitems(tmq->mqueue, tmq->qall); - (void)taosGetQitem(tmq->qall, (void**)&pRspWrapper); - if (pRspWrapper == NULL) { + if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) { + if (taosReadAllQitems(tmq->mqueue, tmq->qall) == 0){ + return NULL; + } + if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) { return NULL; } } @@ -2251,7 +2261,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { } else { // build rsp int64_t numOfRows = 0; SMqRspObj* pRsp = NULL; - (void)tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows, &pRsp); + tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows, &pRsp); tmq->totalRows += numOfRows; pVg->emptyBlockReceiveTs = 0; if (pRsp && tmq->replayEnable) { @@ -2305,7 +2315,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true); // build rsp SMqMetaRspObj* pRsp = NULL; - (void)tmqBuildMetaRspFromWrapper(pollRspWrapper, &pRsp); + tmqBuildMetaRspFromWrapper(pollRspWrapper, &pRsp); taosMemoryFreeClear(pollRspWrapper->pEpset); taosFreeQitem(pRspWrapper); taosWUnLockLatch(&tmq->lock); @@ -2343,7 +2353,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { pollRspWrapper->batchMetaRsp.head.walsver, pollRspWrapper->batchMetaRsp.head.walever, tmq->consumerId, true); SMqBatchMetaRspObj* pRsp = NULL; - (void)tmqBuildBatchMetaRspFromWrapper(pollRspWrapper, &pRsp); + tmqBuildBatchMetaRspFromWrapper(pollRspWrapper, &pRsp); taosMemoryFreeClear(pollRspWrapper->pEpset); taosFreeQitem(pRspWrapper); taosWUnLockLatch(&tmq->lock); @@ -2393,9 +2403,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { // build rsp int64_t numOfRows = 0; SMqTaosxRspObj* pRsp = NULL; - if (tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows, &pRsp) != 0) { - tscError("consumer:0x%" PRIx64 " build taosx rsp failed, vgId:%d", tmq->consumerId, pVg->vgId); - } + tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows, &pRsp); tmq->totalRows += numOfRows; char buf[TSDB_OFFSET_LEN] = {0}; @@ -2563,7 +2571,10 @@ int32_t tmq_consumer_close(tmq_t* tmq) { } if (code == 0) { - (void)taosRemoveRef(tmqMgmt.rsetId, tmq->refId); + code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId); + if (code != 0){ + tscError("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code); + } } return code; } @@ -2721,7 +2732,9 @@ void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* static void commitCallBackFn(tmq_t* UNUSED_PARAM(tmq), int32_t code, void* param) { SSyncCommitInfo* pInfo = (SSyncCommitInfo*)param; pInfo->code = code; - (void)tsem2_post(&pInfo->sem); + if (tsem2_post(&pInfo->sem) != 0){ + tscError("failed to post rsp sem in commit cb"); + } } int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { @@ -2750,10 +2763,14 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { asyncCommitFromResult(tmq, pRes, commitCallBackFn, pInfo); } - (void)tsem2_wait(&pInfo->sem); + if (tsem2_wait(&pInfo->sem) != 0){ + tscError("failed to wait sem for sync commit"); + } code = pInfo->code; - (void)tsem2_destroy(&pInfo->sem); + if(tsem2_destroy(&pInfo->sem) != 0) { + tscError("failed to destroy sem for sync commit"); + } taosMemoryFree(pInfo); tscInfo("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code)); @@ -2818,12 +2835,16 @@ int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId, code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo); if (code == 0) { - (void)tsem2_wait(&pInfo->sem); + if (tsem2_wait(&pInfo->sem) != 0){ + tscError("failed to wait sem for sync commit offset"); + } code = pInfo->code; } if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS; - (void)tsem2_destroy(&pInfo->sem); + if(tsem2_destroy(&pInfo->sem) != 0) { + tscError("failed to destroy sem for sync commit offset"); + } taosMemoryFree(pInfo); tscInfo("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId, @@ -2876,11 +2897,11 @@ end: } int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { - if (param == NULL) { + SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; + if (pParam == NULL) { goto FAIL; } - SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); if (tmq == NULL) { code = TSDB_CODE_TMQ_CONSUMER_CLOSED; @@ -2918,7 +2939,12 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { tmqFreeRspWrapper((SMqRspWrapper*)pWrapper); taosFreeQitem(pWrapper); } else { - (void)taosWriteQitem(tmq->mqueue, pWrapper); + code = taosWriteQitem(tmq->mqueue, pWrapper); + if (code != 0) { + tmqFreeRspWrapper((SMqRspWrapper*)pWrapper); + taosFreeQitem(pWrapper); + tscError("consumer:0x%" PRIx64 " put ep res into mqueue failed, code:%d", tmq->consumerId, code); + } } } @@ -2930,11 +2956,13 @@ END: } } FAIL: - if (pParam->sync) { + if (pParam && pParam->sync) { SAskEpInfo* pInfo = pParam->pParam; if (pInfo) { pInfo->code = code; - (void)tsem2_post(&pInfo->sem); + if (tsem2_post(&pInfo->sem) != 0){ + tscError("failed to post rsp sem askep cb"); + } } } @@ -2956,11 +2984,15 @@ int32_t syncAskEp(tmq_t* pTmq) { int32_t code = askEp(pTmq, pInfo, true, false); if (code == 0) { - (void)tsem2_wait(&pInfo->sem); + if (tsem2_wait(&pInfo->sem) != 0){ + tscError("consumer:0x%" PRIx64 ", failed to wait for sem", pTmq->consumerId); + } code = pInfo->code; } - (void)tsem2_destroy(&pInfo->sem); + if(tsem2_destroy(&pInfo->sem) != 0) { + tscError("failed to destroy sem sync ask ep"); + } taosMemoryFree(pInfo); return code; } @@ -3131,7 +3163,9 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { END: pCommon->code = code; if (total == pParam->totalReq) { - (void)tsem2_post(&pCommon->rsp); + if (tsem2_post(&pCommon->rsp) != 0) { + tscError("failed to post semaphore in get wal cb"); + } } if (pMsg) { @@ -3146,8 +3180,10 @@ static void destroyCommonInfo(SMqVgCommon* pCommon) { if (pCommon == NULL) { return; } - (void)taosArrayDestroy(pCommon->pList); - (void)tsem2_destroy(&pCommon->rsp); + taosArrayDestroy(pCommon->pList); + if(tsem2_destroy(&pCommon->rsp) != 0) { + tscError("failed to destroy semaphore for topic:%s", pCommon->pTopicName); + } (void)taosThreadMutexDestroy(&pCommon->mutex); taosMemoryFree(pCommon->pTopicName); taosMemoryFree(pCommon); @@ -3183,7 +3219,9 @@ end: taosMemoryFree(pMsg->pEpSet); } pParam->code = code; - (void)tsem2_post(&pParam->sem); + if (tsem2_post(&pParam->sem) != 0){ + tscError("failed to post semaphore in tmCommittedCb"); + } return code; } @@ -3248,12 +3286,16 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep int64_t transporterId = 0; code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo); if (code != 0) { - (void)tsem2_destroy(&pParam->sem); + if(tsem2_destroy(&pParam->sem) != 0) { + tscError("failed to destroy semaphore in get committed from server1"); + } taosMemoryFree(pParam); return code; } - (void)tsem2_wait(&pParam->sem); + if (tsem2_wait(&pParam->sem) != 0){ + tscError("failed to wait semaphore in get committed from server"); + } code = pParam->code; if (code == TSDB_CODE_SUCCESS) { if (pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG) { @@ -3263,7 +3305,9 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep code = TSDB_CODE_TMQ_SNAPSHOT_ERROR; } } - (void)tsem2_destroy(&pParam->sem); + if(tsem2_destroy(&pParam->sem) != 0) { + tscError("failed to destroy semaphore in get committed from server2"); + } taosMemoryFree(pParam); return code; @@ -3534,7 +3578,9 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } } - (void)tsem2_wait(&pCommon->rsp); + if (tsem2_wait(&pCommon->rsp) != 0){ + tscError("consumer:0x%" PRIx64 " failed to wait sem in get assignment", tmq->consumerId); + } code = pCommon->code; if (code != TSDB_CODE_SUCCESS) { @@ -3597,7 +3643,9 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) { } SMqSeekParam* pParam = param; pParam->code = code; - (void)tsem2_post(&pParam->sem); + if (tsem2_post(&pParam->sem) != 0){ + tscError("failed to post sem in tmqSeekCb"); + } return 0; } @@ -3695,14 +3743,20 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ int64_t transporterId = 0; code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); if (code != 0) { - (void)tsem2_destroy(&pParam->sem); + if(tsem2_destroy(&pParam->sem) != 0) { + tscError("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId); + } taosMemoryFree(pParam); return code; } - (void)tsem2_wait(&pParam->sem); + if (tsem2_wait(&pParam->sem) != 0){ + tscError("consumer:0x%" PRIx64 "wait rsp sem failed in seek offset", tmq->consumerId); + } code = pParam->code; - (void)tsem2_destroy(&pParam->sem); + if(tsem2_destroy(&pParam->sem) != 0) { + tscError("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId); + } taosMemoryFree(pParam); tscInfo("consumer:0x%" PRIx64 "send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code));