From d9a16952571cfff06894a8210de783b5b3376360 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 4 Aug 2023 18:07:44 +0800 Subject: [PATCH 1/4] fix:add log --- source/client/test/clientTests.cpp | 174 ++++++++++++++++++++ source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 2 + source/dnode/vnode/src/tq/tq.c | 6 + source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- 4 files changed, 183 insertions(+), 1 deletion(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 6f978b0143..69c688f61c 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1442,4 +1442,178 @@ TEST(clientCase, sub_tb_mt_test) { } } +TEST(clientCase, ts_3756) { +// taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg"); + + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + tmq_conf_t* conf = tmq_conf_new(); + + tmq_conf_set(conf, "enable.auto.commit", "false"); + tmq_conf_set(conf, "auto.commit.interval.ms", "2000"); + tmq_conf_set(conf, "group.id", "group_id_2"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "auto.offset.reset", "latest"); + tmq_conf_set(conf, "msg.with.table.name", "false"); + + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + tmq_conf_destroy(conf); + + // 创建订阅 topics 列表 + tmq_list_t* topicList = tmq_list_new(); + tmq_list_append(topicList, "tp"); + + // 启动订阅 + tmq_subscribe(tmq, topicList); + tmq_list_destroy(topicList); + + TAOS_FIELD* fields = NULL; + int32_t numOfFields = 0; + int32_t precision = 0; + int32_t totalRows = 0; + int32_t msgCnt = 0; + int32_t timeout = 200; + + int32_t count = 0; + + tmq_topic_assignment* pAssign = NULL; + int32_t numOfAssign = 0; + + int32_t code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + } + +// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, 4); + tmq_free_assignment(pAssign); + + code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + } + + tmq_free_assignment(pAssign); + + code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + } + + while (1) { + printf("start to poll\n"); + TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); + if (pRes) { + char buf[128]; + + const char* topicName = tmq_get_topic_name(pRes); +// const char* dbName = tmq_get_db_name(pRes); +// int32_t vgroupId = tmq_get_vgroup_id(pRes); +// +// printf("topic: %s\n", topicName); +// printf("db: %s\n", dbName); +// printf("vgroup id: %d\n", vgroupId); + + printSubResults(pRes, &totalRows); + + tmq_topic_assignment* pAssignTmp = NULL; + int32_t numOfAssignTmp = 0; + + code = tmq_get_topic_assignment(tmq, "tp", &pAssignTmp, &numOfAssignTmp); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssignTmp[i].vgId, pAssignTmp[i].currentOffset, pAssignTmp[i].begin, pAssignTmp[i].end); + } + if(numOfAssign != 0){ + int i = 0; + for(; i < numOfAssign; i++){ + if(pAssign[i].currentOffset != pAssignTmp[i].currentOffset){ + break; + } + } + if(i == numOfAssign){ + printf("all position is same\n"); + break; + } + tmq_free_assignment(pAssign); + } + numOfAssign = numOfAssignTmp; + pAssign = pAssignTmp; + + } else { +// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset); +// tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset); +// tmq_commit_sync(tmq, pRes); + continue; + } + +// tmq_commit_sync(tmq, pRes); + if (pRes != NULL) { + taos_free_result(pRes); + // if ((++count) > 1) { + // break; + // } + } else { +// break; + } + +// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].begin); + } + + tmq_free_assignment(pAssign); + + code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + } + + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); +} #pragma GCC diagnostic pop diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 247c1729a3..f47f193e17 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -116,6 +116,8 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { + dGError("vnodeProcessFetchMsg vgId:%d, msg:%p failed code:%s", pVnode->vgId, pMsg, tstrerror(code)); + if (terrno != 0) { code = terrno; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4d433042ad..b02debd1a6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -677,6 +677,9 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { tmsgSendRsp(&rsp); +// if(terrno == TSDB_CODE_REF_NOT_EXIST){ +// ASSERT(0); +// } return 0; } @@ -758,6 +761,9 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever); tDeleteMqDataRsp(&dataRsp); +// if(terrno == TSDB_CODE_REF_NOT_EXIST){ +// ASSERT(0); +// } return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 743470aac8..ca1b7ea2f9 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -594,7 +594,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { } int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { - vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg); + vTrace("vgId:%d, msg:%p in fetch queue is processing, terrno:%d", pVnode->config.vgId, pMsg, terrno); if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) && !syncIsReadyForRead(pVnode->sync)) { From 5f02e2ddfb811b6ed3218c28dd127dc1f92ed17a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 7 Aug 2023 09:31:29 +0800 Subject: [PATCH 2/4] fix:ref is not there --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 1 + source/dnode/vnode/src/tq/tq.c | 6 ------ source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index f47f193e17..30051f72ed 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -114,6 +114,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO const STraceId *trace = &pMsg->info.traceId; dGTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg); + terrno = 0; int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { dGError("vnodeProcessFetchMsg vgId:%d, msg:%p failed code:%s", pVnode->vgId, pMsg, tstrerror(code)); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b02debd1a6..4d433042ad 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -677,9 +677,6 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { tmsgSendRsp(&rsp); -// if(terrno == TSDB_CODE_REF_NOT_EXIST){ -// ASSERT(0); -// } return 0; } @@ -761,9 +758,6 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever); tDeleteMqDataRsp(&dataRsp); -// if(terrno == TSDB_CODE_REF_NOT_EXIST){ -// ASSERT(0); -// } return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ca1b7ea2f9..743470aac8 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -594,7 +594,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { } int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { - vTrace("vgId:%d, msg:%p in fetch queue is processing, terrno:%d", pVnode->config.vgId, pMsg, terrno); + vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg); if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) && !syncIsReadyForRead(pVnode->sync)) { From 9ecabb30f5566d2560ca1cf12267cdad36d41102 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 7 Aug 2023 10:27:44 +0800 Subject: [PATCH 3/4] fix:ref is not there --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 2 +- source/dnode/vnode/src/tq/tq.c | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 30051f72ed..4ee9a60ae6 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -114,7 +114,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO const STraceId *trace = &pMsg->info.traceId; dGTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg); - terrno = 0; +// terrno = 0; int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { dGError("vnodeProcessFetchMsg vgId:%d, msg:%p failed code:%s", pVnode->vgId, pMsg, tstrerror(code)); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4d433042ad..0371e979cc 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -646,7 +646,8 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)data, len); if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) { - return TSDB_CODE_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; } tDecoderClear(&decoder); @@ -654,19 +655,22 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { STqOffset* pOffset = &vgOffset.offset; STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); if (pSavedOffset == NULL) { - return TSDB_CODE_TMQ_NO_COMMITTED; + terrno = TSDB_CODE_TMQ_NO_COMMITTED; + return terrno; } vgOffset.offset = *pSavedOffset; int32_t code = 0; tEncodeSize(tEncodeMqVgOffset, &vgOffset, len, code); if (code < 0) { - return TSDB_CODE_INVALID_PARA; + terrno = TSDB_CODE_INVALID_PARA; + return terrno; } void* buf = rpcMallocCont(len); if (buf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; } SEncoder encoder; tEncoderInit(&encoder, buf, len); From 5daf38c0d56a3be6e25e67807c9218a5d28ed78b Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 7 Aug 2023 11:29:54 +0800 Subject: [PATCH 4/4] fix:ref is not there --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 4ee9a60ae6..44a27fb791 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -114,12 +114,10 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO const STraceId *trace = &pMsg->info.traceId; dGTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg); -// terrno = 0; + terrno = 0; int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { - dGError("vnodeProcessFetchMsg vgId:%d, msg:%p failed code:%s", pVnode->vgId, pMsg, tstrerror(code)); - - if (terrno != 0) { + if (code == -1 && terrno != 0) { code = terrno; }