From e812a659a84da9664c52b5722f8064baaf6203b8 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 18 Jul 2023 19:30:40 +0800 Subject: [PATCH] fix:add tmq_position() interface & optimize commit logic --- include/util/taoserror.h | 1 + source/client/src/clientTmq.c | 496 +++++++++++++++++++++------------- source/util/src/terror.c | 1 + 3 files changed, 306 insertions(+), 192 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ff5d37bf00..d6f44f4489 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -778,6 +778,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4007) #define TSDB_CODE_TMQ_INVALID_VGID TAOS_DEF_ERROR_CODE(0, 0x4008) #define TSDB_CODE_TMQ_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x4009) +#define TSDB_CODE_TMQ_NEED_INITIALIZED TAOS_DEF_ERROR_CODE(0, 0x4010) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 6bbcbe62be..f7c24c6776 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -139,8 +139,8 @@ enum { typedef struct SVgOffsetInfo { STqOffsetVal committedOffset; - STqOffsetVal currentOffset; - STqOffsetVal seekOffset; // the first version in block for seek operation + STqOffsetVal endOffset; // the last version in TAOS_RES + 1 + STqOffsetVal beginOffset; // the first version in TAOS_RES int64_t walVerBegin; int64_t walVerEnd; } SVgOffsetInfo; @@ -255,8 +255,7 @@ typedef struct SSyncCommitInfo { static int32_t doAskEp(tmq_t* tmq); static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg); static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet); -static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet, - int32_t index, int32_t totalVgroups, int32_t type); +static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet); static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId); static void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param); static void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param); @@ -429,69 +428,10 @@ char** tmq_list_to_c_array(const tmq_list_t* list) { return container->pData; } -//static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index, -// int32_t* numOfVgroups) { -// int32_t numOfTopics = taosArrayGetSize(pTopicList); -// *index = -1; -// *numOfVgroups = 0; -// -// for (int32_t i = 0; i < numOfTopics; ++i) { -// SMqClientTopic* pTopic = taosArrayGet(pTopicList, i); -// if (strcmp(pTopic->topicName, pName) != 0) { -// continue; -// } -// -// *numOfVgroups = taosArrayGetSize(pTopic->vgs); -// for (int32_t j = 0; j < (*numOfVgroups); ++j) { -// SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); -// if (pClientVg->vgId == vgId) { -// *index = j; -// return pClientVg; -// } -// } -// } -// -// return NULL; -//} - -// Two problems do not need to be addressed here -// 1. update to of epset. the response of poll request will automatically handle this problem -// 2. commit failure. This one needs to be resolved. static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; - // if (code != TSDB_CODE_SUCCESS) { // if commit offset failed, let's try again - // taosThreadMutexLock(&pParam->pTmq->lock); - // int32_t numOfVgroups, index; - // SMqClientVg* pVg = foundClientVg(pParam->pTmq->clientTopics, pParam->topicName, pParam->vgId, &index, - // &numOfVgroups); if (pVg == NULL) { - // tscDebug("consumer:0x%" PRIx64 - // " subKey:%s vgId:%d commit failed, code:%s has been transferred to other consumer, no need retry - // ordinal:%d/%d", pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, tstrerror(code), - // index + 1, numOfVgroups); - // } else { // let's retry the commit - // int32_t code1 = doSendCommitMsg(pParam->pTmq, pVg, pParam->topicName, pParamSet, index, numOfVgroups); - // if (code1 != TSDB_CODE_SUCCESS) { // retry failed. - // tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 - // " retry failed, ignore this commit. code:%s ordinal:%d/%d", - // pParam->pTmq->consumerId, pParam->topicName, pVg->vgId, pVg->offsetInfo.committedOffset.version, - // tstrerror(terrno), index + 1, numOfVgroups); - // } - // } - // - // taosThreadMutexUnlock(&pParam->pTmq->lock); - // - // taosMemoryFree(pParam->pOffset); - // taosMemoryFree(pBuf->pData); - // taosMemoryFree(pBuf->pEpSet); - // - // commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId); - // return 0; - // } - // - // // todo replace the pTmq with refId - taosMemoryFree(pParam->pOffset); taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pEpSet); @@ -500,15 +440,14 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { return 0; } -static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet, - int32_t index, int32_t totalVgroups, int32_t type) { +static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet) { SMqVgOffset* pOffset = taosMemoryCalloc(1, sizeof(SMqVgOffset)); if (pOffset == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } pOffset->consumerId = tmq->consumerId; - pOffset->offset.val = pVg->offsetInfo.currentOffset; + pOffset->offset.val = *offset; int32_t groupLen = strlen(tmq->groupId); memcpy(pOffset->offset.subKey, tmq->groupId, groupLen); @@ -519,6 +458,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN int32_t code = 0; tEncodeSize(tEncodeMqVgOffset, pOffset, len, code); if (code < 0) { + taosMemoryFree(pOffset); return TSDB_CODE_INVALID_PARA; } @@ -528,7 +468,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN return TSDB_CODE_OUT_OF_MEMORY; } - ((SMsgHead*)buf)->vgId = htonl(pVg->vgId); + ((SMsgHead*)buf)->vgId = htonl(vgId); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); @@ -547,7 +487,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN pParam->params = pParamSet; pParam->pOffset = pOffset; - pParam->vgId = pVg->vgId; + pParam->vgId = vgId; pParam->pTmq = tmq; tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName)); @@ -568,23 +508,16 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN pMsgSendInfo->param = pParam; pMsgSendInfo->paramFreeFp = taosMemoryFree; pMsgSendInfo->fp = tmqCommitCb; - pMsgSendInfo->msgType = type; + pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET; atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); atomic_add_fetch_32(&pParamSet->totalRspNum, 1); - SEp* pEp = GET_ACTIVE_EP(&pVg->epSet); - char offsetBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffset->offset.val); + SEp* pEp = GET_ACTIVE_EP(epSet); - char commitBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); - tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64, - tmq->consumerId, pOffset->offset.subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1, - totalVgroups, pMsgSendInfo->requestId); int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo); + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); return TSDB_CODE_SUCCESS; } @@ -604,57 +537,28 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { return NULL; } -static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tmq_commit_cb* pCommitFp, void* userParam) { - char* pTopicName = NULL; - int32_t vgId = 0; - int32_t code = 0; - - if (pRes == NULL || tmq == NULL) { - pCommitFp(tmq, TSDB_CODE_INVALID_PARA, userParam); - return; - } - - if (TD_RES_TMQ(pRes)) { - SMqRspObj* pRspObj = (SMqRspObj*)pRes; - pTopicName = pRspObj->topic; - vgId = pRspObj->vgId; - } else if (TD_RES_TMQ_META(pRes)) { - SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)pRes; - pTopicName = pMetaRspObj->topic; - vgId = pMetaRspObj->vgId; - } else if (TD_RES_TMQ_METADATA(pRes)) { - SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)pRes; - pTopicName = pRspObj->topic; - vgId = pRspObj->vgId; - } else { - pCommitFp(tmq, TSDB_CODE_TMQ_INVALID_MSG, userParam); - return; - } - +static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum){ SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); if (pParamSet == NULL) { pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam); - return; + return NULL; } pParamSet->refId = tmq->refId; pParamSet->epoch = tmq->epoch; pParamSet->callbackFn = pCommitFp; pParamSet->userParam = userParam; + pParamSet->waitingRspNum = rspNum; - taosRLockLatch(&tmq->lock); - int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); - - tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); + return pParamSet; +} +static SMqClientVg* getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId){ SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName); if (pTopic == NULL) { - tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId, - pTopicName, numOfTopics); - taosMemoryFree(pParamSet); - pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); - taosRUnLockLatch(&tmq->lock); - return; + tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s", tmq->consumerId, pTopicName); + + return NULL; } int32_t j = 0; @@ -669,89 +573,150 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm if (j == numOfVgroups) { tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId, vgId, numOfVgroups, pTopicName); - taosMemoryFree(pParamSet); - pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); - taosRUnLockLatch(&tmq->lock); - return; + return NULL; } SMqClientVg* pVg = (SMqClientVg*)taosArrayGet(pTopic->vgs, j); - if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) { - code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, type); + return pVg; +} - // failed to commit, callback user function directly. - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pParamSet); - pCommitFp(tmq, code, userParam); +static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal, tmq_commit_cb* pCommitFp, void* userParam) { + int32_t code = 0; + tscInfo("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); + taosRLockLatch(&tmq->lock); + SMqClientVg* pVg = getClientVg(tmq, pTopicName, vgId); + if(pVg == NULL){ + code = TSDB_CODE_TMQ_INVALID_VGID; + goto end; + } + if (offsetVal->type > 0 && !tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) { + char offsetBuf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal); + + char commitBuf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); + + SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0); + if (pParamSet == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; } - // update the offset value. - pVg->offsetInfo.committedOffset = pVg->offsetInfo.currentOffset; - } else { // do not perform commit, callback user function directly. - taosMemoryFree(pParamSet); + code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, &pVg->offsetInfo.endOffset, pTopicName, pParamSet); + if (code != TSDB_CODE_SUCCESS) { + tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s", + tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno)); + taosMemoryFree(pParamSet); + goto end; + } + + tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s", + tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf); + pVg->offsetInfo.committedOffset = *offsetVal; + } + +end: + taosRUnLockLatch(&tmq->lock); + return code; +} + +static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam){ + char* pTopicName = NULL; + int32_t vgId = 0; + STqOffsetVal offsetVal = {0}; + int32_t code = 0; + + if (pRes == NULL || tmq == NULL) { + code = TSDB_CODE_INVALID_PARA; + goto end; + } + + if (TD_RES_TMQ(pRes)) { + SMqRspObj* pRspObj = (SMqRspObj*)pRes; + pTopicName = pRspObj->topic; + vgId = pRspObj->vgId; + offsetVal = pRspObj->rsp.rspOffset; + } else if (TD_RES_TMQ_META(pRes)) { + SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)pRes; + pTopicName = pMetaRspObj->topic; + vgId = pMetaRspObj->vgId; + offsetVal = pMetaRspObj->metaRsp.rspOffset; + } else if (TD_RES_TMQ_METADATA(pRes)) { + SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)pRes; + pTopicName = pRspObj->topic; + vgId = pRspObj->vgId; + offsetVal = pRspObj->rsp.rspOffset; + } else { + code = TSDB_CODE_TMQ_INVALID_MSG; + goto end; + } + + code = asyncCommitOffset(tmq, pTopicName, vgId, &offsetVal, pCommitFp, userParam); + +end: + if(code != TSDB_CODE_SUCCESS){ pCommitFp(tmq, code, userParam); } - taosRUnLockLatch(&tmq->lock); } static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) { - SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); - if (pParamSet == NULL) { - pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam); - return; - } - - pParamSet->refId = tmq->refId; - pParamSet->epoch = tmq->epoch; - pParamSet->callbackFn = pCommitFp; - pParamSet->userParam = userParam; - + int32_t code = 0; // init as 1 to prevent concurrency issue - pParamSet->waitingRspNum = 1; + SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 1); + if (pParamSet == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } taosRLockLatch(&tmq->lock); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); - tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics); + tscInfo("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics); for (int32_t i = 0; i < numOfTopics; i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); - tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, - numOfVgroups); + tscInfo("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgroups); for (int32_t j = 0; j < numOfVgroups; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) { - int32_t code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, TDMT_VND_TMQ_COMMIT_OFFSET); + if (pVg->offsetInfo.endOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.endOffset, &pVg->offsetInfo.committedOffset)) { + char offsetBuf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(offsetBuf, tListLen(offsetBuf), &pVg->offsetInfo.endOffset); + + char commitBuf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); + + code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, &pVg->offsetInfo.endOffset, pTopic->topicName, pParamSet); if (code != TSDB_CODE_SUCCESS) { - tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 " failed, code:%s ordinal:%d/%d", - tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.committedOffset.version, tstrerror(terrno), - j + 1, numOfVgroups); + tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s ordinal:%d/%d", + tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno), j + 1, numOfVgroups); continue; } - // update the offset value. - pVg->offsetInfo.committedOffset = pVg->offsetInfo.currentOffset; + tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s, ordinal:%d/%d", + tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, j + 1, numOfVgroups); + pVg->offsetInfo.committedOffset = pVg->offsetInfo.endOffset; } else { - tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d", - tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.currentOffset.version, j + 1, numOfVgroups); + tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d", + tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups); } } } taosRUnLockLatch(&tmq->lock); - tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1, - numOfTopics); + tscInfo("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1, numOfTopics); - // no request is sent - if (pParamSet->totalRspNum == 0) { - taosMemoryFree(pParamSet); - pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); + // request is sent + if (pParamSet->totalRspNum != 0) { + // count down since waiting rsp num init as 1 + commitRspCountDown(pParamSet, tmq->consumerId, "", 0); return; } - // count down since waiting rsp num init as 1 - commitRspCountDown(pParamSet, tmq->consumerId, "", 0); +end: + taosMemoryFree(pParamSet); + pCommitFp(tmq, code, userParam); + return; } static void generateTimedTask(int64_t refId, int32_t type) { @@ -827,7 +792,7 @@ void tmqSendHbReq(void* param, void* tmrId) { OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1); offRows->vgId = pVg->vgId; offRows->rows = pVg->numOfRows; - offRows->offset = pVg->offsetInfo.seekOffset; + offRows->offset = pVg->offsetInfo.beginOffset; char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); tscInfo("consumer:0x%" PRIx64 ",report offset: vgId:%d, offset:%s, rows:%"PRId64, tmq->consumerId, offRows->vgId, buf, offRows->rows); @@ -1523,9 +1488,9 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .numOfRows = pInfo ? pInfo->numOfRows : 0, }; - clientVg.offsetInfo.currentOffset = pInfo ? pInfo->currentOffset : offsetNew; + clientVg.offsetInfo.endOffset = pInfo ? pInfo->currentOffset : offsetNew; clientVg.offsetInfo.committedOffset = pInfo ? pInfo->commitOffset : offsetNew; - clientVg.offsetInfo.seekOffset = pInfo ? pInfo->seekOffset : offsetNew; + clientVg.offsetInfo.beginOffset = pInfo ? pInfo->seekOffset : offsetNew; clientVg.offsetInfo.walVerBegin = -1; clientVg.offsetInfo.walVerEnd = -1; clientVg.seekUpdated = false; @@ -1581,11 +1546,11 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId); char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.currentOffset); + tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.endOffset); tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); - SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.currentOffset, .seekOffset = pVgCur->offsetInfo.seekOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows}; + SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.endOffset, .seekOffset = pVgCur->offsetInfo.beginOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows}; taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)); } } @@ -1682,7 +1647,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl pReq->consumerId = tmq->consumerId; pReq->timeout = timeout; pReq->epoch = tmq->epoch; - pReq->reqOffset = pVg->offsetInfo.currentOffset; + pReq->reqOffset = pVg->offsetInfo.endOffset; pReq->head.vgId = pVg->vgId; pReq->useSnapshot = tmq->useSnapshot; pReq->reqId = generateRequestId(); @@ -1809,7 +1774,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p int64_t transporterId = 0; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.currentOffset); + tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset); tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId, pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId); @@ -1890,8 +1855,8 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){ if (!pVg->seekUpdated) { tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId); - pVg->offsetInfo.seekOffset = *reqOffset; - pVg->offsetInfo.currentOffset = *rspOffset; + pVg->offsetInfo.beginOffset = *reqOffset; + pVg->offsetInfo.endOffset = *rspOffset; } else { tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId); } @@ -2053,7 +2018,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tmq->totalRows += numOfRows; char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset); + tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset); tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, @@ -2315,7 +2280,7 @@ void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* if (pRes == NULL) { // here needs to commit all offsets. asyncCommitAllOffsets(tmq, cb, param); } else { // only commit one offset - asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, cb, param); + asyncCommitFromResult(tmq, pRes, cb, param); } } @@ -2335,7 +2300,7 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { if (pRes == NULL) { asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo); } else { - asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, commitCallBackFn, pInfo); + asyncCommitFromResult(tmq, pRes, commitCallBackFn, pInfo); } tsem_wait(&pInfo->sem); @@ -2348,6 +2313,87 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { return code; } +// wal range will be ok after calling tmq_get_topic_assignment or poll interface +static bool isWalRangeOk(SVgOffsetInfo* offset){ + if (offset->walVerBegin != -1 && offset->walVerEnd != -1) { + return true; + } + return false; +} + +int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset){ + if (tmq == NULL || pTopicName == NULL) { + tscError("invalid tmq handle, null"); + return TSDB_CODE_INVALID_PARA; + } + + int32_t accId = tmq->pTscObj->acctId; + char tname[TSDB_TOPIC_FNAME_LEN] = {0}; + sprintf(tname, "%d.%s", accId, pTopicName); + + taosWLockLatch(&tmq->lock); + SMqClientTopic* pTopic = getTopicByName(tmq, tname); + if (pTopic == NULL) { + tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_INVALID_TOPIC; + } + + SMqClientVg* pVg = NULL; + int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); + for (int32_t i = 0; i < numOfVgs; ++i) { + SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); + if (pClientVg->vgId == vgId) { + pVg = pClientVg; + break; + } + } + + if (pVg == NULL) { + tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_INVALID_VGID; + } + + SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; + if (!isWalRangeOk(pOffsetInfo)) { + tscError("consumer:0x%" PRIx64 " Assignment or poll interface need to be called first", tmq->consumerId); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_NEED_INITIALIZED; + } + + if (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd) { + tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", + tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE; + } + taosWUnLockLatch(&tmq->lock); + + STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset}; + + SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); + if (pInfo == NULL) { + tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId); + return TSDB_CODE_OUT_OF_MEMORY; + } + + tsem_init(&pInfo->sem, 0, 0); + pInfo->code = 0; + + asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo); + + tsem_wait(&pInfo->sem); + int32_t code = pInfo->code; + + tsem_destroy(&pInfo->sem); + taosMemoryFree(pInfo); + + tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code)); + + return code; +} + void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) { SAskEpInfo* pInfo = param; pInfo->code = code; @@ -2490,12 +2536,10 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) { int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); if (waitingRspNum == 0) { - tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic, - vgId); + tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic, vgId); tmqCommitDone(pParamSet); } else { - tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId, - waitingRspNum); + tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId, waitingRspNum); } } @@ -2578,6 +2622,69 @@ static bool isInSnapshotMode(int8_t type, bool useSnapshot){ return false; } +int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId){ + if (tmq == NULL) { + tscError("invalid tmq handle, null"); + return TSDB_CODE_INVALID_PARA; + } + + int32_t accId = tmq->pTscObj->acctId; + char tname[TSDB_TOPIC_FNAME_LEN] = {0}; + sprintf(tname, "%d.%s", accId, pTopicName); + + taosWLockLatch(&tmq->lock); + SMqClientTopic* pTopic = getTopicByName(tmq, tname); + if (pTopic == NULL) { + tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_INVALID_TOPIC; + } + + SMqClientVg* pVg = NULL; + int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); + for (int32_t i = 0; i < numOfVgs; ++i) { + SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); + if (pClientVg->vgId == vgId) { + pVg = pClientVg; + break; + } + } + + if (pVg == NULL) { + tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_INVALID_VGID; + } + + int32_t type = pVg->offsetInfo.endOffset.type; + if (isInSnapshotMode(type, tmq->useSnapshot)) { + tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, position error", tmq->consumerId, type); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_SNAPSHOT_ERROR; + } + + if (!isWalRangeOk(&pVg->offsetInfo)) { + tscError("consumer:0x%" PRIx64 " Assignment or poll interface need to be called first", tmq->consumerId); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_NEED_INITIALIZED; + } + + int64_t position = 0; + STqOffsetVal* pOffsetInfo = &pVg->offsetInfo.endOffset; + if(type == TMQ_OFFSET__LOG){ + position = pOffsetInfo->version; + }else if(type == TMQ_OFFSET__RESET_EARLIEST){ + position = pVg->offsetInfo.walVerBegin; + }else if(type == TMQ_OFFSET__RESET_LATEST){ + position = pVg->offsetInfo.walVerEnd; + }else{ + tscError("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type); + } + taosWUnLockLatch(&tmq->lock); + + return position; +} + int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment, int32_t* numOfAssignment) { *numOfAssignment = 0; @@ -2585,7 +2692,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a SMqVgCommon* pCommon = NULL; int32_t accId = tmq->pTscObj->acctId; - char tname[128] = {0}; + char tname[TSDB_TOPIC_FNAME_LEN] = {0}; sprintf(tname, "%d.%s", accId, pTopicName); int32_t code = TSDB_CODE_SUCCESS; @@ -2600,7 +2707,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a *numOfAssignment = taosArrayGetSize(pTopic->vgs); for (int32_t j = 0; j < (*numOfAssignment); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); - int32_t type = pClientVg->offsetInfo.seekOffset.type; + int32_t type = pClientVg->offsetInfo.beginOffset.type; if (isInSnapshotMode(type, tmq->useSnapshot)) { tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type); code = TSDB_CODE_TMQ_SNAPSHOT_ERROR; @@ -2620,13 +2727,13 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a for (int32_t j = 0; j < (*numOfAssignment); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); - if (pClientVg->offsetInfo.seekOffset.type != TMQ_OFFSET__LOG) { + if (pClientVg->offsetInfo.beginOffset.type != TMQ_OFFSET__LOG) { needFetch = true; break; } tmq_topic_assignment* pAssignment = &(*assignment)[j]; - pAssignment->currentOffset = pClientVg->offsetInfo.seekOffset.version; + pAssignment->currentOffset = pClientVg->offsetInfo.beginOffset.version; pAssignment->begin = pClientVg->offsetInfo.walVerBegin; pAssignment->end = pClientVg->offsetInfo.walVerEnd; pAssignment->vgId = pClientVg->vgId; @@ -2665,7 +2772,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a SMqPollReq req = {0}; tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg); - req.reqOffset = pClientVg->offsetInfo.seekOffset; + req.reqOffset = pClientVg->offsetInfo.beginOffset; int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req); if (msgSize < 0) { @@ -2705,7 +2812,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a int64_t transporterId = 0; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.seekOffset); + tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset); tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); @@ -2780,7 +2887,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ } int32_t accId = tmq->pTscObj->acctId; - char tname[128] = {0}; + char tname[TSDB_TOPIC_FNAME_LEN] = {0}; sprintf(tname, "%d.%s", accId, pTopicName); taosWLockLatch(&tmq->lock); @@ -2809,14 +2916,20 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; - int32_t type = pOffsetInfo->currentOffset.type; + int32_t type = pOffsetInfo->endOffset.type; if (isInSnapshotMode(type, tmq->useSnapshot)) { tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); taosWUnLockLatch(&tmq->lock); return TSDB_CODE_TMQ_SNAPSHOT_ERROR; } - if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) { + if (!isWalRangeOk(&pVg->offsetInfo)) { + tscError("consumer:0x%" PRIx64 " Assignment or poll interface need to be called first", tmq->consumerId); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_NEED_INITIALIZED; + } + + if (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd) { tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd); taosWUnLockLatch(&tmq->lock); @@ -2824,10 +2937,9 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ } // update the offset, and then commit to vnode - pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; - pOffsetInfo->currentOffset.version = offset; - pOffsetInfo->seekOffset = pOffsetInfo->currentOffset; -// pOffsetInfo->committedOffset.version = INT64_MIN; + pOffsetInfo->endOffset.type = TMQ_OFFSET__LOG; + pOffsetInfo->endOffset.version = offset; + pOffsetInfo->beginOffset = pOffsetInfo->endOffset; pVg->seekUpdated = true; tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8231fad3a7..c36480a63e 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -631,6 +631,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SCALAR_CONVERT_ERROR, "Cannot convert to s //tmq TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NEED_INITIALIZED, "Assignment or poll interface need to be called first") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SNAPSHOT_ERROR, "Can not operate in snapshot mode") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE, "Offset out of range") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_VGID, "VgId does not belong to this consumer")