fix:[TD-31017]process return value in client for tmq

This commit is contained in:
wangmm0220 2024-07-23 14:32:28 +08:00
parent ac9b27dcce
commit 62a781ac6e
2 changed files with 24 additions and 19 deletions

View File

@ -4036,7 +4036,9 @@ static FORCE_INLINE void* tDecodeSMqAskEpRsp(void* buf, SMqAskEpRsp* pRsp) {
for (int32_t i = 0; i < sz; i++) {
SMqSubTopicEp topicEp;
buf = tDecodeMqSubTopicEp(buf, &topicEp);
taosArrayPush(pRsp->topics, &topicEp);
if (taosArrayPush(pRsp->topics, &topicEp) == NULL) {
return NULL;
}
}
return buf;
}

View File

@ -794,7 +794,7 @@ void tmqReplayTask(void* param, void* tmrId) {
if (tmq == NULL) return;
(void)tsem2_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
}
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
@ -833,7 +833,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
}
}
taosWUnLockLatch(&tmq->lock);
taosReleaseRef(tmqMgmt.rsetId, refId);
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
}
tDestroySMqHbRsp(&rsp);
taosMemoryFree(pMsg->pData);
@ -936,9 +936,9 @@ void tmqSendHbReq(void* param, void* tmrId) {
OVER:
tDestroySMqHbReq(&req);
if (tmrId != NULL) {
taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer);
(void)taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer);
}
taosReleaseRef(tmqMgmt.rsetId, refId);
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
}
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
@ -1589,7 +1589,7 @@ END:
tmq->consumerId, rspType, vgId, total, requestId);
FAIL1:
taosReleaseRef(tmqMgmt.rsetId, refId);
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
FAIL2:
if (tmq) (void)tsem2_post(&tmq->rspSem);
@ -2748,7 +2748,7 @@ void tmq_commit_offset_async(tmq_t* tmq, const char* pTopicName, int32_t vgId, i
int32_t accId = tmq->pTscObj->acctId;
char tname[TSDB_TOPIC_FNAME_LEN] = {0};
sprintf(tname, "%d.%s", accId, pTopicName);
(void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName);
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = NULL;
@ -2802,11 +2802,12 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
if (pParam->sync) {
SMqAskEpRsp rsp = {0};
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
(void)doUpdateLocalEp(tmq, head->epoch, &rsp);
if(tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp) != NULL){
(void)doUpdateLocalEp(tmq, head->epoch, &rsp);
}
tDeleteSMqAskEpRsp(&rsp);
} else {
SMqAskEpRspWrapper* pWrapper;
SMqAskEpRspWrapper* pWrapper = NULL;
code = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0, (void**)&pWrapper);
if (code) {
goto END;
@ -2815,13 +2816,15 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
pWrapper->epoch = head->epoch;
(void)memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
taosWriteQitem(tmq->mqueue, pWrapper);
if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg) != NULL){
taosFreeQitem(pWrapper);
}else{
(void)taosWriteQitem(tmq->mqueue, pWrapper);
}
}
END:
taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
(void)taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
FAIL:
if (pParam->sync) {
@ -3013,13 +3016,13 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
.currentOffset = rsp.common.rspOffset.version,
.vgId = pParam->vgId};
taosThreadMutexLock(&pCommon->mutex);
(void)taosThreadMutexLock(&pCommon->mutex);
if(taosArrayPush(pCommon->pList, &assignment) == NULL){
tscError("consumer:0x%" PRIx64 " failed to push the wal info from vgId:%d for topic:%s", pCommon->consumerId,
pParam->vgId, pCommon->pTopicName);
code = TSDB_CODE_TSC_INTERNAL_ERROR;
}
taosThreadMutexUnlock(&pCommon->mutex);
(void)taosThreadMutexUnlock(&pCommon->mutex);
}
END:
@ -3042,7 +3045,7 @@ static void destroyCommonInfo(SMqVgCommon* pCommon) {
}
(void)taosArrayDestroy(pCommon->pList);
(void)tsem2_destroy(&pCommon->rsp);
taosThreadMutexDestroy(&pCommon->mutex);
(void)taosThreadMutexDestroy(&pCommon->mutex);
taosMemoryFree(pCommon->pTopicName);
taosMemoryFree(pCommon);
}
@ -3359,7 +3362,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
taosThreadMutexInit(&pCommon->mutex, 0);
(void)taosThreadMutexInit(&pCommon->mutex, 0);
pCommon->pTopicName = taosStrdup(pTopic->topicName);
pCommon->consumerId = tmq->consumerId;
@ -3545,7 +3548,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
taosWUnLockLatch(&tmq->lock);
SMqSeekReq req = {0};
snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, tname);
(void)snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, tname);
req.head.vgId = vgId;
req.consumerId = tmq->consumerId;