From a656d75ca79186cfb8415e681ca6177da4dd7756 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 5 Feb 2024 17:07:50 +0800 Subject: [PATCH 1/3] opti:consume data excluded from some source --- include/common/tmsg.h | 7 ++++++- include/libs/executor/executor.h | 2 ++ source/client/src/clientTmq.c | 8 ++++++++ source/common/src/tmsg.c | 12 ++++++++++++ source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/sma/smaTimeRange.c | 2 +- source/dnode/vnode/src/tq/tqRead.c | 7 +++++-- source/dnode/vnode/src/tq/tqScan.c | 11 ++++++++++- source/dnode/vnode/src/tq/tqSink.c | 4 ++-- source/dnode/vnode/src/tq/tqUtil.c | 2 +- source/libs/executor/inc/querytask.h | 1 + source/libs/executor/src/executor.c | 5 +++++ source/libs/executor/src/scanoperator.c | 2 +- source/libs/parser/src/parInsertUtil.c | 15 +++++++++------ 15 files changed, 65 insertions(+), 17 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c314d82036..9af68543da 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3603,6 +3603,7 @@ typedef struct { int64_t timeout; STqOffsetVal reqOffset; int8_t enableReplay; + int8_t sourceExcluded; } SMqPollReq; int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq); @@ -3891,6 +3892,9 @@ int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); #define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1 #define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2 +#define SOURCE_NULL 0 +#define SOURCE_TAOSX 1 + typedef struct { int32_t flags; SVCreateTbReq* pCreateTbReq; @@ -3901,7 +3905,8 @@ typedef struct { SArray* aRowP; SArray* aCol; }; - int64_t ctimeMs; + int64_t ctimeMs; + int8_t source; } SSubmitTbData; typedef struct { diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index f78b7a3126..e06a08acba 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -197,6 +197,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT void qStreamSetOpen(qTaskInfo_t tinfo); +void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded); + void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 15c8903978..96d527d299 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -63,6 +63,7 @@ struct tmq_conf_t { int8_t withTbName; int8_t snapEnable; int8_t replayEnable; + int8_t sourceExcluded; // do not consume, bit uint16_t port; int32_t autoCommitInterval; char* ip; @@ -82,6 +83,7 @@ struct tmq_t { int32_t autoCommitInterval; int8_t resetOffsetCfg; int8_t replayEnable; + int8_t sourceExcluded; // do not consume, bit uint64_t consumerId; tmq_commit_cb* commitCb; void* commitCbUserParam; @@ -384,6 +386,10 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value return TMQ_CONF_INVALID; } } + if (strcasecmp(key, "msg.consume.excluded") == 0) { + conf->sourceExcluded = taosStr2int64(value); + return TMQ_CONF_OK; + } if (strcasecmp(key, "td.connect.db") == 0) { return TMQ_CONF_OK; @@ -1081,6 +1087,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->commitCbUserParam = conf->commitCbUserParam; pTmq->resetOffsetCfg = conf->resetOffset; pTmq->replayEnable = conf->replayEnable; + pTmq->sourceExcluded = conf->sourceExcluded; if(conf->replayEnable){ pTmq->autoCommit = false; } @@ -1549,6 +1556,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl pReq->useSnapshot = tmq->useSnapshot; pReq->reqId = generateRequestId(); pReq->enableReplay = tmq->replayEnable; + pReq->sourceExcluded = tmq->sourceExcluded; } SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c9e2908e8a..fa696713d6 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6516,6 +6516,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { if (tEncodeI64(&encoder, pReq->timeout) < 0) return -1; if (tSerializeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1; if (tEncodeI8(&encoder, pReq->enableReplay) < 0) return -1; + if (tEncodeI8(&encoder, pReq->sourceExcluded) < 0) return -1; tEndEncode(&encoder); @@ -6556,6 +6557,10 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { if (tDecodeI8(&decoder, &pReq->enableReplay) < 0) return -1; } + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI8(&decoder, &pReq->sourceExcluded) < 0) return -1; + } + tEndDecode(&decoder); tDecoderClear(&decoder); @@ -8578,6 +8583,7 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm } } if (tEncodeI64(pCoder, pSubmitTbData->ctimeMs) < 0) return -1; + if (tEncodeI8(pCoder, pSubmitTbData->source) < 0) return -1; tEndEncode(pCoder); return 0; @@ -8665,6 +8671,12 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa goto _exit; } } + if (!tDecodeIsEnd(pCoder)) { + if (tDecodeI8(pCoder, &pSubmitTbData->source) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + } tEndDecode(pCoder); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 9a4e2edf8d..6bea092b5f 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -221,7 +221,7 @@ bool tqReaderIsQueriedTable(STqReader *pReader, uint64_t uid); bool tqCurrentBlockConsumed(const STqReader *pReader); int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id); -bool tqNextBlockInWal(STqReader *pReader, const char *idstr); +bool tqNextBlockInWal(STqReader *pReader, const char *idstr, int sourceExcluded); bool tqNextBlockImpl(STqReader *pReader, const char *idstr); SWalReader *tqGetWalReader(STqReader *pReader); SSDataBlock *tqGetResultBlock(STqReader *pReader); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 0ef29fcb3a..ee527a8a6e 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -119,7 +119,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId); // tqExec -int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows); +int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type, int32_t vgId); diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 289986e01f..b2322260f0 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -193,7 +193,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * continue; } - SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE,}; + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE, .source = SOURCE_NULL}; int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1; tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index bfa8cfdb53..b0295ba552 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -368,7 +368,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con } // todo ignore the error in wal? -bool tqNextBlockInWal(STqReader* pReader, const char* id) { +bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) { SWalReader* pWalReader = pReader->pWalReader; SSDataBlock* pDataBlock = NULL; @@ -391,7 +391,10 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) { numOfBlocks, pReader->msg.msgLen, pReader->msg.ver); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); - + if ((pSubmitTbData->source & sourceExcluded) != 0){ + pReader->nextBlk += 1; + continue; + } if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) { tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid); SSDataBlock* pRes = NULL; diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 01866ef893..738bd0a9dd 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -94,6 +94,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* return -1; } + qStreamSetSourceExcluded(task, pRequest->sourceExcluded); while (1) { SSDataBlock* pDataBlock = NULL; code = getDataBlock(task, pHandle, vgId, &pDataBlock); @@ -250,7 +251,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta return 0; } -int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows) { +int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded) { STqExecHandle* pExec = &pHandle->execHandle; SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pSchemas = taosArrayInit(0, sizeof(void*)); @@ -265,6 +266,10 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_table; } + + if ((pSubmitTbDataRet->source & sourceExcluded) != 0){ + goto loop_table; + } if (pRsp->withTbName) { int64_t uid = pExec->pTqReader->lastBlkUid; if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { @@ -329,6 +334,10 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_db; } + + if ((pSubmitTbDataRet->source & sourceExcluded) != 0){ + goto loop_db; + } if (pRsp->withTbName) { int64_t uid = pExec->pTqReader->lastBlkUid; if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index c2e48d5d92..9645ab5c70 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -791,7 +791,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { return; } - SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL}; code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData); if (code != TSDB_CODE_SUCCESS) { continue; @@ -835,7 +835,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { pTask->execInfo.sink.numOfBlocks += 1; uint64_t groupId = pDataBlock->info.id.groupId; - SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL}; int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId)); if (index == NULL) { // no data yet, append it diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index d18455d221..a71b45e5d7 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -250,7 +250,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, .ver = pHead->version, }; - code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows); + code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest->sourceExcluded); if (code < 0) { tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, pRequest->subKey); diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index fcafd5a4e3..0c9a5e3197 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -59,6 +59,7 @@ typedef struct STaskStopInfo { typedef struct { STqOffsetVal currentOffset; // for tmq SMqMetaRsp metaRsp; // for tmq fetching meta + int8_t sourceExcluded; int64_t snapshotVer; SSchemaWrapper* schema; char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fb39de484f..87c65b94a4 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1148,6 +1148,11 @@ void qStreamSetOpen(qTaskInfo_t tinfo) { pOperator->status = OP_NOT_OPENED; } +void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + pTaskInfo->streamInfo.sourceExcluded = sourceExcluded; +} + int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3ed5128858..c4c1b2c299 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1978,7 +1978,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) { while (1) { - bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id); + bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id, pTaskInfo->streamInfo.sourceExcluded); SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader); struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader); diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index a924ed68b0..ff9ece1659 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -211,6 +211,7 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat bool colMode, bool ignoreColVals) { STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt)); if (NULL == pTableCxt) { + *pOutput = NULL; return TSDB_CODE_OUT_OF_MEMORY; } @@ -268,12 +269,8 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat } } - if (TSDB_CODE_SUCCESS == code) { - *pOutput = pTableCxt; - qDebug("tableDataCxt created, uid:%" PRId64 ", vgId:%d", pTableMeta->uid, pTableMeta->vgId); - } else { - taosMemoryFree(pTableCxt); - } + *pOutput = pTableCxt; + qDebug("tableDataCxt created, code:%d, uid:%" PRId64 ", vgId:%d", code, pTableMeta->uid, pTableMeta->vgId); return code; } @@ -288,6 +285,7 @@ static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) { pTmp->suid = pSrc->suid; pTmp->uid = pSrc->uid; pTmp->sver = pSrc->sver; + pTmp->source = pSrc->source; pTmp->pCreateTbReq = NULL; if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { if (pSrc->pCreateTbReq) { @@ -344,6 +342,10 @@ int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* void* pData = *pTableCxt; // deal scan coverity code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES); } + + if (TSDB_CODE_SUCCESS != code) { + insDestroyTableDataCxt(*pTableCxt); + } return code; } @@ -651,6 +653,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate goto end; } + pTableCxt->pData->source = SOURCE_TAOSX; if(tmp == NULL){ ret = initTableColSubmitData(pTableCxt); if (ret != TSDB_CODE_SUCCESS) { From 478e1a67ae34d5fcac76105eaa1d168924b78f45 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 6 Feb 2024 18:14:06 +0800 Subject: [PATCH 2/3] fix:[TD-28590]add logic for consume excluded --- include/common/tmsg.h | 1 + source/client/src/clientTmq.c | 39 +++--- source/common/src/tmsg.c | 2 + source/dnode/mnode/impl/src/mndDef.c | 136 +++++++-------------- source/dnode/mnode/impl/src/mndSubscribe.c | 86 ++++++------- tests/system-test/7-tmq/tmq_taosx.py | 40 ++++++ utils/test/c/tmq_taosx_ci.c | 85 +++++++++++++ 7 files changed, 234 insertions(+), 155 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4731a30152..812d8a02a0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3768,6 +3768,7 @@ typedef struct { int32_t vgId; STqOffsetVal offset; int64_t rows; + int64_t ever; } OffsetRows; typedef struct { diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 0f3883d161..3c3aee3032 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -789,27 +789,26 @@ void tmqSendHbReq(void* param, void* tmrId) { req.consumerId = tmq->consumerId; req.epoch = tmq->epoch; taosRLockLatch(&tmq->lock); -// if(tmq->needReportOffsetRows){ - req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); - for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){ - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); - TopicOffsetRows* data = taosArrayReserve(req.topics, 1); - strcpy(data->topicName, pTopic->topicName); - data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows)); - for(int j = 0; j < numOfVgroups; j++){ - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1); - offRows->vgId = pVg->vgId; - offRows->rows = pVg->numOfRows; - offRows->offset = pVg->offsetInfo.beginOffset; - char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); - tscInfo("consumer:0x%" PRIx64 ",report offset: vgId:%d, offset:%s, rows:%"PRId64, tmq->consumerId, offRows->vgId, buf, offRows->rows); - } + req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); + for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){ + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); + TopicOffsetRows* data = taosArrayReserve(req.topics, 1); + strcpy(data->topicName, pTopic->topicName); + data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows)); + for(int j = 0; j < numOfVgroups; j++){ + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1); + offRows->vgId = pVg->vgId; + offRows->rows = pVg->numOfRows; + offRows->offset = pVg->offsetInfo.endOffset; + offRows->ever = pVg->offsetInfo.walVerEnd; + char buf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); + tscInfo("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%"PRId64", rows:%"PRId64, + tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows); } -// tmq->needReportOffsetRows = false; -// } + } taosRUnLockLatch(&tmq->lock); int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index add1f12fc1..2ffa12f2c1 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6252,6 +6252,7 @@ int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { if (tEncodeI32(&encoder, offRows->vgId) < 0) return -1; if (tEncodeI64(&encoder, offRows->rows) < 0) return -1; if (tEncodeSTqOffsetVal(&encoder, &offRows->offset) < 0) return -1; + if (tEncodeI64(&encoder, offRows->ever) < 0) return -1; } } @@ -6289,6 +6290,7 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { if (tDecodeI32(&decoder, &offRows->vgId) < 0) return -1; if (tDecodeI64(&decoder, &offRows->rows) < 0) return -1; if (tDecodeSTqOffsetVal(&decoder, &offRows->offset) < 0) return -1; + if (tDecodeI64(&decoder, &offRows->ever) < 0) return -1; } } } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 172c3952ad..d59354286d 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -422,27 +422,12 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s return (void *)buf; } -// SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) { -// SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp)); -// if (pConsumerEpNew == NULL) return NULL; -// pConsumerEpNew->consumerId = pConsumerEpOld->consumerId; -// pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL); -// return pConsumerEpNew; -// } -// -// void tDeleteSMqConsumerEp(void *data) { -// SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data; -// taosArrayDestroy(pConsumerEp->vgs); -// } - -int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { +int32_t tEncodeOffRows(void **buf, SArray *offsetRows){ int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); - tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp); - int32_t szVgs = taosArrayGetSize(pConsumerEp->offsetRows); + int32_t szVgs = taosArrayGetSize(offsetRows); tlen += taosEncodeFixedI32(buf, szVgs); for (int32_t j = 0; j < szVgs; ++j) { - OffsetRows *offRows = taosArrayGet(pConsumerEp->offsetRows, j); + OffsetRows *offRows = taosArrayGet(offsetRows, j); tlen += taosEncodeFixedI32(buf, offRows->vgId); tlen += taosEncodeFixedI64(buf, offRows->rows); tlen += taosEncodeFixedI8(buf, offRows->offset.type); @@ -454,53 +439,54 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { } else { // do nothing } + tlen += taosEncodeFixedI64(buf, offRows->ever); } - // #if 0 - // int32_t sz = taosArrayGetSize(pConsumerEp->vgs); - // tlen += taosEncodeFixedI32(buf, sz); - // for (int32_t i = 0; i < sz; i++) { - // SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); - // tlen += tEncodeSMqVgEp(buf, pVgEp); - // } - // #endif + return tlen; } +int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); + tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp); + + + return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows); +} + +void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver){ + int32_t szVgs = 0; + buf = taosDecodeFixedI32(buf, &szVgs); + if (szVgs > 0) { + *offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows)); + if (NULL == *offsetRows) return NULL; + for (int32_t j = 0; j < szVgs; ++j) { + OffsetRows *offRows = taosArrayReserve(*offsetRows, 1); + buf = taosDecodeFixedI32(buf, &offRows->vgId); + buf = taosDecodeFixedI64(buf, &offRows->rows); + buf = taosDecodeFixedI8(buf, &offRows->offset.type); + if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) { + buf = taosDecodeFixedI64(buf, &offRows->offset.uid); + buf = taosDecodeFixedI64(buf, &offRows->offset.ts); + } else if (offRows->offset.type == TMQ_OFFSET__LOG) { + buf = taosDecodeFixedI64(buf, &offRows->offset.version); + } else { + // do nothing + } + if(sver > 2){ + buf = taosDecodeFixedI64(buf, &offRows->ever); + } + } + } + return (void *)buf; +} + void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) { buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver); if (sver > 1) { - int32_t szVgs = 0; - buf = taosDecodeFixedI32(buf, &szVgs); - if (szVgs > 0) { - pConsumerEp->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows)); - if (NULL == pConsumerEp->offsetRows) return NULL; - for (int32_t j = 0; j < szVgs; ++j) { - OffsetRows *offRows = taosArrayReserve(pConsumerEp->offsetRows, 1); - buf = taosDecodeFixedI32(buf, &offRows->vgId); - buf = taosDecodeFixedI64(buf, &offRows->rows); - buf = taosDecodeFixedI8(buf, &offRows->offset.type); - if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) { - buf = taosDecodeFixedI64(buf, &offRows->offset.uid); - buf = taosDecodeFixedI64(buf, &offRows->offset.ts); - } else if (offRows->offset.type == TMQ_OFFSET__LOG) { - buf = taosDecodeFixedI64(buf, &offRows->offset.version); - } else { - // do nothing - } - } - } + buf = tDecodeOffRows(buf, &pConsumerEp->offsetRows, sver); } - // #if 0 - // int32_t sz; - // buf = taosDecodeFixedI32(buf, &sz); - // pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *)); - // for (int32_t i = 0; i < sz; i++) { - // SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp)); - // buf = tDecodeSMqVgEp(buf, pVgEp); - // taosArrayPush(pConsumerEp->vgs, &pVgEp); - // } - // #endif return (void *)buf; } @@ -596,22 +582,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp); tlen += taosEncodeString(buf, pSub->dbName); - int32_t szVgs = taosArrayGetSize(pSub->offsetRows); - tlen += taosEncodeFixedI32(buf, szVgs); - for (int32_t j = 0; j < szVgs; ++j) { - OffsetRows *offRows = taosArrayGet(pSub->offsetRows, j); - tlen += taosEncodeFixedI32(buf, offRows->vgId); - tlen += taosEncodeFixedI64(buf, offRows->rows); - tlen += taosEncodeFixedI8(buf, offRows->offset.type); - if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) { - tlen += taosEncodeFixedI64(buf, offRows->offset.uid); - tlen += taosEncodeFixedI64(buf, offRows->offset.ts); - } else if (offRows->offset.type == TMQ_OFFSET__LOG) { - tlen += taosEncodeFixedI64(buf, offRows->offset.version); - } else { - // do nothing - } - } + tlen += tEncodeOffRows(buf, pSub->offsetRows); tlen += taosEncodeString(buf, pSub->qmsg); return tlen; } @@ -639,26 +610,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) { buf = taosDecodeStringTo(buf, pSub->dbName); if (sver > 1) { - int32_t szVgs = 0; - buf = taosDecodeFixedI32(buf, &szVgs); - if (szVgs > 0) { - pSub->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows)); - if (NULL == pSub->offsetRows) return NULL; - for (int32_t j = 0; j < szVgs; ++j) { - OffsetRows *offRows = taosArrayReserve(pSub->offsetRows, 1); - buf = taosDecodeFixedI32(buf, &offRows->vgId); - buf = taosDecodeFixedI64(buf, &offRows->rows); - buf = taosDecodeFixedI8(buf, &offRows->offset.type); - if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) { - buf = taosDecodeFixedI64(buf, &offRows->offset.uid); - buf = taosDecodeFixedI64(buf, &offRows->offset.ts); - } else if (offRows->offset.type == TMQ_OFFSET__LOG) { - buf = taosDecodeFixedI64(buf, &offRows->offset.version); - } else { - // do nothing - } - } - } + buf = tDecodeOffRows(buf, &pSub->offsetRows, sver); buf = taosDecodeString(buf, &pSub->qmsg); } else { pSub->qmsg = taosStrdup(""); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 0909003201..fbdfd81cdf 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -24,7 +24,7 @@ #include "tcompare.h" #include "tname.h" -#define MND_SUBSCRIBE_VER_NUMBER 2 +#define MND_SUBSCRIBE_VER_NUMBER 3 #define MND_SUBSCRIBE_RESERVE_SIZE 64 #define MND_CONSUMER_LOST_HB_CNT 6 @@ -530,51 +530,50 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR } } -// if(taosHashGetSize(pOutput->pSub->consumerHash) == 0) { // if all consumer is removed - SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows - if (pSub) { - taosRLockLatch(&pSub->lock); - if (pOutput->pSub->offsetRows == NULL) { - pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows)); - } - pIter = NULL; - while (1) { - pIter = taosHashIterate(pSub->consumerHash, pIter); - if (pIter == NULL) break; - SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)); + SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows + if (pSub) { + taosRLockLatch(&pSub->lock); + if (pOutput->pSub->offsetRows == NULL) { + pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows)); + } + pIter = NULL; + while (1) { + pIter = taosHashIterate(pSub->consumerHash, pIter); + if (pIter == NULL) break; + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; + SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)); - for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) { - OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j); - bool jump = false; - for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++){ - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i); - if(pVgEp->vgId == d1->vgId){ - jump = true; - mInfo("pSub->offsetRows jump, because consumer id:0x%"PRIx64 " and vgId:%d not change", pConsumerEp->consumerId, pVgEp->vgId); - break; - } - } - if(jump) continue; - bool find = false; - for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) { - OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i); - if (d1->vgId == d2->vgId) { - d2->rows += d1->rows; - d2->offset = d1->offset; - find = true; - mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows); - break; - } - } - if(!find){ - taosArrayPush(pOutput->pSub->offsetRows, d1); + for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) { + OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j); + bool jump = false; + for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++){ + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i); + if(pVgEp->vgId == d1->vgId){ + jump = true; + mInfo("pSub->offsetRows jump, because consumer id:0x%"PRIx64 " and vgId:%d not change", pConsumerEp->consumerId, pVgEp->vgId); + break; } } + if(jump) continue; + bool find = false; + for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) { + OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i); + if (d1->vgId == d2->vgId) { + d2->rows += d1->rows; + d2->offset = d1->offset; + d2->ever = d1->ever; + find = true; + mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows); + break; + } + } + if(!find){ + taosArrayPush(pOutput->pSub->offsetRows, d1); + } } - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); -// } + } + taosRUnLockLatch(&pSub->lock); + mndReleaseSubscribe(pMnode, pSub); } // 8. generate logs @@ -1405,8 +1404,9 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t cons } if(data){ // vg id - char buf[TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0}; + char buf[TSDB_OFFSET_LEN*2 + VARSTR_HEADER_SIZE] = {0}; tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset); + sprintf(varDataVal(buf) + strlen(varDataVal(buf)), "/%"PRId64, data->ever); varDataSetLen(buf, strlen(varDataVal(buf))); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, *numOfRows, (const char *)buf, false); diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 86c40fdc72..5bd70a5d60 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -11,6 +11,7 @@ from util.sql import * from util.cases import * from util.dnodes import * from util.common import * +from taos.tmq import * sys.path.append("./7-tmq") from tmqCommon import * @@ -310,6 +311,43 @@ class TDTestCase: return + def consumeExcluded(self): + tdSql.execute(f'create topic topic_excluded as database db_taosx') + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + "msg.consume.excluded": 1 + } + consumer = Consumer(consumer_dict) + + tdLog.debug("test subscribe topic created by other user") + exceptOccured = False + try: + consumer.subscribe(["topic_excluded"]) + except TmqError: + exceptOccured = True + + if exceptOccured: + tdLog.exit(f"subscribe error") + + try: + while True: + res = consumer.poll(1) + if not res: + break + err = res.error() + if err is not None: + raise err + val = res.value() + + for block in val: + print(block.fetchall()) + + finally: + consumer.close() + def run(self): tdSql.prepare() self.checkWal1VgroupOnlyMeta() @@ -324,6 +362,8 @@ class TDTestCase: self.checkSnapshotMultiVgroups() self.checkWalMultiVgroupsWithDropTable() + # self.consumeExcluded() + self.checkSnapshotMultiVgroupsWithDropTable() def stop(self): diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 8a7074844a..056b7dc6cf 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -909,6 +909,88 @@ void initLogFile() { taosCloseFile(&pFile2); } +void testConsumeExcluded(int topic_type){ + TAOS* pConn = use_db(); + TAOS_RES *pRes = NULL; + + if(topic_type == 1){ + char *topic = "create topic topic_excluded with meta as database db_taosx"; + pRes = taos_query(pConn, topic); + if (taos_errno(pRes) != 0) { + printf("failed to create topic topic_excluded, reason:%s\n", taos_errstr(pRes)); + taos_close(pConn); + return; + } + taos_free_result(pRes); + }else if(topic_type == 2){ + char *topic = "create topic topic_excluded as select * from stt"; + pRes = taos_query(pConn, topic); + if (taos_errno(pRes) != 0) { + printf("failed to create topic topic_excluded, reason:%s\n", taos_errstr(pRes)); + taos_close(pConn); + return; + } + taos_free_result(pRes); + } + taos_close(pConn); + + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "tg2"); + tmq_conf_set(conf, "client.id", "my app 1"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.consume.excluded", "1"); + + + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + assert(tmq); + tmq_conf_destroy(conf); + + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, "topic_excluded"); + + int32_t code = 0; + + if ((code = tmq_subscribe(tmq, topic_list))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); + printf("subscribe err\n"); + return; + } + while (running) { + TAOS_RES* msg = tmq_consumer_poll(tmq, 1000); + if (msg) { + tmq_raw_data raw = {0}; + tmq_get_raw(msg, &raw); + if(topic_type == 1){ + assert(raw.raw_type != 2 && raw.raw_type != 4); + }else if(topic_type == 2){ + assert(0); + } +// printf("write raw data type: %d\n", raw.raw_type); + tmq_free_raw(raw); + + taos_free_result(msg); + } else { + break; + } + } + + tmq_consumer_close(tmq); + tmq_list_destroy(topic_list); + + pConn = use_db(); + pRes = taos_query(pConn, "drop topic if exists topic_excluded"); + if (taos_errno(pRes) != 0) { + printf("error in drop topic, reason:%s\n", taos_errstr(pRes)); + taos_close(pConn); + return; + } + taos_free_result(pRes); +} int main(int argc, char* argv[]) { for (int32_t i = 1; i < argc; i++) { if (strcmp(argv[i], "-c") == 0) { @@ -942,5 +1024,8 @@ int main(int argc, char* argv[]) { tmq_list_t* topic_list = build_topic_list(); basic_consume_loop(tmq, topic_list); tmq_list_destroy(topic_list); + + testConsumeExcluded(1); + testConsumeExcluded(2); taosCloseFile(&g_fp); } From 43624bbe08f24b95a568126b8fd58f37c4083ac2 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 7 Feb 2024 09:52:41 +0800 Subject: [PATCH 3/3] fix:cases error --- tests/system-test/7-tmq/tmqParamsTest.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/tests/system-test/7-tmq/tmqParamsTest.py b/tests/system-test/7-tmq/tmqParamsTest.py index 9286b69278..82a5d42b47 100644 --- a/tests/system-test/7-tmq/tmqParamsTest.py +++ b/tests/system-test/7-tmq/tmqParamsTest.py @@ -133,13 +133,17 @@ class TDTestCase: if snapshot_value == "true": if offset_value != "earliest" and offset_value != "": if offset_value == "latest": - offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info)) - tdSql.checkEqual(sum(offset_value_list) >= 0, True) + offset_value_list = list(map(lambda x: (x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info)) + offset_value_list1 = list(map(lambda x: int(x.split("/")[0]), offset_value_list)) + offset_value_list2 = list(map(lambda x: int(x.split("/")[1]), offset_value_list)) + tdSql.checkEqual(offset_value_list1 == offset_value_list2, True) + tdSql.checkEqual(sum(offset_value_list1) >= 0, True) rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) tdSql.checkEqual(sum(rows_value_list), expected_res) elif offset_value == "none": offset_value_list = list(map(lambda x: x[-2], subscription_info)) - tdSql.checkEqual(offset_value_list, ['none']*len(subscription_info)) + offset_value_list1 = list(map(lambda x: (x.split("/")[0]), offset_value_list)) + tdSql.checkEqual(offset_value_list1, ['none']*len(subscription_info)) rows_value_list = list(map(lambda x: x[-1], subscription_info)) tdSql.checkEqual(rows_value_list, [0]*len(subscription_info)) else: @@ -151,18 +155,23 @@ class TDTestCase: # tdSql.checkEqual(sum(rows_value_list), expected_res) else: offset_value_list = list(map(lambda x: x[-2], subscription_info)) - tdSql.checkEqual(offset_value_list, [None]*len(subscription_info)) + offset_value_list1 = list(map(lambda x: (x.split("/")[0]), offset_value_list)) + tdSql.checkEqual(offset_value_list1, [None]*len(subscription_info)) rows_value_list = list(map(lambda x: x[-1], subscription_info)) tdSql.checkEqual(rows_value_list, [None]*len(subscription_info)) else: if offset_value != "none": - offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info)) - tdSql.checkEqual(sum(offset_value_list) >= 0, True) + offset_value_list = list(map(lambda x: (x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info)) + offset_value_list1 = list(map(lambda x: int(x.split("/")[0]), offset_value_list)) + offset_value_list2 = list(map(lambda x: int(x.split("/")[1]), offset_value_list)) + tdSql.checkEqual(offset_value_list1 == offset_value_list2, True) + tdSql.checkEqual(sum(offset_value_list1) >= 0, True) rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) tdSql.checkEqual(sum(rows_value_list), expected_res) else: offset_value_list = list(map(lambda x: x[-2], subscription_info)) - tdSql.checkEqual(offset_value_list, ['none']*len(subscription_info)) + offset_value_list1 = list(map(lambda x: (x.split("/")[0]), offset_value_list)) + tdSql.checkEqual(offset_value_list1, ['none']*len(subscription_info)) rows_value_list = list(map(lambda x: x[-1], subscription_info)) tdSql.checkEqual(rows_value_list, [0]*len(subscription_info)) tdSql.execute(f"drop topic if exists {topic_name}")