Merge branch 'main' of https://github.com/taosdata/TDengine into main

This commit is contained in:
jiajingbin 2023-07-07 19:22:34 +08:00
commit 7a48d05f48
7 changed files with 60 additions and 83 deletions

View File

@ -283,6 +283,8 @@ Provides dnode configuration information.
| 2 | consumer_group | BINARY(193) | Subscribed consumer group | | 2 | consumer_group | BINARY(193) | Subscribed consumer group |
| 3 | vgroup_id | INT | Vgroup ID for the consumer | | 3 | vgroup_id | INT | Vgroup ID for the consumer |
| 4 | consumer_id | BIGINT | Consumer ID | | 4 | consumer_id | BIGINT | Consumer ID |
| 5 | offset | BINARY(64) | Consumption progress |
| 6 | rows | BIGINT | Number of consumption items |
## INS_STREAMS ## INS_STREAMS

View File

@ -284,6 +284,8 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数
| 2 | consumer_group | BINARY(193) | 订阅者的消费者组 | | 2 | consumer_group | BINARY(193) | 订阅者的消费者组 |
| 3 | vgroup_id | INT | 消费者被分配的 vgroup id | | 3 | vgroup_id | INT | 消费者被分配的 vgroup id |
| 4 | consumer_id | BIGINT | 消费者的唯一 id | | 4 | consumer_id | BIGINT | 消费者的唯一 id |
| 5 | offset | BINARY(64) | 消费者的消费进度 |
| 6 | rows | BIGINT | 消费者的消费的数据条数 |
## INS_STREAMS ## INS_STREAMS

View File

@ -87,7 +87,7 @@ os.system("rm -rf /tmp/dumpdata/*")
# dump data out # dump data out
print("taosdump dump out data") print("taosdump dump out data")
os.system("taosdump -o /tmp/dumpdata -D test -y -h %s "%serverHost) os.system("taosdump -o /tmp/dumpdata -D test -h %s "%serverHost)
# drop database of test # drop database of test
print("drop database test") print("drop database test")
@ -95,7 +95,7 @@ os.system(" taos -s ' drop database test ;' -h %s "%serverHost)
# dump data in # dump data in
print("taosdump dump data in") print("taosdump dump data in")
os.system("taosdump -i /tmp/dumpdata -y -h %s "%serverHost) os.system("taosdump -i /tmp/dumpdata -h %s "%serverHost)
result = conn.query("SELECT count(*) from test.meters") result = conn.query("SELECT count(*) from test.meters")

View File

