fix:optimize log & change return value for async interface

This commit is contained in:
wangmm0220 2023-07-20 18:28:21 +08:00
parent e1c4cca33d
commit 99c3ebc282
3 changed files with 32 additions and 21 deletions

View File

@ -288,7 +288,7 @@ DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
DLL_EXPORT int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,
int32_t *numOfAssignment);
DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment);

View File

@ -312,7 +312,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME_PUSH, "vnode-tmq-consume-push", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_WALINFO, "vnode-tmq-vg-walinfo", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committed-walinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committedinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL)

View File

@ -523,9 +523,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo);
return TSDB_CODE_SUCCESS;
return asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo);
}
static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
@ -546,7 +544,6 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum){
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) {
pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
return NULL;
}
@ -715,7 +712,9 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
end:
taosMemoryFree(pParamSet);
pCommitFp(tmq, code, userParam);
if(pParamSet->callbackFn != NULL) {
pCommitFp(tmq, code, userParam);
}
return;
}
@ -2307,6 +2306,9 @@ const char* tmq_get_table_name(TAOS_RES* res) {
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
if (tmq == NULL) {
tscError("invalid tmq handle, null");
if(cb != NULL) {
cb(tmq, TSDB_CODE_INVALID_PARA, param);
}
return;
}
if (pRes == NULL) { // here needs to commit all offsets.
@ -2410,15 +2412,17 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId,
tsem_destroy(&pInfo->sem);
taosMemoryFree(pInfo);
tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
tscInfo("consumer:0x%" PRIx64 " sync send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
return code;
}
int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){
void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){
int32_t code = 0;
if (tmq == NULL || pTopicName == NULL) {
tscError("invalid tmq handle, null");
return TSDB_CODE_INVALID_PARA;
code = TSDB_CODE_INVALID_PARA;
goto end;
}
int32_t accId = tmq->pTscObj->acctId;
@ -2427,17 +2431,17 @@ int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = NULL;
int32_t code = getClientVg(tmq, tname, vgId, &pVg);
code = getClientVg(tmq, tname, vgId, &pVg);
if(code != 0){
taosWUnLockLatch(&tmq->lock);
return code;
goto end;
}
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
code = checkWalRange(pOffsetInfo, offset);
if (code != 0) {
taosWUnLockLatch(&tmq->lock);
return code;
goto end;
}
taosWUnLockLatch(&tmq->lock);
@ -2445,9 +2449,12 @@ int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId
code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);
tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
tscInfo("consumer:0x%" PRIx64 " async send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
return code;
end:
if(code != 0 && cb != NULL){
cb(tmq, code, param);
}
}
void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
@ -2832,6 +2839,7 @@ int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId){
tscError("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type);
}
tscInfo("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position);
return position;
}
@ -2871,12 +2879,16 @@ int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId){
if(pOffsetInfo->committedOffset.type == TMQ_OFFSET__LOG){
committed = pOffsetInfo->committedOffset.version;
taosWUnLockLatch(&tmq->lock);
return committed;
goto end;
}
SEpSet epSet = pVg->epSet;
taosWUnLockLatch(&tmq->lock);
return getCommittedFromServer(tmq, tname, vgId, &epSet);
committed = getCommittedFromServer(tmq, tname, vgId, &epSet);
end:
tscInfo("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed);
return committed;
}
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
@ -2897,7 +2909,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
taosWLockLatch(&tmq->lock);
SMqClientTopic* pTopic = getTopicByName(tmq, tname);
if (pTopic == NULL) {
code = TSDB_CODE_INVALID_PARA;
code = TSDB_CODE_TMQ_INVALID_TOPIC;
goto end;
}
@ -3040,7 +3052,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
}
SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset);
tscInfo("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%"PRId64, tmq->consumerId, pTopic->topicName, p->vgId, p->currentOffset);
pOffsetInfo->walVerBegin = p->begin;
pOffsetInfo->walVerEnd = p->end;
@ -3078,6 +3090,7 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
return 0;
}
// seek interface have to send msg to server to cancel push handle if needed, because consumer may be in wait status if there is no data to poll
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
if (tmq == NULL || pTopicName == NULL) {
tscError("invalid tmq handle, null");
@ -3163,8 +3176,6 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
sendInfo->msgType = TDMT_VND_TMQ_SEEK;
int64_t transporterId = 0;
tscInfo("consumer:0x%" PRIx64 " %s send seek info vgId:%d, epoch %d" PRIx64,
tmq->consumerId, tname, vgId, tmq->epoch);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pParam->sem);