enh(tmq): support the seek.

This commit is contained in:
Haojun Liao 2023-04-19 14:46:48 +08:00
parent 3f12156c53
commit d5c57ca244
1 changed files with 32 additions and 84 deletions

View File

@ -571,7 +571,7 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
return pTopic;
}
tscError("consumer:0x%" PRIx64 ", failed to find topic:%s", tmq->consumerId, pTopicName);
tscError("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, numOfTopics, pTopicName);
return NULL;
}
@ -1571,7 +1571,6 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl
pReq->consumerId = tmq->consumerId;
pReq->timeout = timeout;
pReq->epoch = tmq->epoch;
/*pReq->currentOffset = reqOffset;*/
pReq->reqOffset = pVg->offsetInfo.currentOffset;
pReq->head.vgId = pVg->vgId;
pReq->useSnapshot = tmq->useSnapshot;
@ -1670,7 +1669,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
pParam->refId = pTmq->refId;
pParam->epoch = pTmq->epoch;
pParam->pVg = pVg; // pVg may be released,fix it
pParam->pVg = pVg;
pParam->pTopic = pTopic;
pParam->vgId = pVg->vgId;
pParam->requestId = req.reqId;
@ -1682,12 +1681,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
return handleErrorBeforePoll(pVg, pTmq);
}
sendInfo->msgInfo = (SDataBuf){
.pData = msg,
.len = msgSize,
.handle = NULL,
};
sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
sendInfo->requestId = req.reqId;
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
@ -2374,18 +2368,19 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle, int64_t offset) {
if (tmq == NULL) {
tscError("invalid tmq handle, null");
return TSDB_CODE_INVALID_PARA;
}
SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName);
if (pTopic == NULL) {
tscError("consumer:0x:" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
return TSDB_CODE_INVALID_PARA;
}
SMqClientVg* pVg = NULL;
int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
for(int32_t i= 0; i < numOfVgs; ++i) {
int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
for (int32_t i = 0; i < numOfVgs; ++i) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
if (pClientVg->vgId == vgroupHandle) {
pVg = pClientVg;
@ -2394,85 +2389,38 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle
}
if (pVg == NULL) {
tscError("consumer:0x:" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgroupHandle);
return TSDB_CODE_INVALID_PARA;
}
if (offset < pVg->offsetInfo.walVerBegin|| offset > pVg->offsetInfo.walVerEnd) {
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
int32_t type = pOffsetInfo->currentOffset.type;
if (type != TMQ_OFFSET__LOG) {
tscError("consumer:0x:" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
return TSDB_CODE_INVALID_PARA;
}
// return 0;
//#if 0
if (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd) {
tscError("consumer:0x:" PRIx64 " invalid seek params, offset:%" PRId64, tmq->consumerId, offset);
return TSDB_CODE_INVALID_PARA;
}
// update the offset, and then commit to vnode
if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) {
pOffsetInfo->currentOffset.version = offset;
pOffsetInfo->committedOffset.version = offset;
}
SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId};
tstrncpy(rspObj.topic, pTopicName, tListLen(rspObj.topic));
tmq_commit_sync(tmq, &rspObj);
// {
// SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
// if (pParamSet == NULL) {
//// pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
// return -1;
// }
//
// pParamSet->refId = tmq->refId;
// pParamSet->epoch = tmq->epoch;
// pParamSet->callbackFn = pCommitFp;
// pParamSet->userParam = userParam;
//
// int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
//
// tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
//
// int32_t i = 0;
// for (; i < numOfTopics; i++) {
// SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
// if (strcmp(pTopic->topicName, pTopicName) == 0) {
// break;
// }
// }
//
// if (i == numOfTopics) {
// tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId,
// pTopicName, numOfTopics);
// taosMemoryFree(pParamSet);
// pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
// return;
// }
//
// SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
//
// int32_t j = 0;
// int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
// for (j = 0; j < numOfVgroups; j++) {
// SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
// if (pVg->vgId == vgId) {
// break;
// }
// }
//
// if (j == numOfVgroups) {
// tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s",
// tmq->consumerId,
// vgId, numOfVgroups, pTopicName);
// taosMemoryFree(pParamSet);
// pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
// return;
// }
//
// SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
// if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset,
// &pVg->offsetInfo.committedOffset)) {
// code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups);
//
// // failed to commit, callback user function directly.
// if (code != TSDB_CODE_SUCCESS) {
// taosMemoryFree(pParamSet);
// pCommitFp(tmq, code, userParam);
// }
// } else { // do not perform commit, callback user function directly.
// taosMemoryFree(pParamSet);
// pCommitFp(tmq, code, userParam);
// }
// }
//#endif
tscDebug("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId);
int32_t code = tmq_commit_sync(tmq, &rspObj);
if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId,
tstrerror(code));
}
return code;
}