@ -152,7 +152,7 @@ function wgetFile {
file=$1 file=$1
versionPath=$2 versionPath=$2
sourceP=$3 sourceP=$3
nasServerIP="192.168.1.131" nasServerIP="192.168.1.213"
packagePath="/nas/TDengine/v${versionPath}/${verMode}" packagePath="/nas/TDengine/v${versionPath}/${verMode}"
if [ -f ${file} ];then if [ -f ${file} ];then
echoColor YD "${file} already exists ,it will delete it and download it again " echoColor YD "${file} already exists ,it will delete it and download it again "

View File

@ -1877,6 +1877,23 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
return 0; return 0;
} }
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* offset, int64_t sver, int64_t ever, int64_t consumerId){
if (!pVg->seekUpdated) {
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId);
pVg->offsetInfo.currentOffset = *offset;
} else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId);
}
// update the status
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// update the valid wal version range
pVg->offsetInfo.walVerBegin = sver;
pVg->offsetInfo.walVerEnd = ever;
pVg->receivedInfoFromVnode = true;
}
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems); tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);
@ -1925,22 +1942,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg->epSet = *pollRspWrapper->pEpset; pVg->epSet = *pollRspWrapper->pEpset;
} }
// update the local offset value only for the returned values, only when the local offset is NOT updated updateVgInfo(pVg, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId);
// by tmq_offset_seek function
if (!pVg->seekUpdated) {
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId);
pVg->offsetInfo.currentOffset = pDataRsp->rspOffset;
} else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId);
}
// update the status
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// update the valid wal version range
pVg->offsetInfo.walVerBegin = pDataRsp->head.walsver;
pVg->offsetInfo.walVerEnd = pDataRsp->head.walever;
pVg->receivedInfoFromVnode = true;
char buf[TSDB_OFFSET_LEN]; char buf[TSDB_OFFSET_LEN];
tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
@ -1990,11 +1992,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL; return NULL;
} }
if(pollRspWrapper->metaRsp.rspOffset.type != 0){ // if offset is validate updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId);
pVg->offsetInfo.currentOffset = pollRspWrapper->metaRsp.rspOffset;
}
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// build rsp // build rsp
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
@ -2022,18 +2020,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL; return NULL;
} }
// update the local offset value only for the returned values, only when the local offset is NOT updated updateVgInfo(pVg, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId);
// by tmq_offset_seek function
if (!pVg->seekUpdated) {
if(pollRspWrapper->taosxRsp.rspOffset.type != 0) { // if offset is validate
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId);
pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset;
}
} else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId);
}
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->taosxRsp.blockNum == 0) { if (pollRspWrapper->taosxRsp.blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64, tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64,
@ -2615,6 +2602,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
pAssignment->begin = pClientVg->offsetInfo.walVerBegin; pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
pAssignment->end = pClientVg->offsetInfo.walVerEnd; pAssignment->end = pClientVg->offsetInfo.walVerEnd;
pAssignment->vgId = pClientVg->vgId; pAssignment->vgId = pClientVg->vgId;
tscInfo("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId,
pAssignment->vgId, pAssignment->currentOffset);
} }
if (needFetch) { if (needFetch) {
@ -2718,17 +2707,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; // pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
char offsetBuf[TSDB_OFFSET_LEN] = {0}; // char offsetBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); // tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);
tscInfo("vgId:%d offset is update to:%s", p->vgId, offsetBuf); tscInfo("vgId:%d offset is old to:%"PRId64, p->vgId, p->currentOffset);
pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerBegin = p->begin;
pOffsetInfo->walVerEnd = p->end; pOffsetInfo->walVerEnd = p->end;
pOffsetInfo->currentOffset.version = p->currentOffset; // pOffsetInfo->currentOffset.version = p->currentOffset;
pOffsetInfo->committedOffset.version = p->currentOffset; // pOffsetInfo->committedOffset.version = p->currentOffset;
} }
} }
} }

View File

@ -275,7 +275,7 @@ static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pI
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
taosArrayPush(pOutput->newConsumers, &consumerId); taosArrayPush(pOutput->newConsumers, &consumerId);
mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, pSubKey, consumerId); mInfo("sub:%s mq rebalance add new consumer:0x%" PRIx64, pSubKey, consumerId);
} }
} }

View File

@ -560,18 +560,6 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
SMqDataRsp dataRsp = {0}; SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req); tqInitDataRsp(&dataRsp, &req);
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
if (pOffset != NULL) {
if (pOffset->val.type != TMQ_OFFSET__LOG) {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_PARA;
tDeleteMqDataRsp(&dataRsp);
return -1;
}
dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
dataRsp.rspOffset.version = pOffset->val.version;
} else {
if (req.useSnapshot == true) { if (req.useSnapshot == true) {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey); tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
@ -582,12 +570,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
dataRsp.rspOffset.type = TMQ_OFFSET__LOG; dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
if (reqOffset.type == TMQ_OFFSET__LOG) { if (reqOffset.type == TMQ_OFFSET__LOG) {
int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
if (currentVer == -1) { // not start to read data from wal yet, return req offset directly
dataRsp.rspOffset.version = reqOffset.version; dataRsp.rspOffset.version = reqOffset.version;
} else {
dataRsp.rspOffset.version = currentVer; // return current consume offset value
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
@ -599,9 +582,10 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
tDeleteMqDataRsp(&dataRsp); tDeleteMqDataRsp(&dataRsp);
return -1; return -1;
} }
} tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version);
tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever); tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
tDeleteMqDataRsp(&dataRsp);
return 0; return 0;
} }