From bbdcbcb75bf9283225039ae3b7d1a7f05ec8fe6a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 14 Jul 2023 16:43:28 +0800 Subject: [PATCH 1/6] fix:modify commit version to next validate version --- include/client/taos.h | 14 +++++++++----- include/libs/executor/storageapi.h | 2 +- source/client/src/clientTmq.c | 12 ++++++------ source/dnode/vnode/src/tq/tqRead.c | 2 +- source/dnode/vnode/src/tq/tqUtil.c | 14 +++++++------- source/dnode/vnode/src/vnd/vnodeInitApi.c | 2 +- source/libs/executor/src/executor.c | 8 ++++---- source/libs/executor/src/scanoperator.c | 13 +++++++------ source/libs/wal/src/walRead.c | 4 ++-- 9 files changed, 38 insertions(+), 33 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 7bdf16ed38..3cc2d907ab 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -287,11 +287,20 @@ DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout); DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); +DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset); +DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param); DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment); DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment); DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset); +DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); +DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); +DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); +DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res); +DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId); +DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId); + /* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ enum tmq_conf_res_t { @@ -309,11 +318,6 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm /* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */ -DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); -DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); -DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); -DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res); - /* ------------------------------ TAOSX -----------------------------------*/ // note: following apis are unstable enum tmq_res_t { diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 6031b99cfc..41516adc73 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -228,7 +228,7 @@ typedef struct SStoreTqReader { } SStoreTqReader; typedef struct SStoreSnapshotFn { - int32_t (*createSnapshot)(SSnapContext* ctx, int64_t uid); + int32_t (*setForSnapShot)(SSnapContext* ctx, int64_t uid); int32_t (*destroySnapshot)(SSnapContext* ctx); SMetaTableInfo (*getMetaTableInfoFromSnapshot)(SSnapContext* ctx); int32_t (*getTableInfoFromSnapshot)(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 5879de2e30..6bbcbe62be 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1901,7 +1901,7 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal // update the valid wal version range pVg->offsetInfo.walVerBegin = sver; - pVg->offsetInfo.walVerEnd = ever; + pVg->offsetInfo.walVerEnd = ever + 1; // pVg->receivedInfoFromVnode = true; } @@ -2541,7 +2541,7 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { SMqRspHead* pHead = pMsg->pData; tmq_topic_assignment assignment = {.begin = pHead->walsver, - .end = pHead->walever, + .end = pHead->walever + 1, .currentOffset = rsp.rspOffset.version, .vgId = pParam->vgId}; @@ -2600,7 +2600,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a *numOfAssignment = taosArrayGetSize(pTopic->vgs); for (int32_t j = 0; j < (*numOfAssignment); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); - int32_t type = pClientVg->offsetInfo.currentOffset.type; + int32_t type = pClientVg->offsetInfo.seekOffset.type; if (isInSnapshotMode(type, tmq->useSnapshot)) { tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type); code = TSDB_CODE_TMQ_SNAPSHOT_ERROR; @@ -2620,7 +2620,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a for (int32_t j = 0; j < (*numOfAssignment); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); - if (pClientVg->offsetInfo.currentOffset.type != TMQ_OFFSET__LOG) { + if (pClientVg->offsetInfo.seekOffset.type != TMQ_OFFSET__LOG) { needFetch = true; break; } @@ -2705,7 +2705,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a int64_t transporterId = 0; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset); + tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.seekOffset); tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); @@ -2825,7 +2825,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ // update the offset, and then commit to vnode pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; - pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0; + pOffsetInfo->currentOffset.version = offset; pOffsetInfo->seekOffset = pOffsetInfo->currentOffset; // pOffsetInfo->committedOffset.version = INT64_MIN; pVg->seekUpdated = true; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 5f53f1c50c..4299cd471a 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -196,7 +196,7 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return, reqId:0x%" PRIx64, pHandle->consumerId, pHandle->epoch, vgId, offset, reqId); - *fetchOffset = offset - 1; + *fetchOffset = offset; code = -1; goto END; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 8948bae852..7d632f44bc 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -119,7 +119,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand } } else { walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef); - tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer - 1); + tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer); } } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { walRefLastVer(pTq->pVnode->pWal, pHandle->pRef); @@ -127,7 +127,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pRequest); - tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer); + tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer + 1); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId, pHandle->subKey, vgId, dataRsp.rspOffset.version); int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); @@ -138,7 +138,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand } else { STaosxRsp taosxRsp = {0}; tqInitTaosxRsp(&taosxRsp, pRequest); - tqOffsetResetToLog(&taosxRsp.rspOffset, pHandle->pRef->refVer); + tqOffsetResetToLog(&taosxRsp.rspOffset, pHandle->pRef->refVer + 1); int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); tDeleteSTaosxRsp(&taosxRsp); @@ -246,7 +246,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (offset->type == TMQ_OFFSET__LOG) { walReaderVerifyOffset(pHandle->pWalReader, offset); - int64_t fetchVer = offset->version + 1; + int64_t fetchVer = offset->version; pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -279,14 +279,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, // process meta if (pHead->msgType != TDMT_VND_SUBMIT) { if (totalRows > 0) { - tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1); + tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); setRequestVersion(&taosxRsp.reqOffset, offset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); goto end; } tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); - tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); + tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1); metaRsp.resMsgType = pHead->msgType; metaRsp.metaRspLen = pHead->bodyLen; metaRsp.metaRsp = pHead->body; @@ -309,7 +309,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { - tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1); setRequestVersion(&taosxRsp.reqOffset, offset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); goto end; diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index d2db6368a2..447cab4e9e 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -237,7 +237,7 @@ void initCacheFn(SStoreCacheReader* pCache) { } void initSnapshotFn(SStoreSnapshotFn* pSnapshot) { - pSnapshot->createSnapshot = setForSnapShot; + pSnapshot->setForSnapShot = setForSnapShot; pSnapshot->destroySnapshot = destroySnapContext; pSnapshot->getMetaTableInfoFromSnapshot = getMetaTableInfoFromSnapshot; pSnapshot->getTableInfoFromSnapshot = getTableInfoFromSnapshot; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 88e2165a12..dfb0c1645a 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1112,8 +1112,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn; SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader); walReaderVerifyOffset(pWalReader, pOffset); - if (pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version + 1, id) < 0) { - qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version + 1, id); + if (pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version, id) < 0) { + qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version, id); return -1; } } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { @@ -1202,7 +1202,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT SOperatorInfo* p = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id); STableListInfo* pTableListInfo = ((SStreamRawScanInfo*)(p->info))->pTableListInfo; - if (pAPI->snapshotFn.createSnapshot(sContext, pOffset->uid) != 0) { + if (pAPI->snapshotFn.setForSnapShot(sContext, pOffset->uid) != 0) { qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id); terrno = TSDB_CODE_PAR_INTERNAL_ERROR; return -1; @@ -1239,7 +1239,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) { SStreamRawScanInfo* pInfo = pOperator->info; SSnapContext* sContext = pInfo->sContext; - if (pTaskInfo->storageAPI.snapshotFn.createSnapshot(sContext, pOffset->uid) != 0) { + if (pTaskInfo->storageAPI.snapshotFn.setForSnapShot(sContext, pOffset->uid) != 0) { qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version); terrno = TSDB_CODE_PAR_INTERNAL_ERROR; return -1; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c3d5de572f..b3a9699718 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1644,12 +1644,13 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader); pTSInfo->base.dataReader = NULL; - qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1); - if (pAPI->tqReaderFn.tqReaderSeek(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) { + int64_t validVer = pTaskInfo->streamInfo.snapshotVer + 1; + qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", validVer); + if (pAPI->tqReaderFn.tqReaderSeek(pInfo->tqReader, validVer, pTaskInfo->id.str) < 0) { return NULL; } - tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer); + tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, validVer); } if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) { @@ -1660,8 +1661,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader); struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader); - // curVersion move to next, so currentOffset = curVersion - 1 - tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion - 1); + // curVersion move to next + tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion); if (hasResult) { qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, @@ -2182,7 +2183,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { STqOffsetVal offset = {0}; if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal qDebug("tmqsnap read snapshot done, change to get data from wal"); - tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion); + tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion + 1); } else { tqOffsetResetToData(&offset, mtInfo.uid, INT64_MIN); qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 786f48ce88..a839d6cbd8 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -135,8 +135,8 @@ void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset){ int64_t firstVer = walGetFirstVer((pWalReader)->pWal); taosThreadMutexUnlock(&pWalReader->pWal->mutex); - if (pOffset->version + 1 < firstVer){ - pOffset->version = firstVer - 1; + if (pOffset->version < firstVer){ + pOffset->version = firstVer; } } From e812a659a84da9664c52b5722f8064baaf6203b8 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 18 Jul 2023 19:30:40 +0800 Subject: [PATCH 2/6] fix:add tmq_position() interface & optimize commit logic --- include/util/taoserror.h | 1 + source/client/src/clientTmq.c | 496 +++++++++++++++++++++------------- source/util/src/terror.c | 1 + 3 files changed, 306 insertions(+), 192 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ff5d37bf00..d6f44f4489 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -778,6 +778,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4007) #define TSDB_CODE_TMQ_INVALID_VGID TAOS_DEF_ERROR_CODE(0, 0x4008) #define TSDB_CODE_TMQ_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x4009) +#define TSDB_CODE_TMQ_NEED_INITIALIZED TAOS_DEF_ERROR_CODE(0, 0x4010) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 6bbcbe62be..f7c24c6776 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -139,8 +139,8 @@ enum { typedef struct SVgOffsetInfo { STqOffsetVal committedOffset; - STqOffsetVal currentOffset; - STqOffsetVal seekOffset; // the first version in block for seek operation + STqOffsetVal endOffset; // the last version in TAOS_RES + 1 + STqOffsetVal beginOffset; // the first version in TAOS_RES int64_t walVerBegin; int64_t walVerEnd; } SVgOffsetInfo; @@ -255,8 +255,7 @@ typedef struct SSyncCommitInfo { static int32_t doAskEp(tmq_t* tmq); static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg); static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet); -static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet, - int32_t index, int32_t totalVgroups, int32_t type); +static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet); static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId); static void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param); static void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param); @@ -429,69 +428,10 @@ char** tmq_list_to_c_array(const tmq_list_t* list) { return container->pData; } -//static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index, -// int32_t* numOfVgroups) { -// int32_t numOfTopics = taosArrayGetSize(pTopicList); -// *index = -1; -// *numOfVgroups = 0; -// -// for (int32_t i = 0; i < numOfTopics; ++i) { -// SMqClientTopic* pTopic = taosArrayGet(pTopicList, i); -// if (strcmp(pTopic->topicName, pName) != 0) { -// continue; -// } -// -// *numOfVgroups = taosArrayGetSize(pTopic->vgs); -// for (int32_t j = 0; j < (*numOfVgroups); ++j) { -// SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); -// if (pClientVg->vgId == vgId) { -// *index = j; -// return pClientVg; -// } -// } -// } -// -// return NULL; -//} - -// Two problems do not need to be addressed here -// 1. update to of epset. the response of poll request will automatically handle this problem -// 2. commit failure. This one needs to be resolved. static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; - // if (code != TSDB_CODE_SUCCESS) { // if commit offset failed, let's try again - // taosThreadMutexLock(&pParam->pTmq->lock); - // int32_t numOfVgroups, index; - // SMqClientVg* pVg = foundClientVg(pParam->pTmq->clientTopics, pParam->topicName, pParam->vgId, &index, - // &numOfVgroups); if (pVg == NULL) { - // tscDebug("consumer:0x%" PRIx64 - // " subKey:%s vgId:%d commit failed, code:%s has been transferred to other consumer, no need retry - // ordinal:%d/%d", pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, tstrerror(code), - // index + 1, numOfVgroups); - // } else { // let's retry the commit - // int32_t code1 = doSendCommitMsg(pParam->pTmq, pVg, pParam->topicName, pParamSet, index, numOfVgroups); - // if (code1 != TSDB_CODE_SUCCESS) { // retry failed. - // tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 - // " retry failed, ignore this commit. code:%s ordinal:%d/%d", - // pParam->pTmq->consumerId, pParam->topicName, pVg->vgId, pVg->offsetInfo.committedOffset.version, - // tstrerror(terrno), index + 1, numOfVgroups); - // } - // } - // - // taosThreadMutexUnlock(&pParam->pTmq->lock); - // - // taosMemoryFree(pParam->pOffset); - // taosMemoryFree(pBuf->pData); - // taosMemoryFree(pBuf->pEpSet); - // - // commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId); - // return 0; - // } - // - // // todo replace the pTmq with refId - taosMemoryFree(pParam->pOffset); taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pEpSet); @@ -500,15 +440,14 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { return 0; } -static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet, - int32_t index, int32_t totalVgroups, int32_t type) { +static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet) { SMqVgOffset* pOffset = taosMemoryCalloc(1, sizeof(SMqVgOffset)); if (pOffset == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } pOffset->consumerId = tmq->consumerId; - pOffset->offset.val = pVg->offsetInfo.currentOffset; + pOffset->offset.val = *offset; int32_t groupLen = strlen(tmq->groupId); memcpy(pOffset->offset.subKey, tmq->groupId, groupLen); @@ -519,6 +458,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN int32_t code = 0; tEncodeSize(tEncodeMqVgOffset, pOffset, len, code); if (code < 0) { + taosMemoryFree(pOffset); return TSDB_CODE_INVALID_PARA; } @@ -528,7 +468,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN return TSDB_CODE_OUT_OF_MEMORY; } - ((SMsgHead*)buf)->vgId = htonl(pVg->vgId); + ((SMsgHead*)buf)->vgId = htonl(vgId); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); @@ -547,7 +487,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN pParam->params = pParamSet; pParam->pOffset = pOffset; - pParam->vgId = pVg->vgId; + pParam->vgId = vgId; pParam->pTmq = tmq; tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName)); @@ -568,23 +508,16 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN pMsgSendInfo->param = pParam; pMsgSendInfo->paramFreeFp = taosMemoryFree; pMsgSendInfo->fp = tmqCommitCb; - pMsgSendInfo->msgType = type; + pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET; atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); atomic_add_fetch_32(&pParamSet->totalRspNum, 1); - SEp* pEp = GET_ACTIVE_EP(&pVg->epSet); - char offsetBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffset->offset.val); + SEp* pEp = GET_ACTIVE_EP(epSet); - char commitBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); - tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64, - tmq->consumerId, pOffset->offset.subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1, - totalVgroups, pMsgSendInfo->requestId); int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo); + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); return TSDB_CODE_SUCCESS; } @@ -604,57 +537,28 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { return NULL; } -static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tmq_commit_cb* pCommitFp, void* userParam) { - char* pTopicName = NULL; - int32_t vgId = 0; - int32_t code = 0; - - if (pRes == NULL || tmq == NULL) { - pCommitFp(tmq, TSDB_CODE_INVALID_PARA, userParam); - return; - } - - if (TD_RES_TMQ(pRes)) { - SMqRspObj* pRspObj = (SMqRspObj*)pRes; - pTopicName = pRspObj->topic; - vgId = pRspObj->vgId; - } else if (TD_RES_TMQ_META(pRes)) { - SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)pRes; - pTopicName = pMetaRspObj->topic; - vgId = pMetaRspObj->vgId; - } else if (TD_RES_TMQ_METADATA(pRes)) { - SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)pRes; - pTopicName = pRspObj->topic; - vgId = pRspObj->vgId; - } else { - pCommitFp(tmq, TSDB_CODE_TMQ_INVALID_MSG, userParam); - return; - } - +static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum){ SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); if (pParamSet == NULL) { pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam); - return; + return NULL; } pParamSet->refId = tmq->refId; pParamSet->epoch = tmq->epoch; pParamSet->callbackFn = pCommitFp; pParamSet->userParam = userParam; + pParamSet->waitingRspNum = rspNum; - taosRLockLatch(&tmq->lock); - int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); - - tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); + return pParamSet; +} +static SMqClientVg* getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId){ SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName); if (pTopic == NULL) { - tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId, - pTopicName, numOfTopics); - taosMemoryFree(pParamSet); - pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); - taosRUnLockLatch(&tmq->lock); - return; + tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s", tmq->consumerId, pTopicName); + + return NULL; } int32_t j = 0; @@ -669,89 +573,150 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm if (j == numOfVgroups) { tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId, vgId, numOfVgroups, pTopicName); - taosMemoryFree(pParamSet); - pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); - taosRUnLockLatch(&tmq->lock); - return; + return NULL; } SMqClientVg* pVg = (SMqClientVg*)taosArrayGet(pTopic->vgs, j); - if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) { - code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, type); + return pVg; +} - // failed to commit, callback user function directly. - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pParamSet); - pCommitFp(tmq, code, userParam); +static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal, tmq_commit_cb* pCommitFp, void* userParam) { + int32_t code = 0; + tscInfo("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); + taosRLockLatch(&tmq->lock); + SMqClientVg* pVg = getClientVg(tmq, pTopicName, vgId); + if(pVg == NULL){ + code = TSDB_CODE_TMQ_INVALID_VGID; + goto end; + } + if (offsetVal->type > 0 && !tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) { + char offsetBuf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal); + + char commitBuf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); + + SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0); + if (pParamSet == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; } - // update the offset value. - pVg->offsetInfo.committedOffset = pVg->offsetInfo.currentOffset; - } else { // do not perform commit, callback user function directly. - taosMemoryFree(pParamSet); + code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, &pVg->offsetInfo.endOffset, pTopicName, pParamSet); + if (code != TSDB_CODE_SUCCESS) { + tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s", + tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno)); + taosMemoryFree(pParamSet); + goto end; + } + + tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s", + tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf); + pVg->offsetInfo.committedOffset = *offsetVal; + } + +end: + taosRUnLockLatch(&tmq->lock); + return code; +} + +static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam){ + char* pTopicName = NULL; + int32_t vgId = 0; + STqOffsetVal offsetVal = {0}; + int32_t code = 0; + + if (pRes == NULL || tmq == NULL) { + code = TSDB_CODE_INVALID_PARA; + goto end; + } + + if (TD_RES_TMQ(pRes)) { + SMqRspObj* pRspObj = (SMqRspObj*)pRes; + pTopicName = pRspObj->topic; + vgId = pRspObj->vgId; + offsetVal = pRspObj->rsp.rspOffset; + } else if (TD_RES_TMQ_META(pRes)) { + SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)pRes; + pTopicName = pMetaRspObj->topic; + vgId = pMetaRspObj->vgId; + offsetVal = pMetaRspObj->metaRsp.rspOffset; + } else if (TD_RES_TMQ_METADATA(pRes)) { + SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)pRes; + pTopicName = pRspObj->topic; + vgId = pRspObj->vgId; + offsetVal = pRspObj->rsp.rspOffset; + } else { + code = TSDB_CODE_TMQ_INVALID_MSG; + goto end; + } + + code = asyncCommitOffset(tmq, pTopicName, vgId, &offsetVal, pCommitFp, userParam); + +end: + if(code != TSDB_CODE_SUCCESS){ pCommitFp(tmq, code, userParam); } - taosRUnLockLatch(&tmq->lock); } static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) { - SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); - if (pParamSet == NULL) { - pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam); - return; - } - - pParamSet->refId = tmq->refId; - pParamSet->epoch = tmq->epoch; - pParamSet->callbackFn = pCommitFp; - pParamSet->userParam = userParam; - + int32_t code = 0; // init as 1 to prevent concurrency issue - pParamSet->waitingRspNum = 1; + SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 1); + if (pParamSet == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } taosRLockLatch(&tmq->lock); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); - tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics); + tscInfo("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics); for (int32_t i = 0; i < numOfTopics; i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); - tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, - numOfVgroups); + tscInfo("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgroups); for (int32_t j = 0; j < numOfVgroups; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) { - int32_t code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, TDMT_VND_TMQ_COMMIT_OFFSET); + if (pVg->offsetInfo.endOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.endOffset, &pVg->offsetInfo.committedOffset)) { + char offsetBuf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(offsetBuf, tListLen(offsetBuf), &pVg->offsetInfo.endOffset); + + char commitBuf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); + + code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, &pVg->offsetInfo.endOffset, pTopic->topicName, pParamSet); if (code != TSDB_CODE_SUCCESS) { - tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 " failed, code:%s ordinal:%d/%d", - tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.committedOffset.version, tstrerror(terrno), - j + 1, numOfVgroups); + tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s ordinal:%d/%d", + tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno), j + 1, numOfVgroups); continue; } - // update the offset value. - pVg->offsetInfo.committedOffset = pVg->offsetInfo.currentOffset; + tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s, ordinal:%d/%d", + tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, j + 1, numOfVgroups); + pVg->offsetInfo.committedOffset = pVg->offsetInfo.endOffset; } else { - tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d", - tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.currentOffset.version, j + 1, numOfVgroups); + tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d", + tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups); } } } taosRUnLockLatch(&tmq->lock); - tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1, - numOfTopics); + tscInfo("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1, numOfTopics); - // no request is sent - if (pParamSet->totalRspNum == 0) { - taosMemoryFree(pParamSet); - pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); + // request is sent + if (pParamSet->totalRspNum != 0) { + // count down since waiting rsp num init as 1 + commitRspCountDown(pParamSet, tmq->consumerId, "", 0); return; } - // count down since waiting rsp num init as 1 - commitRspCountDown(pParamSet, tmq->consumerId, "", 0); +end: + taosMemoryFree(pParamSet); + pCommitFp(tmq, code, userParam); + return; } static void generateTimedTask(int64_t refId, int32_t type) { @@ -827,7 +792,7 @@ void tmqSendHbReq(void* param, void* tmrId) { OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1); offRows->vgId = pVg->vgId; offRows->rows = pVg->numOfRows; - offRows->offset = pVg->offsetInfo.seekOffset; + offRows->offset = pVg->offsetInfo.beginOffset; char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); tscInfo("consumer:0x%" PRIx64 ",report offset: vgId:%d, offset:%s, rows:%"PRId64, tmq->consumerId, offRows->vgId, buf, offRows->rows); @@ -1523,9 +1488,9 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .numOfRows = pInfo ? pInfo->numOfRows : 0, }; - clientVg.offsetInfo.currentOffset = pInfo ? pInfo->currentOffset : offsetNew; + clientVg.offsetInfo.endOffset = pInfo ? pInfo->currentOffset : offsetNew; clientVg.offsetInfo.committedOffset = pInfo ? pInfo->commitOffset : offsetNew; - clientVg.offsetInfo.seekOffset = pInfo ? pInfo->seekOffset : offsetNew; + clientVg.offsetInfo.beginOffset = pInfo ? pInfo->seekOffset : offsetNew; clientVg.offsetInfo.walVerBegin = -1; clientVg.offsetInfo.walVerEnd = -1; clientVg.seekUpdated = false; @@ -1581,11 +1546,11 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId); char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.currentOffset); + tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.endOffset); tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); - SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.currentOffset, .seekOffset = pVgCur->offsetInfo.seekOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows}; + SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.endOffset, .seekOffset = pVgCur->offsetInfo.beginOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows}; taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)); } } @@ -1682,7 +1647,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl pReq->consumerId = tmq->consumerId; pReq->timeout = timeout; pReq->epoch = tmq->epoch; - pReq->reqOffset = pVg->offsetInfo.currentOffset; + pReq->reqOffset = pVg->offsetInfo.endOffset; pReq->head.vgId = pVg->vgId; pReq->useSnapshot = tmq->useSnapshot; pReq->reqId = generateRequestId(); @@ -1809,7 +1774,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p int64_t transporterId = 0; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.currentOffset); + tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset); tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId, pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId); @@ -1890,8 +1855,8 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){ if (!pVg->seekUpdated) { tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId); - pVg->offsetInfo.seekOffset = *reqOffset; - pVg->offsetInfo.currentOffset = *rspOffset; + pVg->offsetInfo.beginOffset = *reqOffset; + pVg->offsetInfo.endOffset = *rspOffset; } else { tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId); } @@ -2053,7 +2018,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tmq->totalRows += numOfRows; char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset); + tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset); tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, @@ -2315,7 +2280,7 @@ void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* if (pRes == NULL) { // here needs to commit all offsets. asyncCommitAllOffsets(tmq, cb, param); } else { // only commit one offset - asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, cb, param); + asyncCommitFromResult(tmq, pRes, cb, param); } } @@ -2335,7 +2300,7 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { if (pRes == NULL) { asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo); } else { - asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, commitCallBackFn, pInfo); + asyncCommitFromResult(tmq, pRes, commitCallBackFn, pInfo); } tsem_wait(&pInfo->sem); @@ -2348,6 +2313,87 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { return code; } +// wal range will be ok after calling tmq_get_topic_assignment or poll interface +static bool isWalRangeOk(SVgOffsetInfo* offset){ + if (offset->walVerBegin != -1 && offset->walVerEnd != -1) { + return true; + } + return false; +} + +int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset){ + if (tmq == NULL || pTopicName == NULL) { + tscError("invalid tmq handle, null"); + return TSDB_CODE_INVALID_PARA; + } + + int32_t accId = tmq->pTscObj->acctId; + char tname[TSDB_TOPIC_FNAME_LEN] = {0}; + sprintf(tname, "%d.%s", accId, pTopicName); + + taosWLockLatch(&tmq->lock); + SMqClientTopic* pTopic = getTopicByName(tmq, tname); + if (pTopic == NULL) { + tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_INVALID_TOPIC; + } + + SMqClientVg* pVg = NULL; + int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); + for (int32_t i = 0; i < numOfVgs; ++i) { + SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); + if (pClientVg->vgId == vgId) { + pVg = pClientVg; + break; + } + } + + if (pVg == NULL) { + tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_INVALID_VGID; + } + + SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; + if (!isWalRangeOk(pOffsetInfo)) { + tscError("consumer:0x%" PRIx64 " Assignment or poll interface need to be called first", tmq->consumerId); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_NEED_INITIALIZED; + } + + if (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd) { + tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", + tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE; + } + taosWUnLockLatch(&tmq->lock); + + STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset}; + + SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); + if (pInfo == NULL) { + tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId); + return TSDB_CODE_OUT_OF_MEMORY; + } + + tsem_init(&pInfo->sem, 0, 0); + pInfo->code = 0; + + asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo); + + tsem_wait(&pInfo->sem); + int32_t code = pInfo->code; + + tsem_destroy(&pInfo->sem); + taosMemoryFree(pInfo); + + tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code)); + + return code; +} + void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) { SAskEpInfo* pInfo = param; pInfo->code = code; @@ -2490,12 +2536,10 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) { int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); if (waitingRspNum == 0) { - tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic, - vgId); + tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic, vgId); tmqCommitDone(pParamSet); } else { - tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId, - waitingRspNum); + tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId, waitingRspNum); } } @@ -2578,6 +2622,69 @@ static bool isInSnapshotMode(int8_t type, bool useSnapshot){ return false; } +int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId){ + if (tmq == NULL) { + tscError("invalid tmq handle, null"); + return TSDB_CODE_INVALID_PARA; + } + + int32_t accId = tmq->pTscObj->acctId; + char tname[TSDB_TOPIC_FNAME_LEN] = {0}; + sprintf(tname, "%d.%s", accId, pTopicName); + + taosWLockLatch(&tmq->lock); + SMqClientTopic* pTopic = getTopicByName(tmq, tname); + if (pTopic == NULL) { + tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_INVALID_TOPIC; + } + + SMqClientVg* pVg = NULL; + int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); + for (int32_t i = 0; i < numOfVgs; ++i) { + SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); + if (pClientVg->vgId == vgId) { + pVg = pClientVg; + break; + } + } + + if (pVg == NULL) { + tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_INVALID_VGID; + } + + int32_t type = pVg->offsetInfo.endOffset.type; + if (isInSnapshotMode(type, tmq->useSnapshot)) { + tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, position error", tmq->consumerId, type); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_SNAPSHOT_ERROR; + } + + if (!isWalRangeOk(&pVg->offsetInfo)) { + tscError("consumer:0x%" PRIx64 " Assignment or poll interface need to be called first", tmq->consumerId); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_NEED_INITIALIZED; + } + + int64_t position = 0; + STqOffsetVal* pOffsetInfo = &pVg->offsetInfo.endOffset; + if(type == TMQ_OFFSET__LOG){ + position = pOffsetInfo->version; + }else if(type == TMQ_OFFSET__RESET_EARLIEST){ + position = pVg->offsetInfo.walVerBegin; + }else if(type == TMQ_OFFSET__RESET_LATEST){ + position = pVg->offsetInfo.walVerEnd; + }else{ + tscError("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type); + } + taosWUnLockLatch(&tmq->lock); + + return position; +} + int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment, int32_t* numOfAssignment) { *numOfAssignment = 0; @@ -2585,7 +2692,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a SMqVgCommon* pCommon = NULL; int32_t accId = tmq->pTscObj->acctId; - char tname[128] = {0}; + char tname[TSDB_TOPIC_FNAME_LEN] = {0}; sprintf(tname, "%d.%s", accId, pTopicName); int32_t code = TSDB_CODE_SUCCESS; @@ -2600,7 +2707,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a *numOfAssignment = taosArrayGetSize(pTopic->vgs); for (int32_t j = 0; j < (*numOfAssignment); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); - int32_t type = pClientVg->offsetInfo.seekOffset.type; + int32_t type = pClientVg->offsetInfo.beginOffset.type; if (isInSnapshotMode(type, tmq->useSnapshot)) { tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type); code = TSDB_CODE_TMQ_SNAPSHOT_ERROR; @@ -2620,13 +2727,13 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a for (int32_t j = 0; j < (*numOfAssignment); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); - if (pClientVg->offsetInfo.seekOffset.type != TMQ_OFFSET__LOG) { + if (pClientVg->offsetInfo.beginOffset.type != TMQ_OFFSET__LOG) { needFetch = true; break; } tmq_topic_assignment* pAssignment = &(*assignment)[j]; - pAssignment->currentOffset = pClientVg->offsetInfo.seekOffset.version; + pAssignment->currentOffset = pClientVg->offsetInfo.beginOffset.version; pAssignment->begin = pClientVg->offsetInfo.walVerBegin; pAssignment->end = pClientVg->offsetInfo.walVerEnd; pAssignment->vgId = pClientVg->vgId; @@ -2665,7 +2772,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a SMqPollReq req = {0}; tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg); - req.reqOffset = pClientVg->offsetInfo.seekOffset; + req.reqOffset = pClientVg->offsetInfo.beginOffset; int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req); if (msgSize < 0) { @@ -2705,7 +2812,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a int64_t transporterId = 0; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.seekOffset); + tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset); tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); @@ -2780,7 +2887,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ } int32_t accId = tmq->pTscObj->acctId; - char tname[128] = {0}; + char tname[TSDB_TOPIC_FNAME_LEN] = {0}; sprintf(tname, "%d.%s", accId, pTopicName); taosWLockLatch(&tmq->lock); @@ -2809,14 +2916,20 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; - int32_t type = pOffsetInfo->currentOffset.type; + int32_t type = pOffsetInfo->endOffset.type; if (isInSnapshotMode(type, tmq->useSnapshot)) { tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); taosWUnLockLatch(&tmq->lock); return TSDB_CODE_TMQ_SNAPSHOT_ERROR; } - if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) { + if (!isWalRangeOk(&pVg->offsetInfo)) { + tscError("consumer:0x%" PRIx64 " Assignment or poll interface need to be called first", tmq->consumerId); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_NEED_INITIALIZED; + } + + if (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd) { tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd); taosWUnLockLatch(&tmq->lock); @@ -2824,10 +2937,9 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ } // update the offset, and then commit to vnode - pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; - pOffsetInfo->currentOffset.version = offset; - pOffsetInfo->seekOffset = pOffsetInfo->currentOffset; -// pOffsetInfo->committedOffset.version = INT64_MIN; + pOffsetInfo->endOffset.type = TMQ_OFFSET__LOG; + pOffsetInfo->endOffset.version = offset; + pOffsetInfo->beginOffset = pOffsetInfo->endOffset; pVg->seekUpdated = true; tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8231fad3a7..c36480a63e 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -631,6 +631,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SCALAR_CONVERT_ERROR, "Cannot convert to s //tmq TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NEED_INITIALIZED, "Assignment or poll interface need to be called first") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SNAPSHOT_ERROR, "Can not operate in snapshot mode") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE, "Offset out of range") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_VGID, "VgId does not belong to this consumer") From 5cb35f2fa6389e8003287d35a0a02421da1c6ebc Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 19 Jul 2023 17:29:39 +0800 Subject: [PATCH 3/6] feat:add committed & position & commite_offset interface --- include/client/taos.h | 2 +- include/common/tmsgdef.h | 1 + include/util/taoserror.h | 1 + source/client/src/clientRawBlockWrite.c | 7 + source/client/src/clientTmq.c | 468 +++++++++++++------- source/client/test/clientTests.cpp | 83 ++++ source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/tq/tq.c | 43 ++ source/dnode/vnode/src/vnd/vnodeSvr.c | 4 +- source/util/src/terror.c | 1 + 11 files changed, 461 insertions(+), 151 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 3cc2d907ab..5ea1510e44 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -288,7 +288,7 @@ DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset); -DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param); +DLL_EXPORT int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param); DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment); DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment); diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 3f4335af94..aa23442291 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -312,6 +312,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME_PUSH, "vnode-tmq-consume-push", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_WALINFO, "vnode-tmq-vg-walinfo", SMqPollReq, SMqDataBlkRsp) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committed-walinfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index d6f44f4489..f37402c18c 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -779,6 +779,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TMQ_INVALID_VGID TAOS_DEF_ERROR_CODE(0, 0x4008) #define TSDB_CODE_TMQ_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x4009) #define TSDB_CODE_TMQ_NEED_INITIALIZED TAOS_DEF_ERROR_CODE(0, 0x4010) +#define TSDB_CODE_TMQ_NO_COMMITTED TAOS_DEF_ERROR_CODE(0, 0x4011) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 90b10e0920..dd311db126 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1327,6 +1327,9 @@ end: int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields, int numFields) { + if (!taos || !pData || !tbname) { + return TSDB_CODE_INVALID_PARA; + } int32_t code = TSDB_CODE_SUCCESS; STableMeta* pTableMeta = NULL; SQuery* pQuery = NULL; @@ -1413,6 +1416,9 @@ end: } int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) { + if (!taos || !pData || !tbname) { + return TSDB_CODE_INVALID_PARA; + } int32_t code = TSDB_CODE_SUCCESS; STableMeta* pTableMeta = NULL; SQuery* pQuery = NULL; @@ -1812,6 +1818,7 @@ end: } char* tmq_get_json_meta(TAOS_RES* res) { + if (res == NULL) return NULL; uDebug("tmq_get_json_meta called"); if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res)) { return NULL; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 071b5d6b0f..f2ea7309e4 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -220,6 +220,12 @@ typedef struct SMqSeekParam { int32_t code; } SMqSeekParam; +typedef struct SMqCommittedParam { + tsem_t sem; + int32_t code; + SMqVgOffset vgOffset; +} SMqCommittedParam; + typedef struct SMqVgWalInfoParam { int32_t vgId; int32_t epoch; @@ -241,7 +247,7 @@ typedef struct { typedef struct { SMqCommitCbParamSet* params; - SMqVgOffset* pOffset; +// SMqVgOffset* pOffset; char topicName[TSDB_TOPIC_FNAME_LEN]; int32_t vgId; tmq_t* pTmq; @@ -292,6 +298,9 @@ void tmq_conf_destroy(tmq_conf_t* conf) { } tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { + if (conf == NULL || key == NULL || value == NULL){ + return TMQ_CONF_INVALID; + } if (strcasecmp(key, "group.id") == 0) { tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN); return TMQ_CONF_OK; @@ -406,6 +415,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value tmq_list_t* tmq_list_new() { return (tmq_list_t*)taosArrayInit(0, sizeof(void*)); } int32_t tmq_list_append(tmq_list_t* list, const char* src) { + if(list == NULL) return -1; SArray* container = &list->container; if (src == NULL || src[0] == 0) return -1; char* topic = taosStrdup(src); @@ -414,16 +424,19 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) { } void tmq_list_destroy(tmq_list_t* list) { + if(list == NULL) return; SArray* container = &list->container; taosArrayDestroyP(container, taosMemoryFree); } int32_t tmq_list_get_size(const tmq_list_t* list) { + if(list == NULL) return -1; const SArray* container = &list->container; return taosArrayGetSize(container); } char** tmq_list_to_c_array(const tmq_list_t* list) { + if(list == NULL) return NULL; const SArray* container = &list->container; return container->pData; } @@ -432,7 +445,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; - taosMemoryFree(pParam->pOffset); +// taosMemoryFree(pParam->pOffset); taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pEpSet); @@ -441,30 +454,25 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { } static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet) { - SMqVgOffset* pOffset = taosMemoryCalloc(1, sizeof(SMqVgOffset)); - if (pOffset == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } + SMqVgOffset pOffset = {0}; - pOffset->consumerId = tmq->consumerId; - pOffset->offset.val = *offset; + pOffset.consumerId = tmq->consumerId; + pOffset.offset.val = *offset; int32_t groupLen = strlen(tmq->groupId); - memcpy(pOffset->offset.subKey, tmq->groupId, groupLen); - pOffset->offset.subKey[groupLen] = TMQ_SEPARATOR; - strcpy(pOffset->offset.subKey + groupLen + 1, pTopicName); + memcpy(pOffset.offset.subKey, tmq->groupId, groupLen); + pOffset.offset.subKey[groupLen] = TMQ_SEPARATOR; + strcpy(pOffset.offset.subKey + groupLen + 1, pTopicName); int32_t len = 0; int32_t code = 0; - tEncodeSize(tEncodeMqVgOffset, pOffset, len, code); + tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code); if (code < 0) { - taosMemoryFree(pOffset); return TSDB_CODE_INVALID_PARA; } void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); if (buf == NULL) { - taosMemoryFree(pOffset); return TSDB_CODE_OUT_OF_MEMORY; } @@ -474,19 +482,18 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse SEncoder encoder; tEncoderInit(&encoder, abuf, len); - tEncodeMqVgOffset(&encoder, pOffset); + tEncodeMqVgOffset(&encoder, &pOffset); tEncoderClear(&encoder); // build param SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam)); if (pParam == NULL) { - taosMemoryFree(pOffset); taosMemoryFree(buf); return TSDB_CODE_OUT_OF_MEMORY; } pParam->params = pParamSet; - pParam->pOffset = pOffset; +// pParam->pOffset = pOffset; pParam->vgId = vgId; pParam->pTmq = tmq; @@ -495,7 +502,6 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse // build send info SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (pMsgSendInfo == NULL) { - taosMemoryFree(pOffset); taosMemoryFree(buf); taosMemoryFree(pParam); return TSDB_CODE_OUT_OF_MEMORY; @@ -553,40 +559,34 @@ static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* p return pParamSet; } -static SMqClientVg* getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId){ + + +static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg){ SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName); if (pTopic == NULL) { - tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s", tmq->consumerId, pTopicName); - - return NULL; + tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); + return TSDB_CODE_TMQ_INVALID_TOPIC; } - int32_t j = 0; - int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); - for (j = 0; j < numOfVgroups; j++) { - SMqClientVg* pVg = (SMqClientVg*)taosArrayGet(pTopic->vgs, j); - if (pVg->vgId == vgId) { + int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); + for (int32_t i = 0; i < numOfVgs; ++i) { + SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); + if (pClientVg->vgId == vgId) { + *pVg = pClientVg; break; } } - if (j == numOfVgroups) { - tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId, - vgId, numOfVgroups, pTopicName); - return NULL; - } - - SMqClientVg* pVg = (SMqClientVg*)taosArrayGet(pTopic->vgs, j); - return pVg; + return *pVg == NULL ? TSDB_CODE_TMQ_INVALID_VGID : TSDB_CODE_SUCCESS; } static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal, tmq_commit_cb* pCommitFp, void* userParam) { int32_t code = 0; tscInfo("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); taosRLockLatch(&tmq->lock); - SMqClientVg* pVg = getClientVg(tmq, pTopicName, vgId); - if(pVg == NULL){ - code = TSDB_CODE_TMQ_INVALID_VGID; + SMqClientVg* pVg = NULL; + code = getClientVg(tmq, pTopicName, vgId, &pVg); + if(code != 0){ goto end; } if (offsetVal->type > 0 && !tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) { @@ -601,7 +601,7 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq code = TSDB_CODE_OUT_OF_MEMORY; goto end; } - code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, &pVg->offsetInfo.endOffset, pTopicName, pParamSet); + code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet); if (code != TSDB_CODE_SUCCESS) { tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s", tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno)); @@ -964,6 +964,7 @@ int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) { } int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { + if(tmq == NULL) return TSDB_CODE_INVALID_PARA; if (*topics == NULL) { *topics = tmq_list_new(); } @@ -977,6 +978,7 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { } int32_t tmq_unsubscribe(tmq_t* tmq) { + if(tmq == NULL) return TSDB_CODE_INVALID_PARA; if (tmq->autoCommit) { int32_t rsp = tmq_commit_sync(tmq, NULL); if (rsp != 0) { @@ -1047,6 +1049,7 @@ static void tmqMgmtInit(void) { } tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { + if(conf == NULL) return NULL; taosThreadOnce(&tmqInit, tmqMgmtInit); if (tmqInitRes != 0) { terrno = tmqInitRes; @@ -1140,6 +1143,7 @@ _failed: } int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { + if(tmq == NULL) return TSDB_CODE_INVALID_PARA; const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most const SArray* container = &topic_list->container; int32_t sz = taosArrayGetSize(container); @@ -1264,6 +1268,7 @@ FAIL: } void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) { + if(conf == NULL) return; conf->commitCb = cb; conf->commitCbUserParam = param; } @@ -2050,6 +2055,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { + if(tmq == NULL) return NULL; + void* rspObj; int64_t startTime = taosGetTimestampMs(); @@ -2129,6 +2136,8 @@ static void displayConsumeStatistics(tmq_t* pTmq) { } int32_t tmq_consumer_close(tmq_t* tmq) { + if(tmq == NULL) return TSDB_CODE_INVALID_PARA; + tscInfo("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status); displayConsumeStatistics(tmq); @@ -2174,6 +2183,9 @@ const char* tmq_err2str(int32_t err) { } tmq_res_t tmq_get_res_type(TAOS_RES* res) { + if (res == NULL){ + return TMQ_RES_INVALID; + } if (TD_RES_TMQ(res)) { return TMQ_RES_DATA; } else if (TD_RES_TMQ_META(res)) { @@ -2186,6 +2198,9 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) { } const char* tmq_get_topic_name(TAOS_RES* res) { + if (res == NULL){ + return NULL; + } if (TD_RES_TMQ(res)) { SMqRspObj* pRspObj = (SMqRspObj*)res; return strchr(pRspObj->topic, '.') + 1; @@ -2201,6 +2216,10 @@ const char* tmq_get_topic_name(TAOS_RES* res) { } const char* tmq_get_db_name(TAOS_RES* res) { + if (res == NULL){ + return NULL; + } + if (TD_RES_TMQ(res)) { SMqRspObj* pRspObj = (SMqRspObj*)res; return strchr(pRspObj->db, '.') + 1; @@ -2216,6 +2235,9 @@ const char* tmq_get_db_name(TAOS_RES* res) { } int32_t tmq_get_vgroup_id(TAOS_RES* res) { + if (res == NULL){ + return -1; + } if (TD_RES_TMQ(res)) { SMqRspObj* pRspObj = (SMqRspObj*)res; return pRspObj->vgId; @@ -2231,6 +2253,9 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { } int64_t tmq_get_vgroup_offset(TAOS_RES* res) { + if (res == NULL){ + return TSDB_CODE_INVALID_PARA; + } if (TD_RES_TMQ(res)) { SMqRspObj* pRspObj = (SMqRspObj*) res; STqOffsetVal* pOffset = &pRspObj->rsp.reqOffset; @@ -2254,10 +2279,13 @@ int64_t tmq_get_vgroup_offset(TAOS_RES* res) { } // data from tsdb, no valid offset info - return -1; + return TSDB_CODE_TMQ_SNAPSHOT_ERROR; } const char* tmq_get_table_name(TAOS_RES* res) { + if (res == NULL){ + return NULL; + } if (TD_RES_TMQ(res)) { SMqRspObj* pRspObj = (SMqRspObj*)res; if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 || @@ -2277,6 +2305,10 @@ const char* tmq_get_table_name(TAOS_RES* res) { } void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) { + if (tmq == NULL) { + tscError("invalid tmq handle, null"); + return; + } if (pRes == NULL) { // here needs to commit all offsets. asyncCommitAllOffsets(tmq, cb, param); } else { // only commit one offset @@ -2291,6 +2323,11 @@ static void commitCallBackFn(tmq_t *UNUSED_PARAM(tmq), int32_t code, void* param } int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { + if (tmq == NULL) { + tscError("invalid tmq handle, null"); + return TSDB_CODE_INVALID_PARA; + } + int32_t code = 0; SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); @@ -2314,11 +2351,18 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { } // wal range will be ok after calling tmq_get_topic_assignment or poll interface -static bool isWalRangeOk(SVgOffsetInfo* offset){ - if (offset->walVerBegin != -1 && offset->walVerEnd != -1) { - return true; +static int32_t checkWalRange(SVgOffsetInfo* offset, int64_t value){ + if (offset->walVerBegin == -1 || offset->walVerEnd == -1) { + tscError("Assignment or poll interface need to be called first"); + return TSDB_CODE_TMQ_NEED_INITIALIZED; } - return false; + + if (value != -1 && (value < offset->walVerBegin || value > offset->walVerEnd)) { + tscError("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value, offset->walVerBegin, offset->walVerEnd); + return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE; + } + + return 0; } int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset){ @@ -2332,41 +2376,18 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, sprintf(tname, "%d.%s", accId, pTopicName); taosWLockLatch(&tmq->lock); - SMqClientTopic* pTopic = getTopicByName(tmq, tname); - if (pTopic == NULL) { - tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); - taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_INVALID_TOPIC; - } - SMqClientVg* pVg = NULL; - int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); - for (int32_t i = 0; i < numOfVgs; ++i) { - SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); - if (pClientVg->vgId == vgId) { - pVg = pClientVg; - break; - } - } - - if (pVg == NULL) { - tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId); + int32_t code = getClientVg(tmq, tname, vgId, &pVg); + if(code != 0){ taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_INVALID_VGID; + return code; } SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; - if (!isWalRangeOk(pOffsetInfo)) { - tscError("consumer:0x%" PRIx64 " Assignment or poll interface need to be called first", tmq->consumerId); + code = checkWalRange(pOffsetInfo, offset); + if (code != 0) { taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_NEED_INITIALIZED; - } - - if (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd) { - tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", - tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd); - taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE; + return code; } taosWUnLockLatch(&tmq->lock); @@ -2384,7 +2405,7 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo); tsem_wait(&pInfo->sem); - int32_t code = pInfo->code; + code = pInfo->code; tsem_destroy(&pInfo->sem); taosMemoryFree(pInfo); @@ -2394,6 +2415,41 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, return code; } +int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){ + if (tmq == NULL || pTopicName == NULL) { + tscError("invalid tmq handle, null"); + return TSDB_CODE_INVALID_PARA; + } + + int32_t accId = tmq->pTscObj->acctId; + char tname[TSDB_TOPIC_FNAME_LEN] = {0}; + sprintf(tname, "%d.%s", accId, pTopicName); + + taosWLockLatch(&tmq->lock); + SMqClientVg* pVg = NULL; + int32_t code = getClientVg(tmq, tname, vgId, &pVg); + if(code != 0){ + taosWUnLockLatch(&tmq->lock); + return code; + } + + SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; + code = checkWalRange(pOffsetInfo, offset); + if (code != 0) { + taosWUnLockLatch(&tmq->lock); + return code; + } + taosWUnLockLatch(&tmq->lock); + + STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset}; + + code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param); + + tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code)); + + return code; +} + void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) { SAskEpInfo* pInfo = param; pInfo->code = code; @@ -2525,7 +2581,10 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { } // if no more waiting rsp - pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam); + if(pParamSet->callbackFn != NULL){ + pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam); + } + taosMemoryFree(pParamSet); // tmq->needReportOffsetRows = true; @@ -2622,8 +2681,104 @@ static bool isInSnapshotMode(int8_t type, bool useSnapshot){ return false; } +static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) { + SMqCommittedParam* pParam = param; + + if (code != 0){ + goto end; + } + if (pMsg) { + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)pMsg->pData, pMsg->len); + if (tDecodeMqVgOffset(&decoder, &pParam->vgOffset) < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + tDecoderClear(&decoder); + } + + end: + if(pMsg){ + taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); + } + pParam->code = code; + tsem_post(&pParam->sem); + return 0; +} + +int64_t getCommittedFromServer(tmq_t *tmq, char* tname, int32_t vgId, SEpSet* epSet){ + int32_t code = 0; + SMqVgOffset pOffset = {0}; + + pOffset.consumerId = tmq->consumerId; + + int32_t groupLen = strlen(tmq->groupId); + memcpy(pOffset.offset.subKey, tmq->groupId, groupLen); + pOffset.offset.subKey[groupLen] = TMQ_SEPARATOR; + strcpy(pOffset.offset.subKey + groupLen + 1, tname); + + int32_t len = 0; + tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code); + if (code < 0) { + return TSDB_CODE_INVALID_PARA; + } + + void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); + if (buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + ((SMsgHead*)buf)->vgId = htonl(vgId); + + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, abuf, len); + tEncodeMqVgOffset(&encoder, &pOffset); + tEncoderClear(&encoder); + + SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + if (sendInfo == NULL) { + taosMemoryFree(buf); + return TSDB_CODE_OUT_OF_MEMORY; + } + + SMqCommittedParam* pParam = taosMemoryMalloc(sizeof(SMqCommittedParam)); + if (pParam == NULL) { + taosMemoryFree(buf); + taosMemoryFree(sendInfo); + return TSDB_CODE_OUT_OF_MEMORY; + } + tsem_init(&pParam->sem, 0, 0); + + sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL}; + sendInfo->requestId = generateRequestId(); + sendInfo->requestObjRefId = 0; + sendInfo->param = pParam; + sendInfo->fp = tmCommittedCb; + sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO; + + int64_t transporterId = 0; + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo); + + tsem_wait(&pParam->sem); + code = pParam->code; + if(code == TSDB_CODE_SUCCESS){ + if(pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG){ + code = pParam->vgOffset.offset.val.version; + }else{ + code = TSDB_CODE_TMQ_SNAPSHOT_ERROR; + } + } + tsem_destroy(&pParam->sem); + taosMemoryFree(pParam); + + return code; +} + int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId){ - if (tmq == NULL) { + if (tmq == NULL || pTopicName == NULL) { tscError("invalid tmq handle, null"); return TSDB_CODE_INVALID_PARA; } @@ -2633,60 +2788,103 @@ int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId){ sprintf(tname, "%d.%s", accId, pTopicName); taosWLockLatch(&tmq->lock); - SMqClientTopic* pTopic = getTopicByName(tmq, tname); - if (pTopic == NULL) { - tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); - taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_INVALID_TOPIC; - } SMqClientVg* pVg = NULL; - int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); - for (int32_t i = 0; i < numOfVgs; ++i) { - SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); - if (pClientVg->vgId == vgId) { - pVg = pClientVg; - break; - } - } - - if (pVg == NULL) { - tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId); + int32_t code = getClientVg(tmq, tname, vgId, &pVg); + if(code != 0){ taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_INVALID_VGID; + return code; } - int32_t type = pVg->offsetInfo.endOffset.type; + SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; + int32_t type = pOffsetInfo->endOffset.type; if (isInSnapshotMode(type, tmq->useSnapshot)) { tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, position error", tmq->consumerId, type); taosWUnLockLatch(&tmq->lock); return TSDB_CODE_TMQ_SNAPSHOT_ERROR; } - if (!isWalRangeOk(&pVg->offsetInfo)) { - tscError("consumer:0x%" PRIx64 " Assignment or poll interface need to be called first", tmq->consumerId); + code = checkWalRange(pOffsetInfo, -1); + if (code != 0) { taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_NEED_INITIALIZED; + return code; } + SEpSet epSet = pVg->epSet; + int64_t begin = pVg->offsetInfo.walVerBegin; + int64_t end = pVg->offsetInfo.walVerEnd; + taosWUnLockLatch(&tmq->lock); int64_t position = 0; - STqOffsetVal* pOffsetInfo = &pVg->offsetInfo.endOffset; if(type == TMQ_OFFSET__LOG){ - position = pOffsetInfo->version; - }else if(type == TMQ_OFFSET__RESET_EARLIEST){ - position = pVg->offsetInfo.walVerBegin; - }else if(type == TMQ_OFFSET__RESET_LATEST){ - position = pVg->offsetInfo.walVerEnd; + position = pOffsetInfo->endOffset.version; + }else if(type == TMQ_OFFSET__RESET_EARLIEST || type == TMQ_OFFSET__RESET_LATEST){ + code = getCommittedFromServer(tmq, tname, vgId, &epSet); + if(code == TSDB_CODE_TMQ_NO_COMMITTED){ + if(type == TMQ_OFFSET__RESET_EARLIEST){ + position = begin; + } else if(type == TMQ_OFFSET__RESET_LATEST){ + position = end; + } + }else{ + position = code; + } }else{ tscError("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type); } - taosWUnLockLatch(&tmq->lock); return position; } +int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId){ + if (tmq == NULL || pTopicName == NULL) { + tscError("invalid tmq handle, null"); + return TSDB_CODE_INVALID_PARA; + } + + int32_t accId = tmq->pTscObj->acctId; + char tname[TSDB_TOPIC_FNAME_LEN] = {0}; + sprintf(tname, "%d.%s", accId, pTopicName); + + taosWLockLatch(&tmq->lock); + + SMqClientVg* pVg = NULL; + int32_t code = getClientVg(tmq, tname, vgId, &pVg); + if(code != 0){ + taosWUnLockLatch(&tmq->lock); + return code; + } + + SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; + if (isInSnapshotMode(pOffsetInfo->endOffset.type, tmq->useSnapshot)) { + tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId, pOffsetInfo->endOffset.type); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_SNAPSHOT_ERROR; + } + + if (isInSnapshotMode(pOffsetInfo->committedOffset.type, tmq->useSnapshot)) { + tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId, pOffsetInfo->committedOffset.type); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_SNAPSHOT_ERROR; + } + + int64_t committed = 0; + if(pOffsetInfo->committedOffset.type == TMQ_OFFSET__LOG){ + committed = pOffsetInfo->committedOffset.version; + taosWUnLockLatch(&tmq->lock); + return committed; + } + SEpSet epSet = pVg->epSet; + taosWUnLockLatch(&tmq->lock); + + return getCommittedFromServer(tmq, tname, vgId, &epSet); +} + int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment, int32_t* numOfAssignment) { + if(tmq == NULL || pTopicName == NULL || assignment == NULL || numOfAssignment == NULL){ + tscError("invalid tmq handle, null"); + return TSDB_CODE_INVALID_PARA; + } *numOfAssignment = 0; *assignment = NULL; SMqVgCommon* pCommon = NULL; @@ -2881,7 +3079,7 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) { } int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) { - if (tmq == NULL) { + if (tmq == NULL || pTopicName == NULL) { tscError("invalid tmq handle, null"); return TSDB_CODE_INVALID_PARA; } @@ -2891,27 +3089,12 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ sprintf(tname, "%d.%s", accId, pTopicName); taosWLockLatch(&tmq->lock); - SMqClientTopic* pTopic = getTopicByName(tmq, tname); - if (pTopic == NULL) { - tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); - taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_INVALID_TOPIC; - } SMqClientVg* pVg = NULL; - int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); - for (int32_t i = 0; i < numOfVgs; ++i) { - SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); - if (pClientVg->vgId == vgId) { - pVg = pClientVg; - break; - } - } - - if (pVg == NULL) { - tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId); + int32_t code = getClientVg(tmq, tname, vgId, &pVg); + if(code != 0){ taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_INVALID_VGID; + return code; } SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; @@ -2923,53 +3106,44 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ return TSDB_CODE_TMQ_SNAPSHOT_ERROR; } - if (!isWalRangeOk(&pVg->offsetInfo)) { - tscError("consumer:0x%" PRIx64 " Assignment or poll interface need to be called first", tmq->consumerId); + code = checkWalRange(pOffsetInfo, -1); + if (code != 0) { taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_NEED_INITIALIZED; - } - - if (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd) { - tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", - tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd); - taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE; + return code; } + tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId); // update the offset, and then commit to vnode pOffsetInfo->endOffset.type = TMQ_OFFSET__LOG; pOffsetInfo->endOffset.version = offset; pOffsetInfo->beginOffset = pOffsetInfo->endOffset; pVg->seekUpdated = true; - tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId); + SEpSet epSet = pVg->epSet; + taosWUnLockLatch(&tmq->lock); SMqSeekReq req = {0}; - snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, pTopic->topicName); - req.head.vgId = pVg->vgId; + snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, tname); + req.head.vgId = vgId; req.consumerId = tmq->consumerId; int32_t msgSize = tSerializeSMqSeekReq(NULL, 0, &req); if (msgSize < 0) { - taosWUnLockLatch(&tmq->lock); return TSDB_CODE_PAR_INTERNAL_ERROR; } char* msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { - taosWUnLockLatch(&tmq->lock); return TSDB_CODE_OUT_OF_MEMORY; } if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) { taosMemoryFree(msg); - taosWUnLockLatch(&tmq->lock); return TSDB_CODE_PAR_INTERNAL_ERROR; } SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { taosMemoryFree(msg); - taosWUnLockLatch(&tmq->lock); return TSDB_CODE_OUT_OF_MEMORY; } @@ -2977,7 +3151,6 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ if (pParam == NULL) { taosMemoryFree(msg); taosMemoryFree(sendInfo); - taosWUnLockLatch(&tmq->lock); return TSDB_CODE_OUT_OF_MEMORY; } tsem_init(&pParam->sem, 0, 0); @@ -2991,18 +3164,15 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ int64_t transporterId = 0; tscInfo("consumer:0x%" PRIx64 " %s send seek info vgId:%d, epoch %d" PRIx64, - tmq->consumerId, pTopic->topicName, vgId, tmq->epoch); - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); - taosWUnLockLatch(&tmq->lock); + tmq->consumerId, tname, vgId, tmq->epoch); + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); tsem_wait(&pParam->sem); - int32_t code = pParam->code; + code = pParam->code; tsem_destroy(&pParam->sem); taosMemoryFree(pParam); - if (code != TSDB_CODE_SUCCESS) { - tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, vgId, tstrerror(code)); - } + tscInfo("consumer:0x%" PRIx64 "send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code)); return code; } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 6aeb2152d5..bfd6908e16 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1075,6 +1075,89 @@ TEST(clientCase, sub_db_test) { fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); } +TEST(clientCase, tmq_commit) { +// taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg"); + + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + tmq_conf_t* conf = tmq_conf_new(); + + tmq_conf_set(conf, "enable.auto.commit", "false"); + tmq_conf_set(conf, "auto.commit.interval.ms", "2000"); + tmq_conf_set(conf, "group.id", "group_id_2"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + tmq_conf_destroy(conf); + + char topicName[128] = "tp"; + // 创建订阅 topics 列表 + tmq_list_t* topicList = tmq_list_new(); + tmq_list_append(topicList, topicName); + + // 启动订阅 + tmq_subscribe(tmq, topicList); + tmq_list_destroy(topicList); + + int32_t totalRows = 0; + int32_t msgCnt = 0; + int32_t timeout = 2000; + + tmq_topic_assignment* pAssign = NULL; + int32_t numOfAssign = 0; + + int32_t code = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + + int64_t position = tmq_position(tmq, topicName, pAssign[i].vgId); + printf("position vgId:%d, position:%lld\n", pAssign[i].vgId, position); + tmq_offset_seek(tmq, topicName, pAssign[i].vgId, 1); + position = tmq_position(tmq, topicName, pAssign[i].vgId); + printf("after seek 100, position vgId:%d, position:%lld\n", pAssign[i].vgId, position); + } + + while (1) { + printf("start to poll\n"); + TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); + if (pRes) { + printSubResults(pRes, &totalRows); + } else { + break; + } + + tmq_commit_sync(tmq, pRes); + for(int i = 0; i < numOfAssign; i++) { + int64_t committed = tmq_committed(tmq, topicName, pAssign[i].vgId); + printf("committed vgId:%d, committed:%lld\n", pAssign[i].vgId, committed); + } + if (pRes != NULL) { + taos_free_result(pRes); + } + +// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].begin); + } + + tmq_free_assignment(pAssign); + + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); +} + TEST(clientCase, td_25129) { // taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg"); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 738b7db46a..8a5a4e5079 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -732,6 +732,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index b5a7e5fc6b..2a5cdbe555 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -232,6 +232,7 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg); // tq-stream int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0b10b62267..bf0067b128 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -578,6 +578,49 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { return code; } +int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { + void* data = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + + SMqVgOffset vgOffset = {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)data, len); + if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + tDecoderClear(&decoder); + + STqOffset* pOffset = &vgOffset.offset; + STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); + if (pSavedOffset == NULL) { + return TSDB_CODE_TMQ_NO_COMMITTED; + } + vgOffset.offset = *pSavedOffset; + + int32_t code = 0; + tEncodeSize(tEncodeMqVgOffset, &vgOffset, len, code); + if (code < 0) { + return TSDB_CODE_INVALID_PARA; + } + + void* buf = taosMemoryCalloc(1, len); + if (buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + SEncoder encoder; + tEncoderInit(&encoder, buf, len); + tEncodeMqVgOffset(&encoder, &vgOffset); + tEncoderClear(&encoder); + + SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0}; + + tmsgSendRsp(&rsp); + + return 0; +} + int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { SMqPollReq req = {0}; if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 0d9c478c1b..204107ee3c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -462,7 +462,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg } break; case TDMT_VND_TMQ_COMMIT_OFFSET: - if (tqProcessOffsetCommitReq(pVnode->pTq, ver, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) { + if (tqProcessOffsetCommitReq(pVnode->pTq, ver, pReq, len) < 0) { goto _err; } break; @@ -638,6 +638,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { // return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_VND_TMQ_VG_WALINFO: return tqProcessVgWalInfoReq(pVnode->pTq, pMsg); + case TDMT_VND_TMQ_VG_COMMITTEDINFO: + return tqProcessVgCommittedInfoReq(pVnode->pTq, pMsg); case TDMT_VND_TMQ_SEEK: return tqProcessSeekReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c36480a63e..83a50f7051 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -633,6 +633,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SCALAR_CONVERT_ERROR, "Cannot convert to s TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NEED_INITIALIZED, "Assignment or poll interface need to be called first") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SNAPSHOT_ERROR, "Can not operate in snapshot mode") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_COMMITTED, "No committed info") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE, "Offset out of range") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_VGID, "VgId does not belong to this consumer") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_TOPIC, "Topic does not belong to this consumer") From d7d81d82a0f5824c5bf9fc63e7a5ba4af031710c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 19 Jul 2023 18:27:19 +0800 Subject: [PATCH 4/6] feat:add committed & position & commite_offset interface --- source/client/test/clientTests.cpp | 8 +++++++- source/dnode/vnode/src/tq/tq.c | 14 +++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index bfd6908e16..b331e68b73 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1127,7 +1127,7 @@ TEST(clientCase, tmq_commit) { printf("position vgId:%d, position:%lld\n", pAssign[i].vgId, position); tmq_offset_seek(tmq, topicName, pAssign[i].vgId, 1); position = tmq_position(tmq, topicName, pAssign[i].vgId); - printf("after seek 100, position vgId:%d, position:%lld\n", pAssign[i].vgId, position); + printf("after seek 1, position vgId:%d, position:%lld\n", pAssign[i].vgId, position); } while (1) { @@ -1143,6 +1143,12 @@ TEST(clientCase, tmq_commit) { for(int i = 0; i < numOfAssign; i++) { int64_t committed = tmq_committed(tmq, topicName, pAssign[i].vgId); printf("committed vgId:%d, committed:%lld\n", pAssign[i].vgId, committed); + if(committed > 0){ + int32_t code = tmq_commit_offset_sync(tmq, topicName, pAssign[i].vgId, 4); + printf("tmq_commit_offset_sync vgId:%d, offset:4, code:%d\n", pAssign[i].vgId, code); + int64_t committed = tmq_committed(tmq, topicName, pAssign[i].vgId); + printf("after tmq_commit_offset_sync, committed vgId:%d, committed:%lld\n", pAssign[i].vgId, committed); + } } if (pRes != NULL) { taos_free_result(pRes); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bf0067b128..03d6932578 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -85,9 +85,9 @@ void tqDestroyTqHandle(void* data) { } } -static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) { +static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) { return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG && - pLeft->val.version <= pRight->val.version; + pLeft->val.version == pRight->val.version; } STQ* tqOpen(const char* path, SVnode* pVnode) { @@ -302,10 +302,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t STqOffset* pOffset = &vgOffset.offset; if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) { - tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64, + tqInfo("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64, pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts); } else if (pOffset->val.type == TMQ_OFFSET__LOG) { - tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId, + tqInfo("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId, pOffset->val.version); if (pOffset->val.version + 1 == sversion) { pOffset->val.version += 1; @@ -316,8 +316,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t } STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); - if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) { - tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64, + if (pSavedOffset != NULL && tqOffsetEqual(pOffset, pSavedOffset)) { + tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version); return 0; // no need to update the offset value } @@ -605,7 +605,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { return TSDB_CODE_INVALID_PARA; } - void* buf = taosMemoryCalloc(1, len); + void* buf = rpcMallocCont(len); if (buf == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } From 49c87a7cf682a03959d39525a0213d1bcfecf91e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 19 Jul 2023 19:14:18 +0800 Subject: [PATCH 5/6] feat:add committed & position & commite_offset interface --- source/client/test/clientTests.cpp | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index b331e68b73..02443a696c 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1183,9 +1183,10 @@ TEST(clientCase, td_25129) { tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_conf_destroy(conf); + char topicName[128] = "tp"; // 创建订阅 topics 列表 tmq_list_t* topicList = tmq_list_new(); - tmq_list_append(topicList, "tp"); + tmq_list_append(topicList, topicName); // 启动订阅 tmq_subscribe(tmq, topicList); @@ -1203,7 +1204,7 @@ TEST(clientCase, td_25129) { tmq_topic_assignment* pAssign = NULL; int32_t numOfAssign = 0; - int32_t code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + int32_t code = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssign); if (code != 0) { printf("error occurs:%s\n", tmq_err2str(code)); tmq_free_assignment(pAssign); @@ -1220,7 +1221,7 @@ TEST(clientCase, td_25129) { // tmq_offset_seek(tmq, "tp", pAssign[0].vgId, 4); tmq_free_assignment(pAssign); - code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + code = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssign); if (code != 0) { printf("error occurs:%s\n", tmq_err2str(code)); tmq_free_assignment(pAssign); @@ -1236,7 +1237,7 @@ TEST(clientCase, td_25129) { tmq_free_assignment(pAssign); - code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + code = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssign); if (code != 0) { printf("error occurs:%s\n", tmq_err2str(code)); tmq_free_assignment(pAssign); @@ -1266,7 +1267,7 @@ TEST(clientCase, td_25129) { printSubResults(pRes, &totalRows); - code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + code = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssign); if (code != 0) { printf("error occurs:%s\n", tmq_err2str(code)); tmq_free_assignment(pAssign); @@ -1280,10 +1281,11 @@ TEST(clientCase, td_25129) { printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); } } else { - tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset); - tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset); + for(int i = 0; i < numOfAssign; i++) { + tmq_offset_seek(tmq, topicName, pAssign[i].vgId, pAssign[i].currentOffset); + } tmq_commit_sync(tmq, pRes); - continue; + break; } // tmq_commit_sync(tmq, pRes); From e1c4cca33dc1ecb9df30a73e5b0c7031661ff0bb Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 20 Jul 2023 00:07:04 +0800 Subject: [PATCH 6/6] feat:add committed & position & commite_offset interface --- source/client/src/clientTmq.c | 2 +- source/client/test/clientTests.cpp | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index f2ea7309e4..3576df434b 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -3106,7 +3106,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ return TSDB_CODE_TMQ_SNAPSHOT_ERROR; } - code = checkWalRange(pOffsetInfo, -1); + code = checkWalRange(pOffsetInfo, offset); if (code != 0) { taosWUnLockLatch(&tmq->lock); return code; diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 02443a696c..d88a26cbb2 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1123,6 +1123,9 @@ TEST(clientCase, tmq_commit) { for(int i = 0; i < numOfAssign; i++){ printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + int64_t committed = tmq_committed(tmq, topicName, pAssign[i].vgId); + printf("committed vgId:%d, committed:%lld\n", pAssign[i].vgId, committed); + int64_t position = tmq_position(tmq, topicName, pAssign[i].vgId); printf("position vgId:%d, position:%lld\n", pAssign[i].vgId, position); tmq_offset_seek(tmq, topicName, pAssign[i].vgId, 1); @@ -1317,6 +1320,7 @@ TEST(clientCase, td_25129) { printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); } + tmq_free_assignment(pAssign); tmq_consumer_close(tmq); taos_close(pConn); fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);