From d5c57ca2449fd5059bce578d2ce73b6cdbe1e277 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 19 Apr 2023 14:46:48 +0800 Subject: [PATCH] enh(tmq): support the seek. --- source/client/src/clientTmq.c | 116 ++++++++++------------------------ 1 file changed, 32 insertions(+), 84 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index d6ad4c3db9..6f4152ca97 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -571,7 +571,7 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { return pTopic; } - tscError("consumer:0x%" PRIx64 ", failed to find topic:%s", tmq->consumerId, pTopicName); + tscError("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, numOfTopics, pTopicName); return NULL; } @@ -1571,7 +1571,6 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl pReq->consumerId = tmq->consumerId; pReq->timeout = timeout; pReq->epoch = tmq->epoch; - /*pReq->currentOffset = reqOffset;*/ pReq->reqOffset = pVg->offsetInfo.currentOffset; pReq->head.vgId = pVg->vgId; pReq->useSnapshot = tmq->useSnapshot; @@ -1670,7 +1669,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p pParam->refId = pTmq->refId; pParam->epoch = pTmq->epoch; - pParam->pVg = pVg; // pVg may be released,fix it + pParam->pVg = pVg; pParam->pTopic = pTopic; pParam->vgId = pVg->vgId; pParam->requestId = req.reqId; @@ -1682,12 +1681,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p return handleErrorBeforePoll(pVg, pTmq); } - sendInfo->msgInfo = (SDataBuf){ - .pData = msg, - .len = msgSize, - .handle = NULL, - }; - + sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL}; sendInfo->requestId = req.reqId; sendInfo->requestObjRefId = 0; sendInfo->param = pParam; @@ -2374,18 +2368,19 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle, int64_t offset) { if (tmq == NULL) { + tscError("invalid tmq handle, null"); return TSDB_CODE_INVALID_PARA; } SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName); if (pTopic == NULL) { + tscError("consumer:0x:" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); return TSDB_CODE_INVALID_PARA; } SMqClientVg* pVg = NULL; - - int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); - for(int32_t i= 0; i < numOfVgs; ++i) { + int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); + for (int32_t i = 0; i < numOfVgs; ++i) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); if (pClientVg->vgId == vgroupHandle) { pVg = pClientVg; @@ -2394,85 +2389,38 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle } if (pVg == NULL) { + tscError("consumer:0x:" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgroupHandle); return TSDB_CODE_INVALID_PARA; } - if (offset < pVg->offsetInfo.walVerBegin|| offset > pVg->offsetInfo.walVerEnd) { + SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; + + int32_t type = pOffsetInfo->currentOffset.type; + if (type != TMQ_OFFSET__LOG) { + tscError("consumer:0x:" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); return TSDB_CODE_INVALID_PARA; } -// return 0; -//#if 0 + if (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd) { + tscError("consumer:0x:" PRIx64 " invalid seek params, offset:%" PRId64, tmq->consumerId, offset); + return TSDB_CODE_INVALID_PARA; + } + + // update the offset, and then commit to vnode + if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) { + pOffsetInfo->currentOffset.version = offset; + pOffsetInfo->committedOffset.version = offset; + } + SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId}; tstrncpy(rspObj.topic, pTopicName, tListLen(rspObj.topic)); - tmq_commit_sync(tmq, &rspObj); - // { - // SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); - // if (pParamSet == NULL) { - //// pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam); - // return -1; - // } - // - // pParamSet->refId = tmq->refId; - // pParamSet->epoch = tmq->epoch; - // pParamSet->callbackFn = pCommitFp; - // pParamSet->userParam = userParam; - // - // int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); - // - // tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); - // - // int32_t i = 0; - // for (; i < numOfTopics; i++) { - // SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - // if (strcmp(pTopic->topicName, pTopicName) == 0) { - // break; - // } - // } - // - // if (i == numOfTopics) { - // tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId, - // pTopicName, numOfTopics); - // taosMemoryFree(pParamSet); - // pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); - // return; - // } - // - // SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - // - // int32_t j = 0; - // int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); - // for (j = 0; j < numOfVgroups; j++) { - // SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - // if (pVg->vgId == vgId) { - // break; - // } - // } - // - // if (j == numOfVgroups) { - // tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", - // tmq->consumerId, - // vgId, numOfVgroups, pTopicName); - // taosMemoryFree(pParamSet); - // pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); - // return; - // } - // - // SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - // if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, - // &pVg->offsetInfo.committedOffset)) { - // code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups); - // - // // failed to commit, callback user function directly. - // if (code != TSDB_CODE_SUCCESS) { - // taosMemoryFree(pParamSet); - // pCommitFp(tmq, code, userParam); - // } - // } else { // do not perform commit, callback user function directly. - // taosMemoryFree(pParamSet); - // pCommitFp(tmq, code, userParam); - // } - // } - //#endif + tscDebug("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId); + int32_t code = tmq_commit_sync(tmq, &rspObj); + if (code != TSDB_CODE_SUCCESS) { + tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId, + tstrerror(code)); + } + + return code; } \ No newline at end of file