diff --git a/docs/en/12-taos-sql/22-meta.md b/docs/en/12-taos-sql/22-meta.md index 4123bdfb58..f165470d10 100644 --- a/docs/en/12-taos-sql/22-meta.md +++ b/docs/en/12-taos-sql/22-meta.md @@ -283,6 +283,8 @@ Provides dnode configuration information. | 2 | consumer_group | BINARY(193) | Subscribed consumer group | | 3 | vgroup_id | INT | Vgroup ID for the consumer | | 4 | consumer_id | BIGINT | Consumer ID | +| 5 | offset | BINARY(64) | Consumption progress | +| 6 | rows | BIGINT | Number of consumption items | ## INS_STREAMS diff --git a/docs/zh/12-taos-sql/22-meta.md b/docs/zh/12-taos-sql/22-meta.md index 3fffbd0706..fe8d6d4c69 100644 --- a/docs/zh/12-taos-sql/22-meta.md +++ b/docs/zh/12-taos-sql/22-meta.md @@ -284,6 +284,8 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | 2 | consumer_group | BINARY(193) | 订阅者的消费者组 | | 3 | vgroup_id | INT | 消费者被分配的 vgroup id | | 4 | consumer_id | BIGINT | 消费者的唯一 id | +| 5 | offset | BINARY(64) | 消费者的消费进度 | +| 6 | rows | BIGINT | 消费者的消费的数据条数 | ## INS_STREAMS diff --git a/packaging/checkPackageRuning.py b/packaging/checkPackageRuning.py index 96e2378fb3..914ee83f29 100755 --- a/packaging/checkPackageRuning.py +++ b/packaging/checkPackageRuning.py @@ -87,7 +87,7 @@ os.system("rm -rf /tmp/dumpdata/*") # dump data out print("taosdump dump out data") -os.system("taosdump -o /tmp/dumpdata -D test -y -h %s "%serverHost) +os.system("taosdump -o /tmp/dumpdata -D test -h %s "%serverHost) # drop database of test print("drop database test") @@ -95,7 +95,7 @@ os.system(" taos -s ' drop database test ;' -h %s "%serverHost) # dump data in print("taosdump dump data in") -os.system("taosdump -i /tmp/dumpdata -y -h %s "%serverHost) +os.system("taosdump -i /tmp/dumpdata -h %s "%serverHost) result = conn.query("SELECT count(*) from test.meters") diff --git a/packaging/testpackage.sh b/packaging/testpackage.sh index 081383f89b..0622b01f2b 100755 --- a/packaging/testpackage.sh +++ b/packaging/testpackage.sh @@ -152,7 +152,7 @@ function wgetFile { file=$1 versionPath=$2 sourceP=$3 -nasServerIP="192.168.1.131" +nasServerIP="192.168.1.213" packagePath="/nas/TDengine/v${versionPath}/${verMode}" if [ -f ${file} ];then echoColor YD "${file} already exists ,it will delete it and download it again " diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 9fc25be2e7..3fbde36e89 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1877,6 +1877,23 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p return 0; } +static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* offset, int64_t sver, int64_t ever, int64_t consumerId){ + if (!pVg->seekUpdated) { + tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId); + pVg->offsetInfo.currentOffset = *offset; + } else { + tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId); + } + + // update the status + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + + // update the valid wal version range + pVg->offsetInfo.walVerBegin = sver; + pVg->offsetInfo.walVerEnd = ever; + pVg->receivedInfoFromVnode = true; +} + static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems); @@ -1925,22 +1942,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { pVg->epSet = *pollRspWrapper->pEpset; } - // update the local offset value only for the returned values, only when the local offset is NOT updated - // by tmq_offset_seek function - if (!pVg->seekUpdated) { - tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId); - pVg->offsetInfo.currentOffset = pDataRsp->rspOffset; - } else { - tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId); - } - - // update the status - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - - // update the valid wal version range - pVg->offsetInfo.walVerBegin = pDataRsp->head.walsver; - pVg->offsetInfo.walVerEnd = pDataRsp->head.walever; - pVg->receivedInfoFromVnode = true; + updateVgInfo(pVg, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId); char buf[TSDB_OFFSET_LEN]; tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); @@ -1990,11 +1992,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return NULL; } - if(pollRspWrapper->metaRsp.rspOffset.type != 0){ // if offset is validate - pVg->offsetInfo.currentOffset = pollRspWrapper->metaRsp.rspOffset; - } - - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId); // build rsp SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); taosFreeQitem(pollRspWrapper); @@ -2022,18 +2020,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return NULL; } - // update the local offset value only for the returned values, only when the local offset is NOT updated - // by tmq_offset_seek function - if (!pVg->seekUpdated) { - if(pollRspWrapper->taosxRsp.rspOffset.type != 0) { // if offset is validate - tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId); - pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset; - } - } else { - tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId); - } - - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + updateVgInfo(pVg, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId); if (pollRspWrapper->taosxRsp.blockNum == 0) { tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64, @@ -2615,6 +2602,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a pAssignment->begin = pClientVg->offsetInfo.walVerBegin; pAssignment->end = pClientVg->offsetInfo.walVerEnd; pAssignment->vgId = pClientVg->vgId; + tscInfo("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId, + pAssignment->vgId, pAssignment->currentOffset); } if (needFetch) { @@ -2690,7 +2679,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset); tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, - tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); + tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo); } @@ -2718,17 +2707,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; - pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; +// pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; - char offsetBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); +// char offsetBuf[TSDB_OFFSET_LEN] = {0}; +// tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); - tscInfo("vgId:%d offset is update to:%s", p->vgId, offsetBuf); + tscInfo("vgId:%d offset is old to:%"PRId64, p->vgId, p->currentOffset); pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerEnd = p->end; - pOffsetInfo->currentOffset.version = p->currentOffset; - pOffsetInfo->committedOffset.version = p->currentOffset; +// pOffsetInfo->currentOffset.version = p->currentOffset; +// pOffsetInfo->committedOffset.version = p->currentOffset; } } } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 7ecd994b5a..48de21199b 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -275,7 +275,7 @@ static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pI taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); taosArrayPush(pOutput->newConsumers, &consumerId); - mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, pSubKey, consumerId); + mInfo("sub:%s mq rebalance add new consumer:0x%" PRIx64, pSubKey, consumerId); } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a293858207..2d757fff08 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -560,48 +560,32 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, &req); - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey); - if (pOffset != NULL) { - if (pOffset->val.type != TMQ_OFFSET__LOG) { - tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey); - terrno = TSDB_CODE_INVALID_PARA; - tDeleteMqDataRsp(&dataRsp); - return -1; - } - - dataRsp.rspOffset.type = TMQ_OFFSET__LOG; - dataRsp.rspOffset.version = pOffset->val.version; - } else { - if (req.useSnapshot == true) { - tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey); - terrno = TSDB_CODE_INVALID_PARA; - tDeleteMqDataRsp(&dataRsp); - return -1; - } - - dataRsp.rspOffset.type = TMQ_OFFSET__LOG; - - if (reqOffset.type == TMQ_OFFSET__LOG) { - int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader); - if (currentVer == -1) { // not start to read data from wal yet, return req offset directly - dataRsp.rspOffset.version = reqOffset.version; - } else { - dataRsp.rspOffset.version = currentVer; // return current consume offset value - } - } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { - dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position - } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { - dataRsp.rspOffset.version = ever; - } else { - tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey, - reqOffset.type); - terrno = TSDB_CODE_INVALID_PARA; - tDeleteMqDataRsp(&dataRsp); - return -1; - } + if (req.useSnapshot == true) { + tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey); + terrno = TSDB_CODE_INVALID_PARA; + tDeleteMqDataRsp(&dataRsp); + return -1; } + dataRsp.rspOffset.type = TMQ_OFFSET__LOG; + + if (reqOffset.type == TMQ_OFFSET__LOG) { + dataRsp.rspOffset.version = reqOffset.version; + } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { + dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position + } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { + dataRsp.rspOffset.version = ever; + } else { + tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey, + reqOffset.type); + terrno = TSDB_CODE_INVALID_PARA; + tDeleteMqDataRsp(&dataRsp); + return -1; + } + tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); + tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever); + tDeleteMqDataRsp(&dataRsp); return 0; }