diff --git a/include/client/taos.h b/include/client/taos.h index 7bdf16ed38..3cc2d907ab 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -287,11 +287,20 @@ DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout); DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); +DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset); +DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param); DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment); DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment); DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset); +DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); +DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); +DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); +DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res); +DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId); +DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId); + /* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ enum tmq_conf_res_t { @@ -309,11 +318,6 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm /* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */ -DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); -DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); -DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); -DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res); - /* ------------------------------ TAOSX -----------------------------------*/ // note: following apis are unstable enum tmq_res_t { diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 6031b99cfc..41516adc73 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -228,7 +228,7 @@ typedef struct SStoreTqReader { } SStoreTqReader; typedef struct SStoreSnapshotFn { - int32_t (*createSnapshot)(SSnapContext* ctx, int64_t uid); + int32_t (*setForSnapShot)(SSnapContext* ctx, int64_t uid); int32_t (*destroySnapshot)(SSnapContext* ctx); SMetaTableInfo (*getMetaTableInfoFromSnapshot)(SSnapContext* ctx); int32_t (*getTableInfoFromSnapshot)(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 5879de2e30..6bbcbe62be 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1901,7 +1901,7 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal // update the valid wal version range pVg->offsetInfo.walVerBegin = sver; - pVg->offsetInfo.walVerEnd = ever; + pVg->offsetInfo.walVerEnd = ever + 1; // pVg->receivedInfoFromVnode = true; } @@ -2541,7 +2541,7 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { SMqRspHead* pHead = pMsg->pData; tmq_topic_assignment assignment = {.begin = pHead->walsver, - .end = pHead->walever, + .end = pHead->walever + 1, .currentOffset = rsp.rspOffset.version, .vgId = pParam->vgId}; @@ -2600,7 +2600,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a *numOfAssignment = taosArrayGetSize(pTopic->vgs); for (int32_t j = 0; j < (*numOfAssignment); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); - int32_t type = pClientVg->offsetInfo.currentOffset.type; + int32_t type = pClientVg->offsetInfo.seekOffset.type; if (isInSnapshotMode(type, tmq->useSnapshot)) { tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type); code = TSDB_CODE_TMQ_SNAPSHOT_ERROR; @@ -2620,7 +2620,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a for (int32_t j = 0; j < (*numOfAssignment); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); - if (pClientVg->offsetInfo.currentOffset.type != TMQ_OFFSET__LOG) { + if (pClientVg->offsetInfo.seekOffset.type != TMQ_OFFSET__LOG) { needFetch = true; break; } @@ -2705,7 +2705,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a int64_t transporterId = 0; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset); + tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.seekOffset); tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); @@ -2825,7 +2825,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ // update the offset, and then commit to vnode pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; - pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0; + pOffsetInfo->currentOffset.version = offset; pOffsetInfo->seekOffset = pOffsetInfo->currentOffset; // pOffsetInfo->committedOffset.version = INT64_MIN; pVg->seekUpdated = true; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 5f53f1c50c..4299cd471a 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -196,7 +196,7 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return, reqId:0x%" PRIx64, pHandle->consumerId, pHandle->epoch, vgId, offset, reqId); - *fetchOffset = offset - 1; + *fetchOffset = offset; code = -1; goto END; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 8948bae852..7d632f44bc 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -119,7 +119,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand } } else { walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef); - tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer - 1); + tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer); } } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { walRefLastVer(pTq->pVnode->pWal, pHandle->pRef); @@ -127,7 +127,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pRequest); - tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer); + tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer + 1); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId, pHandle->subKey, vgId, dataRsp.rspOffset.version); int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); @@ -138,7 +138,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand } else { STaosxRsp taosxRsp = {0}; tqInitTaosxRsp(&taosxRsp, pRequest); - tqOffsetResetToLog(&taosxRsp.rspOffset, pHandle->pRef->refVer); + tqOffsetResetToLog(&taosxRsp.rspOffset, pHandle->pRef->refVer + 1); int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); tDeleteSTaosxRsp(&taosxRsp); @@ -246,7 +246,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (offset->type == TMQ_OFFSET__LOG) { walReaderVerifyOffset(pHandle->pWalReader, offset); - int64_t fetchVer = offset->version + 1; + int64_t fetchVer = offset->version; pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -279,14 +279,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, // process meta if (pHead->msgType != TDMT_VND_SUBMIT) { if (totalRows > 0) { - tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1); + tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); setRequestVersion(&taosxRsp.reqOffset, offset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); goto end; } tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); - tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); + tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1); metaRsp.resMsgType = pHead->msgType; metaRsp.metaRspLen = pHead->bodyLen; metaRsp.metaRsp = pHead->body; @@ -309,7 +309,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { - tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1); setRequestVersion(&taosxRsp.reqOffset, offset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); goto end; diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index d2db6368a2..447cab4e9e 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -237,7 +237,7 @@ void initCacheFn(SStoreCacheReader* pCache) { } void initSnapshotFn(SStoreSnapshotFn* pSnapshot) { - pSnapshot->createSnapshot = setForSnapShot; + pSnapshot->setForSnapShot = setForSnapShot; pSnapshot->destroySnapshot = destroySnapContext; pSnapshot->getMetaTableInfoFromSnapshot = getMetaTableInfoFromSnapshot; pSnapshot->getTableInfoFromSnapshot = getTableInfoFromSnapshot; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 88e2165a12..dfb0c1645a 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1112,8 +1112,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn; SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader); walReaderVerifyOffset(pWalReader, pOffset); - if (pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version + 1, id) < 0) { - qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version + 1, id); + if (pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version, id) < 0) { + qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version, id); return -1; } } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { @@ -1202,7 +1202,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT SOperatorInfo* p = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id); STableListInfo* pTableListInfo = ((SStreamRawScanInfo*)(p->info))->pTableListInfo; - if (pAPI->snapshotFn.createSnapshot(sContext, pOffset->uid) != 0) { + if (pAPI->snapshotFn.setForSnapShot(sContext, pOffset->uid) != 0) { qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id); terrno = TSDB_CODE_PAR_INTERNAL_ERROR; return -1; @@ -1239,7 +1239,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) { SStreamRawScanInfo* pInfo = pOperator->info; SSnapContext* sContext = pInfo->sContext; - if (pTaskInfo->storageAPI.snapshotFn.createSnapshot(sContext, pOffset->uid) != 0) { + if (pTaskInfo->storageAPI.snapshotFn.setForSnapShot(sContext, pOffset->uid) != 0) { qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version); terrno = TSDB_CODE_PAR_INTERNAL_ERROR; return -1; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c3d5de572f..b3a9699718 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1644,12 +1644,13 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader); pTSInfo->base.dataReader = NULL; - qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1); - if (pAPI->tqReaderFn.tqReaderSeek(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) { + int64_t validVer = pTaskInfo->streamInfo.snapshotVer + 1; + qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", validVer); + if (pAPI->tqReaderFn.tqReaderSeek(pInfo->tqReader, validVer, pTaskInfo->id.str) < 0) { return NULL; } - tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer); + tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, validVer); } if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) { @@ -1660,8 +1661,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader); struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader); - // curVersion move to next, so currentOffset = curVersion - 1 - tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion - 1); + // curVersion move to next + tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion); if (hasResult) { qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, @@ -2182,7 +2183,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { STqOffsetVal offset = {0}; if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal qDebug("tmqsnap read snapshot done, change to get data from wal"); - tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion); + tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion + 1); } else { tqOffsetResetToData(&offset, mtInfo.uid, INT64_MIN); qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 786f48ce88..a839d6cbd8 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -135,8 +135,8 @@ void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset){ int64_t firstVer = walGetFirstVer((pWalReader)->pWal); taosThreadMutexUnlock(&pWalReader->pWal->mutex); - if (pOffset->version + 1 < firstVer){ - pOffset->version = firstVer - 1; + if (pOffset->version < firstVer){ + pOffset->version = firstVer; } }