diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index ed83e41427..c4021e5302 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -152,7 +152,6 @@ typedef struct { int32_t vgId; int32_t vgStatus; int32_t vgSkipCnt; // here used to mark the slow vgroups -// bool receivedInfoFromVnode; // has already received info from vnode int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp. SEpSet epSet; @@ -1314,7 +1313,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } if (code != 0) { - // in case of consumer mismatch, wait for 500ms and retry if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, set status to be RECOVER", @@ -1672,35 +1670,35 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClie return pRspObj; } -static int32_t handleErrorBeforePoll(SMqClientVg* pVg, tmq_t* pTmq) { - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - tsem_post(&pTmq->rspSem); - return -1; -} - static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) { SMqPollReq req = {0}; + char* msg = NULL; + SMqPollCbParam* pParam = NULL; + SMsgSendInfo* sendInfo = NULL; + int code = 0; tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg); int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req); - if (msgSize < 0) { - return handleErrorBeforePoll(pVg, pTmq); + if (msgSize < 0){ + code = TSDB_CODE_INVALID_MSG; + goto FAIL; } - char* msg = taosMemoryCalloc(1, msgSize); + msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { - return handleErrorBeforePoll(pVg, pTmq); + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; } if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) { - taosMemoryFree(msg); - return handleErrorBeforePoll(pVg, pTmq); + code = TSDB_CODE_INVALID_MSG; + goto FAIL; } - SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam)); + pParam = taosMemoryMalloc(sizeof(SMqPollCbParam)); if (pParam == NULL) { - taosMemoryFree(msg); - return handleErrorBeforePoll(pVg, pTmq); + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; } pParam->refId = pTmq->refId; @@ -1708,11 +1706,10 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p pParam->vgId = pVg->vgId; pParam->requestId = req.reqId; - SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { - taosMemoryFree(pParam); - taosMemoryFree(msg); - return handleErrorBeforePoll(pVg, pTmq); + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; } sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL}; @@ -1728,13 +1725,21 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId, pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId); - asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + if(code != 0){ + goto FAIL; + } pVg->pollCnt++; pVg->seekUpdated = false; // reset this flag. pTmq->pollCnt++; - return TSDB_CODE_SUCCESS; + return 0; + +FAIL: + taosMemoryFreeClear(pParam); + taosMemoryFreeClear(msg); + return code; } // broadcast the poll request to all related vnodes @@ -1771,6 +1776,8 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { atomic_store_32(&pVg->vgSkipCnt, 0); code = doTmqPollImpl(tmq, pTopic, pVg, timeout); if (code != TSDB_CODE_SUCCESS) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + tsem_post(&tmq->rspSem); goto end; } } @@ -1782,19 +1789,15 @@ end: return code; } -static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) { +static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper) { if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { - /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/ if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) { SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper; SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg; doUpdateLocalEp(tmq, rspWrapper->epoch, rspMsg); - /*tmqClearUnhandleMsg(tmq);*/ tDeleteSMqAskEpRsp(rspMsg); - *pReset = true; } else { tmqFreeRspWrapper(rspWrapper); - *pReset = false; } } else { return -1; @@ -1819,7 +1822,7 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal pVg->offsetInfo.walVerEnd = ever + 1; } -static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { +static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems); while (1) { @@ -1972,12 +1975,12 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tmq->totalRows += numOfRows; - char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset); - tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 - ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, - tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, - tmq->totalRows, pollRspWrapper->reqId); + char buf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset); + tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 + ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, + tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, + tmq->totalRows, pollRspWrapper->reqId); taosFreeQitem(pollRspWrapper); taosWUnLockLatch(&tmq->lock); @@ -1991,14 +1994,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } } else { tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId); - - bool reset = false; - tmqHandleNoPollRsp(tmq, pRspWrapper, &reset); + tmqHandleNoPollRsp(tmq, pRspWrapper); taosFreeQitem(pRspWrapper); - if (pollIfReset && reset) { - tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId); - tmqPollImpl(tmq, timeout); - } } } } @@ -2006,7 +2003,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { if(tmq == NULL) return NULL; - void* rspObj; + void* rspObj = NULL; int64_t startTime = taosGetTimestampMs(); tscInfo("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, @@ -2038,7 +2035,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { tscError("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId); } - rspObj = tmqHandleAllRsp(tmq, timeout, false); + rspObj = tmqHandleAllRsp(tmq, timeout); if (rspObj) { tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj); return (TAOS_RES*)rspObj; @@ -2728,7 +2725,14 @@ int64_t getCommittedFromServer(tmq_t *tmq, char* tname, int32_t vgId, SEpSet* ep sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO; int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo); + if(code != 0){ + taosMemoryFree(buf); + taosMemoryFree(sendInfo); + tsem_destroy(&pParam->sem); + taosMemoryFree(pParam); + return code; + } tsem_wait(&pParam->sem); code = pParam->code; @@ -3142,7 +3146,14 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ sendInfo->msgType = TDMT_VND_TMQ_SEEK; int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + if(code != 0){ + taosMemoryFree(msg); + taosMemoryFree(sendInfo); + tsem_destroy(&pParam->sem); + taosMemoryFree(pParam); + return code; + } tsem_wait(&pParam->sem); code = pParam->code; diff --git a/tests/system-test/7-tmq/subscribeDb3.py b/tests/system-test/7-tmq/subscribeDb3.py index 9e8ca99930..37e3a17100 100644 --- a/tests/system-test/7-tmq/subscribeDb3.py +++ b/tests/system-test/7-tmq/subscribeDb3.py @@ -94,7 +94,7 @@ class TDTestCase: resultList=[] while 1: tdSql.query("select * from %s.consumeresult"%cdbName) - tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))) + # tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))) if tdSql.getRows() == expectRows: break else: