fix:[TS-4592]remove lost status

This commit is contained in:
wangmm0220 2024-07-11 14:56:25 +08:00
parent b28437aaba
commit 4ef2c39649
7 changed files with 93 additions and 88 deletions

View File

@ -2773,7 +2773,7 @@ enum {
};
#define DEFAULT_MAX_POLL_INTERVAL 3000000
#define DEFAULT_SESSION_TIMEOUT 10000
#define DEFAULT_SESSION_TIMEOUT 12000
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic

View File

@ -127,7 +127,7 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_
cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code)));
cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType));
cJSON_AddItemToObject(json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows));
if(strlen(pRequest->sqlstr) > pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen){
if(pRequest->sqlstr != NULL && strlen(pRequest->sqlstr) > pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen){
char tmp = pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen];
pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = '\0';
cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr));

View File

@ -25,7 +25,7 @@ extern "C" {
enum {
MQ_CONSUMER_STATUS_REBALANCE = 1,
MQ_CONSUMER_STATUS_READY,
MQ_CONSUMER_STATUS_LOST,
// MQ_CONSUMER_STATUS_LOST,
};
int32_t mndInitConsumer(SMnode *pMnode);

View File

@ -749,11 +749,13 @@ static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
} else {
checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
}
} else {
} else if (status == MQ_CONSUMER_STATUS_REBALANCE) {
taosRLockLatch(&pConsumer->lock);
buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId);
buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId);
taosRUnLockLatch(&pConsumer->lock);
} else {
mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info);
}
mndReleaseConsumer(pMnode, pConsumer);
@ -974,41 +976,41 @@ END:
return ret;
}
static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) {
void *pIter = NULL;
SMqConsumerObj *pConsumer = NULL;
int ret = 0;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) {
break;
}
// drop consumer in lost status, other consumers not in lost status already deleted by rebalance
if (pConsumer->status != MQ_CONSUMER_STATUS_LOST || strcmp(cgroup, pConsumer->cgroup) != 0) {
sdbRelease(pMnode->pSdb, pConsumer);
continue;
}
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) {
char *name = taosArrayGetP(pConsumer->assignedTopics, i);
if (strcmp(topic, name) == 0) {
int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer);
if (code != 0) {
ret = code;
goto END;
}
}
}
sdbRelease(pMnode->pSdb, pConsumer);
}
END:
sdbRelease(pMnode->pSdb, pConsumer);
sdbCancelFetch(pMnode->pSdb, pIter);
return ret;
}
//static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) {
// void *pIter = NULL;
// SMqConsumerObj *pConsumer = NULL;
// int ret = 0;
// while (1) {
// pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
// if (pIter == NULL) {
// break;
// }
//
// // drop consumer in lost status, other consumers not in lost status already deleted by rebalance
// if (pConsumer->status != MQ_CONSUMER_STATUS_LOST || strcmp(cgroup, pConsumer->cgroup) != 0) {
// sdbRelease(pMnode->pSdb, pConsumer);
// continue;
// }
// int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
// for (int32_t i = 0; i < sz; i++) {
// char *name = taosArrayGetP(pConsumer->assignedTopics, i);
// if (strcmp(topic, name) == 0) {
// int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer);
// if (code != 0) {
// ret = code;
// goto END;
// }
// }
// }
//
// sdbRelease(pMnode->pSdb, pConsumer);
// }
//
//END:
// sdbRelease(pMnode->pSdb, pConsumer);
// sdbCancelFetch(pMnode->pSdb, pIter);
// return ret;
//}
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
@ -1055,10 +1057,10 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
goto end;
}
code = mndDropConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic);
if (code != 0) {
goto end;
}
// code = mndDropConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic);
// if (code != 0) {
// goto end;
// }
code = sendDeleteSubToVnode(pMnode, pSub, pTrans);
if (code != 0) {

View File

@ -668,47 +668,47 @@ static bool checkTopic(SArray *topics, char *topicName){
return false;
}
static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMqConsumerObj *pConsumer = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) {
break;
}
bool found = checkTopic(pConsumer->assignedTopics, topicName);
if (found){
if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) {
code = mndSetConsumerDropLogs(pTrans, pConsumer);
if (code != 0) {
goto end;
}
sdbRelease(pSdb, pConsumer);
continue;
}
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
topicName, pConsumer->consumerId, pConsumer->cgroup);
code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
goto end;
}
if (checkTopic(pConsumer->rebNewTopics, topicName) || checkTopic(pConsumer->rebRemovedTopics, topicName)) {
code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
topicName, pConsumer->consumerId, pConsumer->cgroup);
goto end;
}
sdbRelease(pSdb, pConsumer);
}
end:
sdbRelease(pSdb, pConsumer);
sdbCancelFetch(pSdb, pIter);
return code;
}
//static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){
// int32_t code = 0;
// SSdb *pSdb = pMnode->pSdb;
// void *pIter = NULL;
// SMqConsumerObj *pConsumer = NULL;
// while (1) {
// pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
// if (pIter == NULL) {
// break;
// }
//
// bool found = checkTopic(pConsumer->assignedTopics, topicName);
// if (found){
// if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) {
// code = mndSetConsumerDropLogs(pTrans, pConsumer);
// if (code != 0) {
// goto end;
// }
// sdbRelease(pSdb, pConsumer);
// continue;
// }
// mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
// topicName, pConsumer->consumerId, pConsumer->cgroup);
// code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
// goto end;
// }
//
// if (checkTopic(pConsumer->rebNewTopics, topicName) || checkTopic(pConsumer->rebRemovedTopics, topicName)) {
// code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
// mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
// topicName, pConsumer->consumerId, pConsumer->cgroup);
// goto end;
// }
// sdbRelease(pSdb, pConsumer);
// }
//
//end:
// sdbRelease(pSdb, pConsumer);
// sdbCancelFetch(pSdb, pIter);
// return code;
//}
static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic){
// broadcast to all vnode
@ -804,10 +804,10 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
goto end;
}
code = mndDropConsumerByTopic(pMnode, pTrans, dropReq.name);
if (code != 0) {
goto end;
}
// code = mndDropConsumerByTopic(pMnode, pTrans, dropReq.name);
// if (code != 0) {
// goto end;
// }
code = mndDropSubByTopic(pMnode, pTrans, dropReq.name);
if (code < 0) {

View File

@ -339,6 +339,8 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
code = sdbWriteWithoutFree(pSdb, pRaw);
if (code != 0) {
mError("failed to read sdb file:%s since %s", file, terrstr());
code = sdbWriteWithoutFree(pSdb, pRaw);
goto _OVER;
}
}

View File

@ -596,6 +596,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.consume.excluded", "1");
// tmq_conf_set(conf, "max.poll.interval.ms", "20000");
if (g_conf.snapShot) {
tmq_conf_set(conf, "experimental.snapshot.enable", "true");