fix:do not return error if commit nothing

This commit is contained in:
wangmm0220 2023-09-06 18:29:28 +08:00
parent cc62961337
commit 629de12bb1
7 changed files with 63 additions and 83 deletions

View File

@ -190,7 +190,6 @@ typedef struct {
typedef struct {
int64_t refId;
int32_t epoch;
void* pParam;
__tmq_askep_fn_t pUserFn;
} SMqAskEpCbParam;
@ -708,7 +707,6 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
commitRspCountDown(pParamSet, tmq->consumerId, "", 0);
return;
}
code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE;
end:
taosMemoryFree(pParamSet);
@ -743,20 +741,6 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
taosMemoryFree(param);
}
//void tmqAssignDelayedReportTask(void* param, void* tmrId) {
// int64_t refId = *(int64_t*)param;
// tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
// if (tmq != NULL) {
// int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
// *pTaskType = TMQ_DELAYED_TASK__REPORT;
// taosWriteQitem(tmq->delayedTask, pTaskType);
// tsem_post(&tmq->rspSem);
// }
//
// taosReleaseRef(tmqMgmt.rsetId, refId);
// taosMemoryFree(param);
//}
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
if (pMsg) {
taosMemoryFree(pMsg->pData);
@ -984,7 +968,7 @@ int32_t tmq_unsubscribe(tmq_t* tmq) {
}
if (tmq->autoCommit) {
int32_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp != 0 && rsp != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
if (rsp != 0) {
return rsp;
}
}
@ -1085,7 +1069,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->status = TMQ_CONSUMER_STATUS__INIT;
pTmq->pollCnt = 0;
pTmq->epoch = 0;
// pTmq->needReportOffsetRows = true;
// set conf
strcpy(pTmq->clientId, conf->clientId);
@ -1146,7 +1129,7 @@ _failed:
}
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
if(tmq == NULL) return TSDB_CODE_INVALID_PARA;
if(tmq == NULL || topic_list == NULL) return TSDB_CODE_INVALID_PARA;
const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most
const SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container);
@ -1222,7 +1205,10 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if(code != 0){
goto FAIL;
}
// avoid double free if msg is sent
buf = NULL;
@ -1239,7 +1225,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
if (retryCnt++ > MAX_RETRY_COUNT) {
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId);
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
goto FAIL;
}
@ -1512,10 +1498,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
taosWLockLatch(&tmq->lock);
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
char vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
tscInfo("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur);
// todo extract method
for (int32_t i = 0; i < topicNumCur; i++) {
// find old topic
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
@ -1566,32 +1551,17 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
if (tmq == NULL) {
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
// pParam->pUserFn(tmq, terrno, NULL, pParam->pParam);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pParam);
return terrno;
code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
goto END;
}
if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
pParam->pUserFn(tmq, code, NULL, pParam->pParam);
taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pParam);
return code;
goto END;
}
// tmq's epoch is monotonically increase,
// so it's safe to discard any old epoch msg.
// Epoch will only increase when received newer epoch ep msg
SMqRspHead* head = pMsg->pData;
int32_t epoch = atomic_load_32(&tmq->epoch);
if (head->epoch <= epoch) {
@ -1610,10 +1580,10 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
head->epoch, epoch);
}
pParam->pUserFn(tmq, code, pMsg, pParam->pParam);
taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
END:
pParam->pUserFn(tmq, code, pMsg, pParam->pParam);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pParam);
@ -1925,9 +1895,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else {
tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
}
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
// todo handle the wal range and epset for each vgroup
@ -1957,9 +1927,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else {
tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
}
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
@ -2015,9 +1985,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else {
tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
}
} else {
tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);
@ -2124,7 +2094,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
// if auto commit is set, commit before close consumer. Otherwise, do nothing.
if (tmq->autoCommit) {
int32_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp != 0 && rsp != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) {
if (rsp != 0) {
return rsp;
}
}
@ -2440,23 +2410,29 @@ end:
}
}
void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
void defaultAskEpCb(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
SAskEpInfo* pInfo = param;
pInfo->code = code;
if (code == TSDB_CODE_SUCCESS) {
SMqRspHead* head = pDataBuf->pData;
SMqAskEpRsp rsp;
tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp);
doUpdateLocalEp(pTmq, head->epoch, &rsp);
tDeleteSMqAskEpRsp(&rsp);
if (pTmq == NULL || code != TSDB_CODE_SUCCESS){
goto END;
}
SMqRspHead* head = pDataBuf->pData;
SMqAskEpRsp rsp;
tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp);
doUpdateLocalEp(pTmq, head->epoch, &rsp);
tDeleteSMqAskEpRsp(&rsp);
END:
tsem_post(&pInfo->sem);
}
void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
if (pTmq == NULL){
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
return;
}
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return;
@ -2482,7 +2458,7 @@ int32_t doAskEp(tmq_t* pTmq) {
SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
tsem_init(&pInfo->sem, 0, 0);
asyncAskEp(pTmq, updateEpCallbackFn, pInfo);
asyncAskEp(pTmq, defaultAskEpCb, pInfo);
tsem_wait(&pInfo->sem);
int32_t code = pInfo->code;
@ -2496,49 +2472,45 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
req.consumerId = pTmq->consumerId;
req.epoch = pTmq->epoch;
strcpy(req.cgroup, pTmq->groupId);
int code = 0;
SMqAskEpCbParam* pParam = NULL;
void* pReq = NULL;
int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
if (tlen < 0) {
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
return;
code = TSDB_CODE_INVALID_PARA;
goto FAIL;
}
void* pReq = taosMemoryCalloc(1, tlen);
pReq = taosMemoryCalloc(1, tlen);
if (pReq == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen);
askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
return;
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen);
taosMemoryFree(pReq);
askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
return;
code = TSDB_CODE_INVALID_PARA;
goto FAIL;
}
SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
if (pParam == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
taosMemoryFree(pReq);
askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
return;
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
pParam->refId = pTmq->refId;
pParam->epoch = pTmq->epoch;
pParam->pUserFn = askEpFn;
pParam->pParam = param;
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
taosMemoryFree(pParam);
taosMemoryFree(pReq);
askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
return;
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
@ -2553,7 +2525,15 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
tscInfo("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
int64_t transporterId = 0;
asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if(code == 0){
return;
}
FAIL:
taosMemoryFreeClear(pParam);
taosMemoryFreeClear(pReq);
askEpFn(pTmq, code, NULL, param);
}
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {

View File

@ -94,7 +94,7 @@ class TDTestCase:
resultList=[]
while 1:
tdSql.query("select * from %s.consumeresult"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)))
if tdSql.getRows() == expectRows:
break
else:
@ -336,7 +336,7 @@ class TDTestCase:
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows > expectrowcnt or totalConsumeRows <= 0:
if totalConsumeRows > expectrowcnt or totalConsumeRows < 0:
tdLog.info("act consume rows: %d, expect consume rows between %d and 0"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")

View File

@ -218,7 +218,7 @@ class TDTestCase:
actConsumeTotalRows = resultList[0]
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted):
if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted):
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)

View File

@ -216,7 +216,7 @@ class TDTestCase:
actConsumeTotalRows = resultList[0]
tdLog.info("act consume rows: %d, expect rows range (0, %d)"%(actConsumeTotalRows, totalRowsInserted))
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted):
if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted):
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)

View File

@ -218,7 +218,7 @@ class TDTestCase:
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
if not (actConsumeTotalRows > 0 and actConsumeTotalRows <= totalRowsInserted):
if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted):
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)

View File

@ -216,7 +216,7 @@ class TDTestCase:
actConsumeTotalRows = resultList[0]
if not (actConsumeTotalRows > 0 and actConsumeTotalRows <= totalRowsInserted):
if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted):
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)

View File

@ -217,7 +217,7 @@ class TDTestCase:
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
if not ((actConsumeTotalRows > 0) and (actConsumeTotalRows <= totalRowsInserted)):
if not ((actConsumeTotalRows >= 0) and (actConsumeTotalRows <= totalRowsInserted)):
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)