From 4ef2c3964968aa6baf49cb89fd607b6594402df6 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 11 Jul 2024 14:56:25 +0800 Subject: [PATCH] fix:[TS-4592]remove lost status --- include/common/tmsg.h | 2 +- source/client/src/clientEnv.c | 2 +- source/dnode/mnode/impl/inc/mndConsumer.h | 2 +- source/dnode/mnode/impl/src/mndSubscribe.c | 82 ++++++++++---------- source/dnode/mnode/impl/src/mndTopic.c | 90 +++++++++++----------- source/dnode/mnode/sdb/src/sdbFile.c | 2 + utils/test/c/tmq_taosx_ci.c | 1 + 7 files changed, 93 insertions(+), 88 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c812138282..be7accd0da 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2773,7 +2773,7 @@ enum { }; #define DEFAULT_MAX_POLL_INTERVAL 3000000 -#define DEFAULT_SESSION_TIMEOUT 10000 +#define DEFAULT_SESSION_TIMEOUT 12000 typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 3a821768f8..1336372ae2 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -127,7 +127,7 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_ cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code))); cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType)); cJSON_AddItemToObject(json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows)); - if(strlen(pRequest->sqlstr) > pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen){ + if(pRequest->sqlstr != NULL && strlen(pRequest->sqlstr) > pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen){ char tmp = pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen]; pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = '\0'; cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)); diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index 5184ad0eca..7308343d1c 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -25,7 +25,7 @@ extern "C" { enum { MQ_CONSUMER_STATUS_REBALANCE = 1, MQ_CONSUMER_STATUS_READY, - MQ_CONSUMER_STATUS_LOST, +// MQ_CONSUMER_STATUS_LOST, }; int32_t mndInitConsumer(SMnode *pMnode); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index d2615d7b2f..e03eee07a1 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -749,11 +749,13 @@ static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) { } else { checkForVgroupSplit(pMnode, pConsumer, rebSubHash); } - } else { + } else if (status == MQ_CONSUMER_STATUS_REBALANCE) { taosRLockLatch(&pConsumer->lock); buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId); buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId); taosRUnLockLatch(&pConsumer->lock); + } else { + mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info); } mndReleaseConsumer(pMnode, pConsumer); @@ -974,41 +976,41 @@ END: return ret; } -static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) { - void *pIter = NULL; - SMqConsumerObj *pConsumer = NULL; - int ret = 0; - while (1) { - pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); - if (pIter == NULL) { - break; - } - - // drop consumer in lost status, other consumers not in lost status already deleted by rebalance - if (pConsumer->status != MQ_CONSUMER_STATUS_LOST || strcmp(cgroup, pConsumer->cgroup) != 0) { - sdbRelease(pMnode->pSdb, pConsumer); - continue; - } - int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); - for (int32_t i = 0; i < sz; i++) { - char *name = taosArrayGetP(pConsumer->assignedTopics, i); - if (strcmp(topic, name) == 0) { - int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer); - if (code != 0) { - ret = code; - goto END; - } - } - } - - sdbRelease(pMnode->pSdb, pConsumer); - } - -END: - sdbRelease(pMnode->pSdb, pConsumer); - sdbCancelFetch(pMnode->pSdb, pIter); - return ret; -} +//static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) { +// void *pIter = NULL; +// SMqConsumerObj *pConsumer = NULL; +// int ret = 0; +// while (1) { +// pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); +// if (pIter == NULL) { +// break; +// } +// +// // drop consumer in lost status, other consumers not in lost status already deleted by rebalance +// if (pConsumer->status != MQ_CONSUMER_STATUS_LOST || strcmp(cgroup, pConsumer->cgroup) != 0) { +// sdbRelease(pMnode->pSdb, pConsumer); +// continue; +// } +// int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); +// for (int32_t i = 0; i < sz; i++) { +// char *name = taosArrayGetP(pConsumer->assignedTopics, i); +// if (strcmp(topic, name) == 0) { +// int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer); +// if (code != 0) { +// ret = code; +// goto END; +// } +// } +// } +// +// sdbRelease(pMnode->pSdb, pConsumer); +// } +// +//END: +// sdbRelease(pMnode->pSdb, pConsumer); +// sdbCancelFetch(pMnode->pSdb, pIter); +// return ret; +//} static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; @@ -1055,10 +1057,10 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { goto end; } - code = mndDropConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic); - if (code != 0) { - goto end; - } +// code = mndDropConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic); +// if (code != 0) { +// goto end; +// } code = sendDeleteSubToVnode(pMnode, pSub, pTrans); if (code != 0) { diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index bcb38a3902..9ca0fed08a 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -668,47 +668,47 @@ static bool checkTopic(SArray *topics, char *topicName){ return false; } -static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){ - int32_t code = 0; - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; - SMqConsumerObj *pConsumer = NULL; - while (1) { - pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); - if (pIter == NULL) { - break; - } - - bool found = checkTopic(pConsumer->assignedTopics, topicName); - if (found){ - if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) { - code = mndSetConsumerDropLogs(pTrans, pConsumer); - if (code != 0) { - goto end; - } - sdbRelease(pSdb, pConsumer); - continue; - } - mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", - topicName, pConsumer->consumerId, pConsumer->cgroup); - code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; - goto end; - } - - if (checkTopic(pConsumer->rebNewTopics, topicName) || checkTopic(pConsumer->rebRemovedTopics, topicName)) { - code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; - mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", - topicName, pConsumer->consumerId, pConsumer->cgroup); - goto end; - } - sdbRelease(pSdb, pConsumer); - } - -end: - sdbRelease(pSdb, pConsumer); - sdbCancelFetch(pSdb, pIter); - return code; -} +//static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){ +// int32_t code = 0; +// SSdb *pSdb = pMnode->pSdb; +// void *pIter = NULL; +// SMqConsumerObj *pConsumer = NULL; +// while (1) { +// pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); +// if (pIter == NULL) { +// break; +// } +// +// bool found = checkTopic(pConsumer->assignedTopics, topicName); +// if (found){ +// if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) { +// code = mndSetConsumerDropLogs(pTrans, pConsumer); +// if (code != 0) { +// goto end; +// } +// sdbRelease(pSdb, pConsumer); +// continue; +// } +// mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", +// topicName, pConsumer->consumerId, pConsumer->cgroup); +// code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; +// goto end; +// } +// +// if (checkTopic(pConsumer->rebNewTopics, topicName) || checkTopic(pConsumer->rebRemovedTopics, topicName)) { +// code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; +// mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", +// topicName, pConsumer->consumerId, pConsumer->cgroup); +// goto end; +// } +// sdbRelease(pSdb, pConsumer); +// } +// +//end: +// sdbRelease(pSdb, pConsumer); +// sdbCancelFetch(pSdb, pIter); +// return code; +//} static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic){ // broadcast to all vnode @@ -804,10 +804,10 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { goto end; } - code = mndDropConsumerByTopic(pMnode, pTrans, dropReq.name); - if (code != 0) { - goto end; - } +// code = mndDropConsumerByTopic(pMnode, pTrans, dropReq.name); +// if (code != 0) { +// goto end; +// } code = mndDropSubByTopic(pMnode, pTrans, dropReq.name); if (code < 0) { diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index d94650695c..ab928a4edc 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -339,6 +339,8 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { code = sdbWriteWithoutFree(pSdb, pRaw); if (code != 0) { mError("failed to read sdb file:%s since %s", file, terrstr()); + code = sdbWriteWithoutFree(pSdb, pRaw); + goto _OVER; } } diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 51d134a463..2f6a5fa59b 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -596,6 +596,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "msg.consume.excluded", "1"); +// tmq_conf_set(conf, "max.poll.interval.ms", "20000"); if (g_conf.snapShot) { tmq_conf_set(conf, "experimental.snapshot.enable", "true");