diff --git a/include/client/taos.h b/include/client/taos.h index 5ea1510e44..3cc2d907ab 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -288,7 +288,7 @@ DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset); -DLL_EXPORT int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param); +DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param); DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment); DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment); diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index aa23442291..7d12c2a1d6 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -312,7 +312,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME_PUSH, "vnode-tmq-consume-push", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_WALINFO, "vnode-tmq-vg-walinfo", SMqPollReq, SMqDataBlkRsp) - TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committed-walinfo", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committedinfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 3576df434b..96ec88e2e9 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -523,9 +523,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); - - return TSDB_CODE_SUCCESS; + return asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); } static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { @@ -546,7 +544,6 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum){ SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); if (pParamSet == NULL) { - pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam); return NULL; } @@ -715,7 +712,9 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us end: taosMemoryFree(pParamSet); - pCommitFp(tmq, code, userParam); + if(pParamSet->callbackFn != NULL) { + pCommitFp(tmq, code, userParam); + } return; } @@ -2307,6 +2306,9 @@ const char* tmq_get_table_name(TAOS_RES* res) { void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) { if (tmq == NULL) { tscError("invalid tmq handle, null"); + if(cb != NULL) { + cb(tmq, TSDB_CODE_INVALID_PARA, param); + } return; } if (pRes == NULL) { // here needs to commit all offsets. @@ -2410,15 +2412,17 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, tsem_destroy(&pInfo->sem); taosMemoryFree(pInfo); - tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code)); + tscInfo("consumer:0x%" PRIx64 " sync send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code)); return code; } -int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){ +void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){ + int32_t code = 0; if (tmq == NULL || pTopicName == NULL) { tscError("invalid tmq handle, null"); - return TSDB_CODE_INVALID_PARA; + code = TSDB_CODE_INVALID_PARA; + goto end; } int32_t accId = tmq->pTscObj->acctId; @@ -2427,17 +2431,17 @@ int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId taosWLockLatch(&tmq->lock); SMqClientVg* pVg = NULL; - int32_t code = getClientVg(tmq, tname, vgId, &pVg); + code = getClientVg(tmq, tname, vgId, &pVg); if(code != 0){ taosWUnLockLatch(&tmq->lock); - return code; + goto end; } SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; code = checkWalRange(pOffsetInfo, offset); if (code != 0) { taosWUnLockLatch(&tmq->lock); - return code; + goto end; } taosWUnLockLatch(&tmq->lock); @@ -2445,9 +2449,12 @@ int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param); - tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code)); + tscInfo("consumer:0x%" PRIx64 " async send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code)); - return code; +end: + if(code != 0 && cb != NULL){ + cb(tmq, code, param); + } } void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) { @@ -2832,6 +2839,7 @@ int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId){ tscError("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type); } + tscInfo("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position); return position; } @@ -2871,12 +2879,16 @@ int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId){ if(pOffsetInfo->committedOffset.type == TMQ_OFFSET__LOG){ committed = pOffsetInfo->committedOffset.version; taosWUnLockLatch(&tmq->lock); - return committed; + goto end; } SEpSet epSet = pVg->epSet; taosWUnLockLatch(&tmq->lock); - return getCommittedFromServer(tmq, tname, vgId, &epSet); + committed = getCommittedFromServer(tmq, tname, vgId, &epSet); + +end: + tscInfo("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed); + return committed; } int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment, @@ -2897,7 +2909,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a taosWLockLatch(&tmq->lock); SMqClientTopic* pTopic = getTopicByName(tmq, tname); if (pTopic == NULL) { - code = TSDB_CODE_INVALID_PARA; + code = TSDB_CODE_TMQ_INVALID_TOPIC; goto end; } @@ -3040,7 +3052,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; - tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset); + tscInfo("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%"PRId64, tmq->consumerId, pTopic->topicName, p->vgId, p->currentOffset); pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerEnd = p->end; @@ -3078,6 +3090,7 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) { return 0; } +// seek interface have to send msg to server to cancel push handle if needed, because consumer may be in wait status if there is no data to poll int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) { if (tmq == NULL || pTopicName == NULL) { tscError("invalid tmq handle, null"); @@ -3163,8 +3176,6 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ sendInfo->msgType = TDMT_VND_TMQ_SEEK; int64_t transporterId = 0; - tscInfo("consumer:0x%" PRIx64 " %s send seek info vgId:%d, epoch %d" PRIx64, - tmq->consumerId, tname, vgId, tmq->epoch); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); tsem_wait(&pParam->sem);