From ac9b27dcce38c6cd2fa607330d7fc8f1c612b9b5 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 23 Jul 2024 14:19:02 +0800 Subject: [PATCH] fix:[TD-31017]process return value in client for tmq --- source/client/inc/clientInt.h | 3 +- source/client/src/clientMain.c | 19 +- source/client/src/clientTmq.c | 915 ++++++++++++++++++++------------- source/common/src/tmsg.c | 2 +- 4 files changed, 578 insertions(+), 361 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 7c342b85d4..507738acc9 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -332,8 +332,7 @@ static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) { return (SReqResultInfo*)&msg->common.resInfo; } -SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4); - +int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pResInfo); static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) { if (TD_RES_QUERY(res)) return &(((SRequestObj*)res)->body.resInfo); return tmqGetCurResInfo(res); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index f65edc103a..3955610d5c 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -424,9 +424,11 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { return doAsyncFetchRows(pRequest, true, true); } else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { SMqRspObj *msg = ((SMqRspObj *)res); - SReqResultInfo *pResultInfo; + SReqResultInfo *pResultInfo = NULL; if (msg->common.resIter == -1) { - pResultInfo = tmqGetNextResInfo(res, true); + if(tmqGetNextResInfo(res, true, &pResultInfo) != 0){ + return NULL; + } } else { pResultInfo = tmqGetCurResInfo(res); } @@ -436,8 +438,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { pResultInfo->current += 1; return pResultInfo->row; } else { - pResultInfo = tmqGetNextResInfo(res, true); - if (pResultInfo == NULL) { + if (tmqGetNextResInfo(res, true, &pResultInfo) != 0){ return NULL; } @@ -752,8 +753,9 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { (*numOfRows) = pResultInfo->numOfRows; return pRequest->code; } else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { - SReqResultInfo *pResultInfo = tmqGetNextResInfo(res, true); - if (pResultInfo == NULL) return -1; + SReqResultInfo *pResultInfo = NULL; + int32_t code = tmqGetNextResInfo(res, true, &pResultInfo); + if (code != 0) return code; pResultInfo->current = pResultInfo->numOfRows; (*rows) = pResultInfo->row; @@ -774,8 +776,9 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { } if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { - SReqResultInfo *pResultInfo = tmqGetNextResInfo(res, false); - if (pResultInfo == NULL) { + SReqResultInfo *pResultInfo = NULL; + int32_t code = tmqGetNextResInfo(res, false, &pResultInfo); + if (code != 0) { (*numOfRows) = 0; return 0; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index c363686343..e1b7eab40b 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -29,6 +29,22 @@ #define DEFAULT_HEARTBEAT_INTERVAL 3000 #define DEFAULT_ASKEP_INTERVAL 1000 +#define CLIENT_TMQ_NULL_CHECK(c) \ + do { \ + if (c == NULL) { \ + code = TSDB_CODE_OUT_OF_MEMORY; \ + goto END; \ + } \ + } while (0) + +#define CLIENT_TMQ_RETURN_CHECK(c) \ + do { \ + code = c; \ + if (code != 0) { \ + goto END; \ + } \ + } while (0) + struct SMqMgmt { tmr_h timer; int32_t rsetId; @@ -112,7 +128,7 @@ struct tmq_t { typedef struct SAskEpInfo { int32_t code; - tsem_t sem; + tsem2_t sem; } SAskEpInfo; enum { @@ -125,6 +141,7 @@ enum { TMQ_CONSUMER_STATUS__READY, TMQ_CONSUMER_STATUS__NO_TOPIC, TMQ_CONSUMER_STATUS__RECOVER, + TMQ_CONSUMER_STATUS__CLOSED, }; enum { @@ -181,9 +198,7 @@ typedef struct { } SMqPollRspWrapper; typedef struct { - // int64_t refId; - // int32_t epoch; - tsem_t rspSem; + tsem2_t rspSem; int32_t rspErr; } SMqSubscribeCbParam; @@ -201,7 +216,7 @@ typedef struct { } SMqPollCbParam; typedef struct SMqVgCommon { - tsem_t rsp; + tsem2_t rsp; int32_t numOfRsp; SArray* pList; TdThreadMutex mutex; @@ -211,12 +226,12 @@ typedef struct SMqVgCommon { } SMqVgCommon; typedef struct SMqSeekParam { - tsem_t sem; + tsem2_t sem; int32_t code; } SMqSeekParam; typedef struct SMqCommittedParam { - tsem_t sem; + tsem2_t sem; int32_t code; SMqVgOffset vgOffset; } SMqCommittedParam; @@ -234,36 +249,31 @@ typedef struct { int32_t waitingRspNum; int32_t code; tmq_commit_cb* callbackFn; - /*SArray* successfulOffsets;*/ - /*SArray* failedOffsets;*/ void* userParam; } SMqCommitCbParamSet; typedef struct { SMqCommitCbParamSet* params; - // SMqVgOffset* pOffset; char topicName[TSDB_TOPIC_FNAME_LEN]; int32_t vgId; tmq_t* pTmq; } SMqCommitCbParam; typedef struct SSyncCommitInfo { - tsem_t sem; + tsem2_t sem; int32_t code; } SSyncCommitInfo; static int32_t syncAskEp(tmq_t* tmq); -static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg); static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet); static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet); -static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId); +static int32_t commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId); static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpset); tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); if (conf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; return conf; } @@ -445,8 +455,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pEpSet); - commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId); - return 0; + return commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId); } static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, @@ -457,9 +466,9 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse pOffset.offset.val = *offset; int32_t groupLen = strlen(tmq->groupId); - memcpy(pOffset.offset.subKey, tmq->groupId, groupLen); + (void)memcpy(pOffset.offset.subKey, tmq->groupId, groupLen); pOffset.offset.subKey[groupLen] = TMQ_SEPARATOR; - strcpy(pOffset.offset.subKey + groupLen + 1, pTopicName); + (void)strcpy(pOffset.offset.subKey + groupLen + 1, pTopicName); int32_t len = 0; int32_t code = 0; @@ -479,7 +488,11 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse SEncoder encoder; tEncoderInit(&encoder, abuf, len); - tEncodeMqVgOffset(&encoder, &pOffset); + if(tEncodeMqVgOffset(&encoder, &pOffset) < 0) { + tEncoderClear(&encoder); + taosMemoryFree(buf); + return TSDB_CODE_INVALID_PARA; + } tEncoderClear(&encoder); // build param @@ -490,7 +503,6 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse } pParam->params = pParamSet; - // pParam->pOffset = pOffset; pParam->vgId = vgId; pParam->pTmq = tmq; @@ -514,35 +526,35 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET; int64_t transporterId = 0; - atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); + (void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); if (code != 0) { - atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); + (void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); return code; } return code; } -static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { +static int32_t getTopicByName(tmq_t* tmq, const char* pTopicName, SMqClientTopic **topic) { int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); for (int32_t i = 0; i < numOfTopics; ++i) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - if (strcmp(pTopic->topicName, pTopicName) != 0) { + if (pTopic == NULL || strcmp(pTopic->topicName, pTopicName) != 0) { continue; } - - return pTopic; + *topic = pTopic; + return 0; } tscError("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, numOfTopics, pTopicName); - return NULL; + return TSDB_CODE_TMQ_INVALID_TOPIC; } -static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, - int32_t rspNum) { +static int32_t prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, + int32_t rspNum, SMqCommitCbParamSet** ppParamSet) { SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); if (pParamSet == NULL) { - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } pParamSet->refId = tmq->refId; @@ -550,21 +562,22 @@ static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* p pParamSet->callbackFn = pCommitFp; pParamSet->userParam = userParam; pParamSet->waitingRspNum = rspNum; - - return pParamSet; + *ppParamSet = pParamSet; + return 0; } static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg) { - SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName); - if (pTopic == NULL) { + SMqClientTopic* pTopic = NULL; + int32_t code = getTopicByName(tmq, pTopicName, &pTopic); + if (code != 0) { tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); - return TSDB_CODE_TMQ_INVALID_TOPIC; + return code; } int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); for (int32_t i = 0; i < numOfVgs; ++i) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); - if (pClientVg->vgId == vgId) { + if (pClientVg && pClientVg->vgId == vgId) { *pVg = pClientVg; break; } @@ -591,16 +604,23 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq goto end; } char offsetBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal); - - char commitBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); - - SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0); - if (pParamSet == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal); + if (code != 0) { goto end; } + + char commitBuf[TSDB_OFFSET_LEN] = {0}; + code = tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); + if (code != 0) { + goto end; + } + + SMqCommitCbParamSet* pParamSet = NULL; + code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet); + if (code != 0) { + goto end; + } + code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet); if (code != TSDB_CODE_SUCCESS) { tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s", @@ -666,32 +686,39 @@ end: static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) { int32_t code = 0; // init as 1 to prevent concurrency issue - SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 1); - if (pParamSet == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + SMqCommitCbParamSet* pParamSet = NULL; + code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 1, &pParamSet); + if (code != 0) { goto end; } - taosRLockLatch(&tmq->lock); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics); for (int32_t i = 0; i < numOfTopics; i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + if (pTopic != NULL) { + code = TSDB_CODE_TMQ_INVALID_TOPIC; + taosRUnLockLatch(&tmq->lock); + goto end; + } int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); - tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgroups); for (int32_t j = 0; j < numOfVgroups; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - + if (pVg != NULL) { + code = TSDB_CODE_INVALID_PARA; + taosRUnLockLatch(&tmq->lock); + goto end; + } if (pVg->offsetInfo.endOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.endOffset, &pVg->offsetInfo.committedOffset)) { char offsetBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetBuf, tListLen(offsetBuf), &pVg->offsetInfo.endOffset); + (void)tFormatOffset(offsetBuf, tListLen(offsetBuf), &pVg->offsetInfo.endOffset); char commitBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); + (void)tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, &pVg->offsetInfo.endOffset, pTopic->topicName, pParamSet); if (code != TSDB_CODE_SUCCESS) { @@ -720,7 +747,12 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us // request is sent if (pParamSet->waitingRspNum != 1) { // count down since waiting rsp num init as 1 - commitRspCountDown(pParamSet, tmq->consumerId, "", 0); + code = commitRspCountDown(pParamSet, tmq->consumerId, "", 0); + if (code != 0){ + tscError("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(code)); + pParamSet = NULL; + goto end; + } return; } @@ -733,9 +765,9 @@ end: } static void generateTimedTask(int64_t refId, int32_t type) { - tmq_t* tmq; - int8_t* pTaskType; - int32_t code; + tmq_t* tmq = NULL; + int8_t* pTaskType = NULL; + int32_t code = 0; tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) return; @@ -744,7 +776,7 @@ static void generateTimedTask(int64_t refId, int32_t type) { if (code == TSDB_CODE_SUCCESS) { *pTaskType = type; if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) { - tsem2_post(&tmq->rspSem); + (void)tsem2_post(&tmq->rspSem); } } @@ -761,7 +793,7 @@ void tmqReplayTask(void* param, void* tmrId) { tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) return; - tsem2_post(&tmq->rspSem); + (void)tsem2_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); } @@ -771,23 +803,29 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) { } int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { - if (pMsg == NULL) { + if (code != 0){ return code; } + if (pMsg == NULL || param == NULL) { + return TSDB_CODE_INVALID_PARA; + } SMqHbRsp rsp = {0}; - tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp); + code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp); + if (code != 0) { + return code; + } - int64_t refId = *(int64_t*)param; + int64_t refId = (int64_t)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq != NULL) { taosWLockLatch(&tmq->lock); for (int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++) { STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i); - if (privilege->noPrivilege == 1) { + if (privilege && privilege->noPrivilege == 1) { int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); for (int32_t j = 0; j < topicNumCur; j++) { SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j); - if (strcmp(pTopicCur->topicName, privilege->topic) == 0) { + if (pTopicCur && strcmp(pTopicCur->topicName, privilege->topic) == 0) { tscInfo("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic); pTopicCur->noPrivilege = 1; } @@ -814,23 +852,41 @@ void tmqSendHbReq(void* param, void* tmrId) { SMqHbReq req = {0}; req.consumerId = tmq->consumerId; req.epoch = tmq->epoch; - taosRLockLatch(&tmq->lock); req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); + if (req.topics == NULL){ + return; + } + taosRLockLatch(&tmq->lock); for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + if (pTopic == NULL) { + continue; + } int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); TopicOffsetRows* data = taosArrayReserve(req.topics, 1); - strcpy(data->topicName, pTopic->topicName); + if (data == NULL){ + continue; + } + (void)strcpy(data->topicName, pTopic->topicName); data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows)); + if (data->offsetRows == NULL){ + continue; + } for (int j = 0; j < numOfVgroups; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + if (pVg == NULL){ + continue; + } OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1); + if (offRows == NULL){ + continue; + } offRows->vgId = pVg->vgId; offRows->rows = pVg->numOfRows; offRows->offset = pVg->offsetInfo.endOffset; offRows->ever = pVg->offsetInfo.walVerEnd == -1 ? 0 : pVg->offsetInfo.walVerEnd; char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); + (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); tscDebug("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64, tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows); } @@ -865,9 +921,7 @@ void tmqSendHbReq(void* param, void* tmrId) { sendInfo->requestId = generateRequestId(); sendInfo->requestObjRefId = 0; - sendInfo->paramFreeFp = taosMemoryFree; - sendInfo->param = taosMemoryMalloc(sizeof(int64_t)); - *(int64_t*)sendInfo->param = refId; + sendInfo->param = (void*)refId; sendInfo->fp = tmqHbCb; sendInfo->msgType = TDMT_MND_TMQ_HB; @@ -893,48 +947,53 @@ static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) { } } -int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { - STaosQall* qall; - int32_t code; +void tmqHandleAllDelayedTask(tmq_t* pTmq) { + STaosQall* qall = NULL; + int32_t code = 0; code = taosAllocateQall(&qall); - if (code) return code; + if (code) { + tscError("consumer:0x%" PRIx64 ", failed to allocate qall, code:%s", pTmq->consumerId, tstrerror(code)); + return; + } - taosReadAllQitems(pTmq->delayedTask, qall); + (void)taosReadAllQitems(pTmq->delayedTask, qall); int32_t numOfItems = taosQallItemSize(qall); if (numOfItems == 0) { taosFreeQall(qall); - return TSDB_CODE_SUCCESS; + return; } tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems); int8_t* pTaskType = NULL; - taosGetQitem(qall, (void**)&pTaskType); + (void)taosGetQitem(qall, (void**)&pTaskType); while (pTaskType != NULL) { if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { - askEp(pTmq, NULL, false, false); - + code = askEp(pTmq, NULL, false, false); + if (code != 0) { + tscError("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code)); + continue; + } tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId); - taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->epTimer); + (void)taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->epTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn; asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam); tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval / 1000.0); - taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, + (void)taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->commitTimer); } else { tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType); } taosFreeQitem(pTaskType); - taosGetQitem(qall, (void**)&pTaskType); + (void)taosGetQitem(qall, (void**)&pTaskType); } taosFreeQall(qall); - return 0; } static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { @@ -964,7 +1023,7 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { void tmqClearUnhandleMsg(tmq_t* tmq) { SMqRspWrapper* rspWrapper = NULL; while (1) { - taosGetQitem(tmq->qall, (void**)&rspWrapper); + (void)taosGetQitem(tmq->qall, (void**)&rspWrapper); if (rspWrapper) { tmqFreeRspWrapper(rspWrapper); taosFreeQitem(rspWrapper); @@ -974,9 +1033,9 @@ void tmqClearUnhandleMsg(tmq_t* tmq) { } rspWrapper = NULL; - taosReadAllQitems(tmq->mqueue, tmq->qall); + (void)taosReadAllQitems(tmq->mqueue, tmq->qall); while (1) { - taosGetQitem(tmq->qall, (void**)&rspWrapper); + (void)taosGetQitem(tmq->qall, (void**)&rspWrapper); if (rspWrapper) { tmqFreeRspWrapper(rspWrapper); taosFreeQitem(rspWrapper); @@ -997,7 +1056,7 @@ int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) { if (pMsg) { taosMemoryFree(pMsg->pEpSet); } - tsem_post(&pParam->rspSem); + (void)tsem2_post(&pParam->rspSem); return 0; } @@ -1005,43 +1064,38 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { if (tmq == NULL) return TSDB_CODE_INVALID_PARA; if (*topics == NULL) { *topics = tmq_list_new(); + if (*topics == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } taosRLockLatch(&tmq->lock); for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i); - tmq_list_append(*topics, strchr(topic->topicName, '.') + 1); + if(topic == NULL) { + tscError("topic is null"); + continue; + } + char* tmp = strchr(topic->topicName, '.'); + if(tmp == NULL) { + tscError("topic name is invalid:%s", topic->topicName); + continue; + } + if(tmq_list_append(*topics, tmp+ 1) != 0) { + tscError("failed to append topic:%s", tmp + 1); + continue; + } } taosRUnLockLatch(&tmq->lock); return 0; } -int32_t tmq_unsubscribe(tmq_t* tmq) { - if (tmq == NULL) return TSDB_CODE_INVALID_PARA; - if (tmq->status != TMQ_CONSUMER_STATUS__READY) { - tscInfo("consumer:0x%" PRIx64 " not in ready state, unsubscribe it directly", tmq->consumerId); - return 0; - } - if (tmq->autoCommit) { - int32_t rsp = tmq_commit_sync(tmq, NULL); - if (rsp != 0) { - return rsp; - } - } - taosSsleep(2); // sleep 2s for hb to send offset and rows to server - - tmq_list_t* lst = tmq_list_new(); - int32_t rsp = tmq_subscribe(tmq, lst); - tmq_list_destroy(lst); - return rsp; -} - static void freeClientVg(void* param) { SMqClientVg* pVg = param; tOffsetDestroy(&pVg->offsetInfo.endOffset); tOffsetDestroy(&pVg->offsetInfo.beginOffset); tOffsetDestroy(&pVg->offsetInfo.committedOffset); } -static void freeClientVgImpl(void* param) { +static void freeClientTopic(void* param) { SMqClientTopic* pTopic = param; taosMemoryFreeClear(pTopic->schema.pSchema); taosArrayDestroyEx(pTopic->vgs, freeClientVg); @@ -1051,7 +1105,6 @@ void tmqFreeImpl(void* handle) { tmq_t* tmq = (tmq_t*)handle; int64_t id = tmq->consumerId; - // TODO stop timer if (tmq->mqueue) { tmqClearUnhandleMsg(tmq); taosCloseQueue(tmq->mqueue); @@ -1062,19 +1115,19 @@ void tmqFreeImpl(void* handle) { } taosFreeQall(tmq->qall); - tsem2_destroy(&tmq->rspSem); + (void)tsem2_destroy(&tmq->rspSem); - taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl); + taosArrayDestroyEx(tmq->clientTopics, freeClientTopic); taos_close_internal(tmq->pTscObj); if (tmq->commitTimer) { - taosTmrStopA(&tmq->commitTimer); + (void)taosTmrStopA(&tmq->commitTimer); } if (tmq->epTimer) { - taosTmrStopA(&tmq->epTimer); + (void)taosTmrStopA(&tmq->epTimer); } if (tmq->hbLiveTimer) { - taosTmrStopA(&tmq->hbLiveTimer); + (void)taosTmrStopA(&tmq->hbLiveTimer); } taosMemoryFree(tmq); @@ -1102,32 +1155,34 @@ void tmqMgmtClose(void) { } if (tmqMgmt.rsetId >= 0) { - taosCloseRef(tmqMgmt.rsetId); + (void)taosCloseRef(tmqMgmt.rsetId); tmqMgmt.rsetId = -1; } } #define SET_ERROR_MSG_TMQ(MSG) \ - if (errstr != NULL) snprintf(errstr, errstrLen, MSG); + if (errstr != NULL) (void)snprintf(errstr, errstrLen, MSG); tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { - int32_t code; + int32_t code = 0; if (conf == NULL) { SET_ERROR_MSG_TMQ("configure is null") return NULL; } - taosThreadOnce(&tmqInit, tmqMgmtInit); + code = taosThreadOnce(&tmqInit, tmqMgmtInit); + if (code != 0) { + SET_ERROR_MSG_TMQ("tmq init error") + return NULL; + } if (tmqInitRes != 0) { - terrno = tmqInitRes; SET_ERROR_MSG_TMQ("tmq timer init error") return NULL; } tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t)); if (pTmq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tscError("failed to create consumer, groupId:%s, code:%s", conf->groupId, terrstr()); + tscError("failed to create consumer, groupId:%s", conf->groupId); SET_ERROR_MSG_TMQ("malloc tmq failed") return NULL; } @@ -1136,33 +1191,34 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass; pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); + if (pTmq->clientTopics == NULL) { + tscError("failed to create consumer, groupId:%s", conf->groupId); + SET_ERROR_MSG_TMQ("malloc client topics failed") + goto _failed; + } code = taosOpenQueue(&pTmq->mqueue); if (code) { - terrno = code; - tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); + tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), pTmq->groupId); SET_ERROR_MSG_TMQ("open queue failed") goto _failed; } code = taosOpenQueue(&pTmq->delayedTask); if (code) { - terrno = code; - tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); + tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), pTmq->groupId); SET_ERROR_MSG_TMQ("open delayed task queue failed") goto _failed; } code = taosAllocateQall(&pTmq->qall); if (code) { - terrno = code; - tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); + tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), pTmq->groupId); SET_ERROR_MSG_TMQ("allocate qall failed") goto _failed; } if (conf->groupId[0] == 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); + tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), pTmq->groupId); SET_ERROR_MSG_TMQ("malloc tmq element failed or group is empty") goto _failed; } @@ -1173,8 +1229,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->epoch = 0; // set conf - strcpy(pTmq->clientId, conf->clientId); - strcpy(pTmq->groupId, conf->groupId); + (void)strcpy(pTmq->clientId, conf->clientId); + (void)strcpy(pTmq->groupId, conf->groupId); pTmq->withTbName = conf->withTbName; pTmq->useSnapshot = conf->snapEnable; pTmq->autoCommit = conf->autoCommit; @@ -1205,7 +1261,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ); if (pTmq->pTscObj == NULL) { tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); - tsem2_destroy(&pTmq->rspSem); + (void)tsem2_destroy(&pTmq->rspSem); SET_ERROR_MSG_TMQ("init tscObj failed") goto _failed; } @@ -1217,10 +1273,13 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { } pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, (void*)pTmq->refId, tmqMgmt.timer); - + if (pTmq->hbLiveTimer == NULL) { + SET_ERROR_MSG_TMQ("start heartbeat timer failed") + goto _failed; + } char buf[TSDB_OFFSET_LEN] = {0}; STqOffsetVal offset = {.type = pTmq->resetOffsetCfg}; - tFormatOffset(buf, tListLen(buf), &offset); + (void)tFormatOffset(buf, tListLen(buf), &offset); tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64 ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s", pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval, @@ -1248,8 +1307,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { req.consumerId = tmq->consumerId; tstrncpy(req.clientId, tmq->clientId, 256); tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN); - req.topicNames = taosArrayInit(sz, sizeof(void*)); + req.topicNames = taosArrayInit(sz, sizeof(void*)); if (req.topicNames == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto FAIL; @@ -1264,22 +1323,38 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { for (int32_t i = 0; i < sz; i++) { char* topic = taosArrayGetP(container, i); - + if (topic == NULL) { + code = TSDB_CODE_INVALID_PARA; + goto FAIL; + } SName name = {0}; - tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic)); + code = tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic)); + if (code) { + tscError("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId, code); + goto FAIL; + } char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); if (topicFName == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto FAIL; } - tNameExtractFullName(&name, topicFName); - tscInfo("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName); + code = tNameExtractFullName(&name, topicFName); + if (code) { + tscError("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId, code); + taosMemoryFree(topicFName); + goto FAIL; + } - taosArrayPush(req.topicNames, &topicFName); + if (taosArrayPush(req.topicNames, &topicFName) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(topicFName); + goto FAIL; + } + tscInfo("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName); } int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req); - buf = taosMemoryMalloc(tlen); if (buf == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1287,7 +1362,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } void* abuf = buf; - tSerializeSCMSubscribeReq(&abuf, &req); + (void)tSerializeSCMSubscribeReq(&abuf, &req); sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { @@ -1297,7 +1372,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } SMqSubscribeCbParam param = {.rspErr = 0}; - if (tsem_init(¶m.rspSem, 0, 0) != 0) { + if (tsem2_init(¶m.rspSem, 0, 0) != 0) { code = TSDB_CODE_TSC_INTERNAL_ERROR; taosMemoryFree(buf); taosMemoryFree(sendInfo); @@ -1305,7 +1380,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL}; - sendInfo->requestId = generateRequestId(); sendInfo->requestObjRefId = 0; sendInfo->param = ¶m; @@ -1320,8 +1394,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { goto FAIL; } - tsem_wait(¶m.rspSem); - tsem_destroy(¶m.rspSem); + (void)tsem2_wait(¶m.rspSem); + (void)tsem2_destroy(¶m.rspSem); if (param.rspErr != 0) { code = param.rspErr; @@ -1343,11 +1417,12 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { taosMsleep(500); } - // init ep timer tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer); - // init auto commit timer - tmq->commitTimer = - taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer); + tmq->commitTimer =taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer); + if (tmq->epTimer == NULL || tmq->commitTimer == NULL) { + code = TSDB_CODE_TSC_INTERNAL_ERROR; + goto FAIL; + } FAIL: taosArrayDestroyP(req.topicNames, taosMemoryFree); @@ -1361,21 +1436,21 @@ void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* para conf->commitCbUserParam = param; } -static SMqClientVg* getVgInfo(tmq_t* tmq, char* topicName, int32_t vgId) { +static void getVgInfo(tmq_t* tmq, char* topicName, int32_t vgId, SMqClientVg** pVg) { int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); for (int i = 0; i < topicNumCur; i++) { SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); - if (strcmp(pTopicCur->topicName, topicName) == 0) { + if (pTopicCur && strcmp(pTopicCur->topicName, topicName) == 0) { int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs); for (int32_t j = 0; j < vgNumCur; j++) { SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); - if (pVgCur->vgId == vgId) { - return pVgCur; + if (pVgCur && pVgCur->vgId == vgId) { + *pVg = pVgCur; + return; } } } } - return NULL; } static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName) { @@ -1391,7 +1466,8 @@ static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName) { static void setVgIdle(tmq_t* tmq, char* topicName, int32_t vgId) { taosWLockLatch(&tmq->lock); - SMqClientVg* pVg = getVgInfo(tmq, topicName, vgId); + SMqClientVg* pVg = NULL; + getVgInfo(tmq, topicName, vgId, &pVg); if (pVg) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); } @@ -1402,7 +1478,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tmq_t* tmq = NULL; SMqPollCbParam* pParam = (SMqPollCbParam*)param; if (pParam == NULL || pMsg == NULL) { - goto FAIL; + goto FAIL2; } int64_t refId = pParam->refId; int32_t vgId = pParam->vgId; @@ -1410,15 +1486,14 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { code = TSDB_CODE_TMQ_CONSUMER_CLOSED; - goto FAIL; + goto FAIL2; } - SMqPollRspWrapper* pRspWrapper; + SMqPollRspWrapper* pRspWrapper = NULL; code = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper); if (code) { tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId); - taosReleaseRef(tmqMgmt.rsetId, refId); - goto FAIL; + goto FAIL1; } if (code != 0) { @@ -1438,7 +1513,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64, tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId); - code = -1; + code = TSDB_CODE_TMQ_CONSUMER_MISMATCH; goto END; } @@ -1451,56 +1526,51 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { pMsg->pEpSet = NULL; if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { - SDecoder decoder; + SDecoder decoder = {0}; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); if (tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp) < 0) { tDecoderClear(&decoder); - taosReleaseRef(tmqMgmt.rsetId, refId); code = TSDB_CODE_OUT_OF_MEMORY; goto END; } tDecoderClear(&decoder); - memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); + (void)memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &pRspWrapper->dataRsp.common.rspOffset); + (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &pRspWrapper->dataRsp.common.rspOffset); tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64, tmq->consumerId, vgId, pRspWrapper->dataRsp.common.reqOffset.version, buf, rspType, requestId); } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) { - SDecoder decoder; + SDecoder decoder = {0}; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); if (tDecodeMqMetaRsp(&decoder, &pRspWrapper->metaRsp) < 0) { tDecoderClear(&decoder); - taosReleaseRef(tmqMgmt.rsetId, refId); code = TSDB_CODE_OUT_OF_MEMORY; goto END; } tDecoderClear(&decoder); - memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); + (void)memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); } else if (rspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { - SDecoder decoder; + SDecoder decoder = {0}; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); if (tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp) < 0) { tDecoderClear(&decoder); - taosReleaseRef(tmqMgmt.rsetId, refId); code = TSDB_CODE_OUT_OF_MEMORY; goto END; } tDecoderClear(&decoder); - memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead)); + (void)memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead)); } else if (rspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { - SDecoder decoder; + SDecoder decoder = {0}; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); if (tSemiDecodeMqBatchMetaRsp(&decoder, &pRspWrapper->batchMetaRsp) < 0) { tDecoderClear(&decoder); - taosReleaseRef(tmqMgmt.rsetId, refId); code = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; + goto END; } tDecoderClear(&decoder); - memcpy(&pRspWrapper->batchMetaRsp, pMsg->pData, sizeof(SMqRspHead)); - tscDebug("consumer:0x%" PRIx64 " recv poll batchmeta rsp, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, vgId, - requestId); + (void)memcpy(&pRspWrapper->batchMetaRsp, pMsg->pData, sizeof(SMqRspHead)); + tscDebug("consumer:0x%" PRIx64 " recv poll batchmeta rsp, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, vgId,requestId); } else { // invalid rspType tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType); } @@ -1508,16 +1578,21 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { END: pRspWrapper->code = code; pRspWrapper->vgId = vgId; - strcpy(pRspWrapper->topicName, pParam->topicName); - taosWriteQitem(tmq->mqueue, pRspWrapper); + (void)strcpy(pRspWrapper->topicName, pParam->topicName); + code = taosWriteQitem(tmq->mqueue, pRspWrapper); + if(code != 0){ + tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code); + } int32_t total = taosQueueItemSize(tmq->mqueue); tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64, tmq->consumerId, rspType, vgId, total, requestId); + +FAIL1: taosReleaseRef(tmqMgmt.rsetId, refId); -FAIL: - if (tmq) tsem2_post(&tmq->rspSem); +FAIL2: + if (tmq) (void)tsem2_post(&tmq->rspSem); if (pMsg) taosMemoryFreeClear(pMsg->pData); if (pMsg) taosMemoryFreeClear(pMsg->pEpSet); @@ -1546,11 +1621,16 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic tscInfo("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet); pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg)); - + if (pTopic->vgs == NULL) { + tscError("consumer:0x%" PRIx64 ", failed to init vgs for topic:%s", tmq->consumerId, pTopic->topicName); + return; + } for (int32_t j = 0; j < vgNumGet; j++) { SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); - - makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId); + if (pVgEp == NULL){ + continue; + } + (void)sprintf(vgKey, "%s:%d", pTopic->topicName, pVgEp->vgId); SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey)); STqOffsetVal offsetNew = {0}; @@ -1583,7 +1663,11 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic clientVg.offsetInfo.committedOffset = offsetNew; clientVg.offsetInfo.beginOffset = offsetNew; } - taosArrayPush(pTopic->vgs, &clientVg); + if (taosArrayPush(pTopic->vgs, &clientVg) == NULL){ + tscError("consumer:0x%" PRIx64 ", failed to push vg:%d into topic:%s", tmq->consumerId, pVgEp->vgId, + pTopic->topicName); + freeClientVg(&clientVg); + } } } @@ -1607,7 +1691,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); if (pVgOffsetHashMap == NULL) { - taosArrayDestroy(newTopics); + (void)taosArrayDestroy(newTopics); return false; } @@ -1620,15 +1704,18 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) for (int32_t i = 0; i < topicNumCur; i++) { // find old topic SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); - if (pTopicCur->vgs) { + if (pTopicCur && pTopicCur->vgs) { int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs); tscInfo("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur); for (int32_t j = 0; j < vgNumCur; j++) { SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); - makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId); + if (pVgCur == NULL) { + continue; + } + (void)sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId); char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.endOffset); + (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.endOffset); tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); @@ -1637,7 +1724,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows, .vgStatus = pVgCur->vgStatus}; - taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)); + if(taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)) != 0){ + tscError("consumer:0x%" PRIx64 ", failed to put vg:%d into hashmap", tmq->consumerId, pVgCur->vgId); + } } } } @@ -1645,15 +1734,21 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) for (int32_t i = 0; i < topicNumGet; i++) { SMqClientTopic topic = {0}; SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i); + if (pTopicEp == NULL) { + continue; + } initClientTopicFromRsp(&topic, pTopicEp, pVgOffsetHashMap, tmq); - taosArrayPush(newTopics, &topic); + if(taosArrayPush(newTopics, &topic) == NULL){ + tscError("consumer:0x%" PRIx64 ", failed to push topic:%s into new topics", tmq->consumerId, topic.topicName); + freeClientTopic(&topic); + } } taosHashCleanup(pVgOffsetHashMap); // destroy current buffered existed topics info if (tmq->clientTopics) { - taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl); + taosArrayDestroyEx(tmq->clientTopics, freeClientTopic); } tmq->clientTopics = newTopics; taosWUnLockLatch(&tmq->lock); @@ -1668,9 +1763,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) { int32_t groupLen = strlen(tmq->groupId); - memcpy(pReq->subKey, tmq->groupId, groupLen); + (void)memcpy(pReq->subKey, tmq->groupId, groupLen); pReq->subKey[groupLen] = TMQ_SEPARATOR; - strcpy(pReq->subKey + groupLen + 1, pTopic->topicName); + (void)strcpy(pReq->subKey + groupLen + 1, pTopic->topicName); pReq->withTbName = tmq->withTbName; pReq->consumerId = tmq->consumerId; @@ -1685,36 +1780,41 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl pReq->enableBatchMeta = tmq->enableBatchMeta; } -SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { +int32_t tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqMetaRspObj** ppRspObj) { SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj)); if (pRspObj == NULL) { - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } pRspObj->resType = RES_TYPE__TMQ_META; tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); pRspObj->vgId = pWrapper->vgHandle->vgId; - memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp)); - return pRspObj; + (void)memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp)); + *ppRspObj = pRspObj; + return 0; } -SMqBatchMetaRspObj* tmqBuildBatchMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { +int32_t tmqBuildBatchMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqBatchMetaRspObj** ppRspObj) { SMqBatchMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqBatchMetaRspObj)); if (pRspObj == NULL) { - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } pRspObj->common.resType = RES_TYPE__TMQ_BATCH_META; tstrncpy(pRspObj->common.topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->common.db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); pRspObj->common.vgId = pWrapper->vgHandle->vgId; - memcpy(&pRspObj->rsp, &pWrapper->batchMetaRsp, sizeof(SMqBatchMetaRsp)); + (void)memcpy(&pRspObj->rsp, &pWrapper->batchMetaRsp, sizeof(SMqBatchMetaRsp)); tscDebug("build batchmeta Rsp from wrapper"); - return pRspObj; + *ppRspObj = pRspObj; + return 0; } void changeByteEndian(char* pData) { + if (pData == NULL) { + return; + } char* p = pData; // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column @@ -1740,6 +1840,9 @@ void changeByteEndian(char* pData) { } static void tmqGetRawDataRowsPrecisionFromRes(void* pRetrieve, void** rawData, int64_t* rows, int32_t* precision) { + if (pRetrieve == NULL) { + return; + } if (*(int64_t*)pRetrieve == 0) { *rawData = ((SRetrieveTableRsp*)pRetrieve)->data; *rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows); @@ -1771,6 +1874,10 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg if (!pDataRsp->withSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable pDataRsp->withSchema = true; pDataRsp->blockSchema = taosArrayInit(pDataRsp->blockNum, sizeof(void*)); + if (pDataRsp->blockSchema == NULL){ + tscError("failed to allocate memory for blockSchema"); + return; + } } // extract the rows in this data packet for (int32_t i = 0; i < pDataRsp->blockNum; ++i) { @@ -1786,33 +1893,38 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg if (needTransformSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable SSchemaWrapper* schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema); if (schema) { - taosArrayPush(pDataRsp->blockSchema, &schema); + if (taosArrayPush(pDataRsp->blockSchema, &schema) == NULL){ + tscError("failed to push schema into blockSchema"); + continue; + } } } } } -SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) { +int32_t tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObj** ppRspObj) { SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); if (pRspObj == NULL) { - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } pRspObj->common.resType = RES_TYPE__TMQ; - memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp)); + (void)memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp)); tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, &pRspObj->common, &pRspObj->rsp.common); - return pRspObj; + *ppRspObj = pRspObj; + return 0; } -SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) { +int32_t tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqTaosxRspObj** ppRspObj) { SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj)); if (pRspObj == NULL) { - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } pRspObj->common.resType = RES_TYPE__TMQ_METADATA; - memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp)); + (void)memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp)); tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, &pRspObj->common, &pRspObj->rsp.common); - return pRspObj; + *ppRspObj = pRspObj; + return 0; } static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) { @@ -1849,13 +1961,14 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p } pParam->refId = pTmq->refId; - strcpy(pParam->topicName, pTopic->topicName); + (void)strcpy(pParam->topicName, pTopic->topicName); pParam->vgId = pVg->vgId; pParam->requestId = req.reqId; sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFreeClear(pParam); taosMemoryFreeClear(msg); return code; } @@ -1870,7 +1983,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p int64_t transporterId = 0; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset); + (void)tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset); code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId, pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId); @@ -1898,6 +2011,9 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { for (int i = 0; i < numOfTopics; i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + if (pTopic == NULL){ + continue; + } int32_t numOfVg = taosArrayGetSize(pTopic->vgs); if (pTopic->noPrivilege) { tscDebug("consumer:0x%" PRIx64 " has no privilegr for topic:%s", tmq->consumerId, pTopic->topicName); @@ -1905,6 +2021,9 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { } for (int j = 0; j < numOfVg; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + if (pVg == NULL) { + continue; + } int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs; if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) { // less than 10ms tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, @@ -1966,11 +2085,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { while (1) { SMqRspWrapper* pRspWrapper = NULL; - taosGetQitem(tmq->qall, (void**)&pRspWrapper); + (void)taosGetQitem(tmq->qall, (void**)&pRspWrapper); if (pRspWrapper == NULL) { - taosReadAllQitems(tmq->mqueue, tmq->qall); - taosGetQitem(tmq->qall, (void**)&pRspWrapper); + (void)taosReadAllQitems(tmq->mqueue, tmq->qall); + (void)taosGetQitem(tmq->qall, (void**)&pRspWrapper); if (pRspWrapper == NULL) { return NULL; } @@ -1984,17 +2103,19 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); tscDebug("consumer:0x%" PRIx64 " wait for the rebalance, set status to be RECOVER", tmq->consumerId); } else if (pRspWrapper->code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { - terrno = pRspWrapper->code; - tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, - tstrerror(pRspWrapper->code)); + tscInfo("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId); } else { if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform - askEp(tmq, NULL, false, true); + int32_t code = askEp(tmq, NULL, false, true); + if (code != 0) { + tscError("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); + } } tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId, tstrerror(pRspWrapper->code)); taosWLockLatch(&tmq->lock); - SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); + SMqClientVg* pVg = NULL; + getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg); if (pVg) pVg->emptyBlockReceiveTs = taosGetTimestampMs(); taosWUnLockLatch(&tmq->lock); } @@ -2008,7 +2129,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { if (pDataRsp->head.epoch == consumerEpoch) { taosWLockLatch(&tmq->lock); - SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); + SMqClientVg* pVg = NULL; + getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg); pollRspWrapper->vgHandle = pVg; pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) { @@ -2032,7 +2154,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { tmq->consumerId, pDataRsp->blockNum != 0); char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); + (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); if (pDataRsp->blockNum == 0) { tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, @@ -2043,14 +2165,18 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { taosWUnLockLatch(&tmq->lock); } else { // build rsp int64_t numOfRows = 0; - SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); + SMqRspObj* pRsp = NULL; + (void)tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows, &pRsp); tmq->totalRows += numOfRows; pVg->emptyBlockReceiveTs = 0; - if (tmq->replayEnable) { + if (pRsp && tmq->replayEnable) { pVg->blockReceiveTs = taosGetTimestampMs(); pVg->blockSleepForReplay = pRsp->rsp.sleepTime; if (pVg->blockSleepForReplay > 0) { - taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer); + if (taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer) == NULL) { + tscError("consumer:0x%" PRIx64 " failed to start replay timer, vgId:%d, sleep:%"PRId64, tmq->consumerId, + pVg->vgId, pVg->blockSleepForReplay); + } } } tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 @@ -2076,7 +2202,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { taosWLockLatch(&tmq->lock); - SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); + SMqClientVg* pVg = NULL; + getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg); pollRspWrapper->vgHandle = pVg; pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) { @@ -2091,7 +2218,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true); // build rsp - SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); + SMqMetaRspObj* pRsp = NULL; + (void)tmqBuildMetaRspFromWrapper(pollRspWrapper, &pRsp); taosFreeQitem(pRspWrapper); taosWUnLockLatch(&tmq->lock); return pRsp; @@ -2110,7 +2238,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { if (pollRspWrapper->batchMetaRsp.head.epoch == consumerEpoch) { taosWLockLatch(&tmq->lock); - SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); + SMqClientVg* pVg = NULL; + getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg); pollRspWrapper->vgHandle = pVg; pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) { @@ -2123,11 +2252,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { } // build rsp - void* pRsp = NULL; updateVgInfo(pVg, &pollRspWrapper->batchMetaRsp.rspOffset, &pollRspWrapper->batchMetaRsp.rspOffset, pollRspWrapper->batchMetaRsp.head.walsver, pollRspWrapper->batchMetaRsp.head.walever, tmq->consumerId, true); - pRsp = tmqBuildBatchMetaRspFromWrapper(pollRspWrapper); + SMqBatchMetaRspObj* pRsp = NULL; + (void)tmqBuildBatchMetaRspFromWrapper(pollRspWrapper, &pRsp) ; taosFreeQitem(pRspWrapper); taosWUnLockLatch(&tmq->lock); return pRsp; @@ -2145,7 +2274,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { if (pDataRsp->head.epoch == consumerEpoch) { taosWLockLatch(&tmq->lock); - SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); + SMqClientVg* pVg = NULL; + getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg); pollRspWrapper->vgHandle = pVg; pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) { @@ -2174,11 +2304,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { // build rsp int64_t numOfRows = 0; - void* pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows); + SMqTaosxRspObj* pRsp = NULL; + if (tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows, &pRsp) !=0 ) { + tscError("consumer:0x%" PRIx64 " build taosx rsp failed, vgId:%d", tmq->consumerId, pVg->vgId); + } tmq->totalRows += numOfRows; char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset); + (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset); tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, @@ -2198,7 +2331,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { tscDebug("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId); SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)pRspWrapper; SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg; - doUpdateLocalEp(tmq, pEpRspWrapper->epoch, rspMsg); + (void)doUpdateLocalEp(tmq, pEpRspWrapper->epoch, rspMsg); tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); } else { @@ -2251,9 +2384,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { if (rspObj) { tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj); return (TAOS_RES*)rspObj; - } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { - tscInfo("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId); - return NULL; } if (timeout >= 0) { @@ -2264,10 +2394,9 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { tmq->consumerId, tmq->epoch, startTime, currentTime); return NULL; } - tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime)); + (void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime)); } else { - // use tsem_timewait instead of tsem_wait to avoid unexpected stuck - tsem2_timewait(&tmq->rspSem, 1000); + (void)tsem2_timewait(&tmq->rspSem, 1000); } } } @@ -2281,11 +2410,12 @@ static void displayConsumeStatistics(tmq_t* pTmq) { tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId); for (int32_t i = 0; i < numOfTopics; ++i) { SMqClientTopic* pTopics = taosArrayGet(pTmq->clientTopics, i); - + if (pTopics == NULL) continue; tscDebug("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, i); int32_t numOfVgs = taosArrayGetSize(pTopics->vgs); for (int32_t j = 0; j < numOfVgs; ++j) { SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j); + if (pVg == NULL) continue; tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows); } } @@ -2293,34 +2423,54 @@ static void displayConsumeStatistics(tmq_t* pTmq) { tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId); } +static int32_t innerClose(tmq_t* tmq){ + if (tmq->status != TMQ_CONSUMER_STATUS__READY) { + tscInfo("consumer:0x%" PRIx64 " not in ready state, unsubscribe it directly", tmq->consumerId); + return 0; + } + if (tmq->autoCommit) { + int32_t code = tmq_commit_sync(tmq, NULL); + if (code != 0) { + return code; + } + } + tmqSendHbReq((void*)(tmq->refId), NULL); + + tmq_list_t* lst = tmq_list_new(); + if (lst == NULL){ + return TSDB_CODE_OUT_OF_MEMORY; + } + int32_t code = tmq_subscribe(tmq, lst); + tmq_list_destroy(lst); + return code; +} +int32_t tmq_unsubscribe(tmq_t* tmq) { + if (tmq == NULL) return TSDB_CODE_INVALID_PARA; + int32_t code = 0; + if (atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__CLOSED) { + code = innerClose(tmq); + if(code == 0){ + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); + } + } + return code; +} + int32_t tmq_consumer_close(tmq_t* tmq) { if (tmq == NULL) return TSDB_CODE_INVALID_PARA; tscInfo("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status); displayConsumeStatistics(tmq); - - if (tmq->status == TMQ_CONSUMER_STATUS__READY) { - // if auto commit is set, commit before close consumer. Otherwise, do nothing. - if (tmq->autoCommit) { - int32_t code = tmq_commit_sync(tmq, NULL); - if (code != 0) { - return code; - } + int32_t code = 0; + if (atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__CLOSED) { + code = innerClose(tmq); + if(code == 0){ + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); + taosRemoveRef(tmqMgmt.rsetId, tmq->refId); } - tmqSendHbReq((void*)(tmq->refId), NULL); - - tmq_list_t* lst = tmq_list_new(); - int32_t code = tmq_subscribe(tmq, lst); - tmq_list_destroy(lst); - if (code != 0) { - return code; - } - } else { - tscInfo("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId); } - taosRemoveRef(tmqMgmt.rsetId, tmq->refId); - return 0; + return code; } const char* tmq_err2str(int32_t err) { @@ -2332,7 +2482,7 @@ const char* tmq_err2str(int32_t err) { if (*(taosGetErrMsg()) == 0) { return tstrerror(err); } else { - snprintf(taosGetErrMsgReturn(), ERR_MSG_LEN, "%s,detail:%s", tstrerror(err), taosGetErrMsg()); + (void)snprintf(taosGetErrMsgReturn(), ERR_MSG_LEN, "%s,detail:%s", tstrerror(err), taosGetErrMsg()); return (const char*)taosGetErrMsgReturn(); } } @@ -2360,10 +2510,17 @@ const char* tmq_get_topic_name(TAOS_RES* res) { return NULL; } if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) { - return strchr(((SMqRspObjCommon*)res)->topic, '.') + 1; + char *tmp = strchr(((SMqRspObjCommon*)res)->topic, '.'); + if (tmp == NULL) { + return NULL; + } + return tmp + 1; } else if (TD_RES_TMQ_META(res)) { - SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; - return strchr(pMetaRspObj->topic, '.') + 1; + char *tmp = strchr(((SMqMetaRspObj*)res)->topic, '.'); + if (tmp == NULL) { + return NULL; + } + return tmp + 1; } else { return NULL; } @@ -2375,10 +2532,17 @@ const char* tmq_get_db_name(TAOS_RES* res) { } if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) { - return strchr(((SMqRspObjCommon*)res)->db, '.') + 1; + char *tmp = strchr(((SMqRspObjCommon*)res)->db, '.'); + if (tmp == NULL) { + return NULL; + } + return tmp + 1; } else if (TD_RES_TMQ_META(res)) { - SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; - return strchr(pMetaRspObj->db, '.') + 1; + char *tmp = strchr(((SMqMetaRspObj*)res)->db, '.'); + if (tmp == NULL) { + return NULL; + } + return tmp + 1; } else { return NULL; } @@ -2391,8 +2555,7 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) { return ((SMqRspObjCommon*)res)->vgId; } else if (TD_RES_TMQ_META(res)) { - SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; - return pMetaRspObj->vgId; + return ((SMqMetaRspObj*)res)->vgId; } else { return TSDB_CODE_INVALID_PARA; } @@ -2463,7 +2626,7 @@ void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* static void commitCallBackFn(tmq_t* UNUSED_PARAM(tmq), int32_t code, void* param) { SSyncCommitInfo* pInfo = (SSyncCommitInfo*)param; pInfo->code = code; - tsem_post(&pInfo->sem); + (void)tsem2_post(&pInfo->sem); } int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { @@ -2475,7 +2638,15 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { int32_t code = 0; SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); - tsem_init(&pInfo->sem, 0, 0); + if(pInfo == NULL) { + tscError("failed to allocate memory for sync commit"); + return TSDB_CODE_OUT_OF_MEMORY; + } + if (tsem2_init(&pInfo->sem, 0, 0) != 0) { + tscError("failed to init sem for sync commit"); + taosMemoryFree(pInfo); + return TSDB_CODE_OUT_OF_MEMORY; + } pInfo->code = 0; if (pRes == NULL) { @@ -2484,10 +2655,10 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { asyncCommitFromResult(tmq, pRes, commitCallBackFn, pInfo); } - tsem_wait(&pInfo->sem); + (void)tsem2_wait(&pInfo->sem); code = pInfo->code; - tsem_destroy(&pInfo->sem); + (void)tsem2_destroy(&pInfo->sem); taosMemoryFree(pInfo); tscInfo("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code)); @@ -2518,7 +2689,7 @@ int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId, int32_t accId = tmq->pTscObj->acctId; char tname[TSDB_TOPIC_FNAME_LEN] = {0}; - sprintf(tname, "%d.%s", accId, pTopicName); + (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName); taosWLockLatch(&tmq->lock); SMqClientVg* pVg = NULL; @@ -2544,17 +2715,20 @@ int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId, return TSDB_CODE_OUT_OF_MEMORY; } - tsem_init(&pInfo->sem, 0, 0); + if (tsem2_init(&pInfo->sem, 0, 0) != 0) { + taosMemoryFree(pInfo); + return TSDB_CODE_OUT_OF_MEMORY; + } pInfo->code = 0; code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo); if (code == 0) { - tsem_wait(&pInfo->sem); + (void)tsem2_wait(&pInfo->sem); code = pInfo->code; } if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS; - tsem_destroy(&pInfo->sem); + (void)tsem2_destroy(&pInfo->sem); taosMemoryFree(pInfo); tscInfo("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId, @@ -2629,7 +2803,7 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { if (pParam->sync) { SMqAskEpRsp rsp = {0}; tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); - doUpdateLocalEp(tmq, head->epoch, &rsp); + (void)doUpdateLocalEp(tmq, head->epoch, &rsp); tDeleteSMqAskEpRsp(&rsp); } else { SMqAskEpRspWrapper* pWrapper; @@ -2640,7 +2814,7 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP; pWrapper->epoch = head->epoch; - memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead)); + (void)memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg); taosWriteQitem(tmq->mqueue, pWrapper); @@ -2654,7 +2828,7 @@ FAIL: SAskEpInfo* pInfo = pParam->pParam; if (pInfo) { pInfo->code = code; - tsem_post(&pInfo->sem); + (void)tsem2_post(&pInfo->sem); } } @@ -2669,15 +2843,18 @@ FAIL: int32_t syncAskEp(tmq_t* pTmq) { SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo)); if (pInfo == NULL) return TSDB_CODE_OUT_OF_MEMORY; - tsem_init(&pInfo->sem, 0, 0); + if (tsem2_init(&pInfo->sem, 0, 0) != 0) { + taosMemoryFree(pInfo); + return TSDB_CODE_TSC_INTERNAL_ERROR; + } int32_t code = askEp(pTmq, pInfo, true, false); if (code == 0) { - tsem_wait(&pInfo->sem); + (void)tsem2_wait(&pInfo->sem); code = pInfo->code; } - tsem_destroy(&pInfo->sem); + (void)tsem2_destroy(&pInfo->sem); taosMemoryFree(pInfo); return code; } @@ -2686,7 +2863,7 @@ int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { SMqAskEpReq req = {0}; req.consumerId = pTmq->consumerId; req.epoch = updateEpSet ? -1 : pTmq->epoch; - strcpy(req.cgroup, pTmq->groupId); + (void)strcpy(req.cgroup, pTmq->groupId); int code = 0; SMqAskEpCbParam* pParam = NULL; void* pReq = NULL; @@ -2740,18 +2917,13 @@ int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { return asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo); } -int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) { - return sprintf(dst, "%s:%d", topicName, vg); -} - int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { int64_t refId = pParamSet->refId; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { taosMemoryFree(pParamSet); - terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; - return terrno; + return TSDB_CODE_TMQ_CONSUMER_CLOSED; } // if no more waiting rsp @@ -2760,23 +2932,23 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { } taosMemoryFree(pParamSet); - taosReleaseRef(tmqMgmt.rsetId, refId); - return 0; + return taosReleaseRef(tmqMgmt.rsetId, refId); } -void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) { +int32_t commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) { int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); if (waitingRspNum == 0) { tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic, vgId); - tmqCommitDone(pParamSet); + return tmqCommitDone(pParamSet); } else { tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId, waitingRspNum); } + return 0; } -SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) { +int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pResInfo) { SMqDataRspCommon* common = (SMqDataRspCommon*)POINTER_SHIFT(res, sizeof(SMqRspObjCommon)); SMqRspObjCommon* pRspObj = (SMqRspObjCommon*)res; pRspObj->resIter++; @@ -2784,7 +2956,9 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) { if (common->withSchema) { doFreeReqResultInfo(&pRspObj->resInfo); SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(common->blockSchema, pRspObj->resIter); - setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols); + if (pSW){ + setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols); + } } void* pRetrieve = taosArrayGetP(common->blockData, pRspObj->resIter); @@ -2798,15 +2972,17 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) { pRspObj->resInfo.current = 0; pRspObj->resInfo.precision = precision; - // TODO handle the compressed case pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows; - setResultDataPtr(&pRspObj->resInfo, pRspObj->resInfo.fields, pRspObj->resInfo.numOfCols, pRspObj->resInfo.numOfRows, + int32_t code = setResultDataPtr(&pRspObj->resInfo, pRspObj->resInfo.fields, pRspObj->resInfo.numOfCols, pRspObj->resInfo.numOfRows, convertUcs4); - - return &pRspObj->resInfo; + if (code != 0){ + return code; + } + *pResInfo = &pRspObj->resInfo; + return code; } - return NULL; + return TSDB_CODE_TSC_INTERNAL_ERROR; } static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { @@ -2820,28 +2996,36 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { if (code != TSDB_CODE_SUCCESS) { tscError("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId, pParam->vgId, pCommon->pTopicName); - pCommon->code = code; + } else { - SMqDataRsp rsp; - SDecoder decoder; + SMqDataRsp rsp = {0}; + SDecoder decoder = {0}; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); - tDecodeMqDataRsp(&decoder, &rsp); + code = tDecodeMqDataRsp(&decoder, &rsp); tDecoderClear(&decoder); + if (code != 0){ + goto END; + } SMqRspHead* pHead = pMsg->pData; - tmq_topic_assignment assignment = {.begin = pHead->walsver, .end = pHead->walever + 1, .currentOffset = rsp.common.rspOffset.version, .vgId = pParam->vgId}; taosThreadMutexLock(&pCommon->mutex); - taosArrayPush(pCommon->pList, &assignment); + if(taosArrayPush(pCommon->pList, &assignment) == NULL){ + tscError("consumer:0x%" PRIx64 " failed to push the wal info from vgId:%d for topic:%s", pCommon->consumerId, + pParam->vgId, pCommon->pTopicName); + code = TSDB_CODE_TSC_INTERNAL_ERROR; + } taosThreadMutexUnlock(&pCommon->mutex); } +END: + pCommon->code = code; if (total == pParam->totalReq) { - tsem_post(&pCommon->rsp); + (void)tsem2_post(&pCommon->rsp); } if (pMsg) { @@ -2849,15 +3033,15 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pEpSet); } - return 0; + return code; } static void destroyCommonInfo(SMqVgCommon* pCommon) { if (pCommon == NULL) { return; } - taosArrayDestroy(pCommon->pList); - tsem_destroy(&pCommon->rsp); + (void)taosArrayDestroy(pCommon->pList); + (void)tsem2_destroy(&pCommon->rsp); taosThreadMutexDestroy(&pCommon->mutex); taosMemoryFree(pCommon->pTopicName); taosMemoryFree(pCommon); @@ -2877,7 +3061,7 @@ static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) { goto end; } if (pMsg) { - SDecoder decoder; + SDecoder decoder = {0}; tDecoderInit(&decoder, (uint8_t*)pMsg->pData, pMsg->len); if (tDecodeMqVgOffset(&decoder, &pParam->vgOffset) < 0) { tOffsetDestroy(&pParam->vgOffset.offset); @@ -2893,8 +3077,8 @@ end: taosMemoryFree(pMsg->pEpSet); } pParam->code = code; - tsem_post(&pParam->sem); - return 0; + (void)tsem2_post(&pParam->sem); + return code; } int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* epSet) { @@ -2904,9 +3088,9 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep pOffset.consumerId = tmq->consumerId; int32_t groupLen = strlen(tmq->groupId); - memcpy(pOffset.offset.subKey, tmq->groupId, groupLen); + (void)memcpy(pOffset.offset.subKey, tmq->groupId, groupLen); pOffset.offset.subKey[groupLen] = TMQ_SEPARATOR; - strcpy(pOffset.offset.subKey + groupLen + 1, tname); + (void)strcpy(pOffset.offset.subKey + groupLen + 1, tname); int32_t len = 0; tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code); @@ -2925,7 +3109,12 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep SEncoder encoder; tEncoderInit(&encoder, abuf, len); - tEncodeMqVgOffset(&encoder, &pOffset); + code = tEncodeMqVgOffset(&encoder, &pOffset); + if (code < 0) { + taosMemoryFree(buf); + tEncoderClear(&encoder); + return code; + } tEncoderClear(&encoder); SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); @@ -2940,7 +3129,12 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep taosMemoryFree(sendInfo); return TSDB_CODE_OUT_OF_MEMORY; } - tsem_init(&pParam->sem, 0, 0); + if (tsem2_init(&pParam->sem, 0, 0) != 0){ + taosMemoryFree(buf); + taosMemoryFree(sendInfo); + taosMemoryFree(pParam); + return TSDB_CODE_TSC_INTERNAL_ERROR; + } sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL}; sendInfo->requestId = generateRequestId(); @@ -2952,12 +3146,12 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep int64_t transporterId = 0; code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo); if (code != 0) { - tsem_destroy(&pParam->sem); + (void)tsem2_destroy(&pParam->sem); taosMemoryFree(pParam); return code; } - tsem_wait(&pParam->sem); + (void)tsem2_wait(&pParam->sem); code = pParam->code; if (code == TSDB_CODE_SUCCESS) { if (pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG) { @@ -2967,7 +3161,7 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep code = TSDB_CODE_TMQ_SNAPSHOT_ERROR; } } - tsem_destroy(&pParam->sem); + (void)tsem2_destroy(&pParam->sem); taosMemoryFree(pParam); return code; @@ -2981,7 +3175,7 @@ int64_t tmq_position(tmq_t* tmq, const char* pTopicName, int32_t vgId) { int32_t accId = tmq->pTscObj->acctId; char tname[TSDB_TOPIC_FNAME_LEN] = {0}; - sprintf(tname, "%d.%s", accId, pTopicName); + (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName); taosWLockLatch(&tmq->lock); @@ -3040,7 +3234,7 @@ int64_t tmq_committed(tmq_t* tmq, const char* pTopicName, int32_t vgId) { int32_t accId = tmq->pTscObj->acctId; char tname[TSDB_TOPIC_FNAME_LEN] = {0}; - sprintf(tname, "%d.%s", accId, pTopicName); + (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName); taosWLockLatch(&tmq->lock); @@ -3094,13 +3288,14 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a int32_t accId = tmq->pTscObj->acctId; char tname[TSDB_TOPIC_FNAME_LEN] = {0}; - sprintf(tname, "%d.%s", accId, pTopicName); - int32_t code = TSDB_CODE_SUCCESS; + (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName); taosWLockLatch(&tmq->lock); - SMqClientTopic* pTopic = getTopicByName(tmq, tname); - if (pTopic == NULL) { - code = TSDB_CODE_TMQ_INVALID_TOPIC; + + SMqClientTopic* pTopic = NULL; + int32_t code = getTopicByName(tmq, tname, &pTopic); + if (code != 0) { + tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); goto end; } @@ -3108,6 +3303,9 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a *numOfAssignment = taosArrayGetSize(pTopic->vgs); for (int32_t j = 0; j < (*numOfAssignment); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); + if (pClientVg == NULL){ + continue; + } int32_t type = pClientVg->offsetInfo.beginOffset.type; if (isInSnapshotMode(type, tmq->useSnapshot)) { tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type); @@ -3128,6 +3326,9 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a for (int32_t j = 0; j < (*numOfAssignment); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); + if (pClientVg == NULL){ + continue; + } if (pClientVg->offsetInfo.beginOffset.type != TMQ_OFFSET__LOG) { needFetch = true; break; @@ -3145,24 +3346,31 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a if (needFetch) { pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon)); if (pCommon == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = terrno; + code = TSDB_CODE_OUT_OF_MEMORY; goto end; } pCommon->pList = taosArrayInit(4, sizeof(tmq_topic_assignment)); - tsem_init(&pCommon->rsp, 0, 0); + if (pCommon->pList == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + if (tsem2_init(&pCommon->rsp, 0, 0) != 0){ + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } taosThreadMutexInit(&pCommon->mutex, 0); pCommon->pTopicName = taosStrdup(pTopic->topicName); pCommon->consumerId = tmq->consumerId; - terrno = TSDB_CODE_OUT_OF_MEMORY; for (int32_t i = 0; i < (*numOfAssignment); ++i) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); - + if (pClientVg == NULL){ + continue; + } SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam)); if (pParam == NULL) { - code = terrno; + code = TSDB_CODE_OUT_OF_MEMORY; goto end; } @@ -3178,21 +3386,21 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req); if (msgSize < 0) { taosMemoryFree(pParam); - code = terrno; + code = TSDB_CODE_OUT_OF_MEMORY; goto end; } char* msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { taosMemoryFree(pParam); - code = terrno; + code = TSDB_CODE_OUT_OF_MEMORY; goto end; } if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) { taosMemoryFree(msg); taosMemoryFree(pParam); - code = terrno; + code = TSDB_CODE_OUT_OF_MEMORY; goto end; } @@ -3200,7 +3408,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a if (sendInfo == NULL) { taosMemoryFree(pParam); taosMemoryFree(msg); - code = terrno; + code = TSDB_CODE_OUT_OF_MEMORY; goto end; } @@ -3214,7 +3422,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a int64_t transporterId = 0; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset); + (void)tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset); tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); @@ -3224,10 +3432,9 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } } - tsem_wait(&pCommon->rsp); + (void)tsem2_wait(&pCommon->rsp); code = pCommon->code; - terrno = code; if (code != TSDB_CODE_SUCCESS) { goto end; } @@ -3242,6 +3449,9 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a for (int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); + if (pClientVg == NULL){ + continue; + } if (pClientVg->vgId != p->vgId) { continue; } @@ -3285,7 +3495,7 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) { } SMqSeekParam* pParam = param; pParam->code = code; - tsem_post(&pParam->sem); + (void)tsem2_post(&pParam->sem); return 0; } @@ -3299,7 +3509,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ int32_t accId = tmq->pTscObj->acctId; char tname[TSDB_TOPIC_FNAME_LEN] = {0}; - sprintf(tname, "%d.%s", accId, pTopicName); + (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName); taosWLockLatch(&tmq->lock); @@ -3366,7 +3576,12 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ taosMemoryFree(sendInfo); return TSDB_CODE_OUT_OF_MEMORY; } - tsem_init(&pParam->sem, 0, 0); + if (tsem2_init(&pParam->sem, 0, 0) != 0){ + taosMemoryFree(msg); + taosMemoryFree(sendInfo); + taosMemoryFree(pParam); + return TSDB_CODE_TSC_INTERNAL_ERROR; + } sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL}; sendInfo->requestId = generateRequestId(); @@ -3378,14 +3593,14 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ int64_t transporterId = 0; code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); if (code != 0) { - tsem_destroy(&pParam->sem); + (void)tsem2_destroy(&pParam->sem); taosMemoryFree(pParam); return code; } - tsem_wait(&pParam->sem); + (void)tsem2_wait(&pParam->sem); code = pParam->code; - tsem_destroy(&pParam->sem); + (void)tsem2_destroy(&pParam->sem); taosMemoryFree(pParam); tscInfo("consumer:0x%" PRIx64 "send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code)); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 9efff055bf..cf62beefd0 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -9248,7 +9248,7 @@ void tOffsetCopy(STqOffsetVal *pLeft, const STqOffsetVal *pRight) { *pLeft = *pRight; if (IS_VAR_DATA_TYPE(pRight->primaryKey.type)) { pLeft->primaryKey.pData = taosMemoryMalloc(pRight->primaryKey.nData); - memcpy(pLeft->primaryKey.pData, pRight->primaryKey.pData, pRight->primaryKey.nData); + (void)memcpy(pLeft->primaryKey.pData, pRight->primaryKey.pData, pRight->primaryKey.nData); } }