diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b06adf3f2d..d0c84fbfa2 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2810,6 +2810,9 @@ enum { TOPIC_SUB_TYPE__COLUMN, }; +#define DEFAULT_MAX_POLL_INTERVAL 3000000 +#define DEFAULT_SESSION_TIMEOUT 12000 + typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic int8_t igExists; @@ -2832,7 +2835,7 @@ typedef struct { typedef struct { int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; - char clientId[256]; + char clientId[TSDB_CLIENT_ID_LEN]; SArray* topicNames; // SArray int8_t withTbName; @@ -2841,6 +2844,8 @@ typedef struct { int8_t resetOffsetCfg; int8_t enableReplay; int8_t enableBatchMeta; + int32_t sessionTimeoutMs; + int32_t maxPollIntervalMs; } SCMSubscribeReq; static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { @@ -2862,11 +2867,14 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg); tlen += taosEncodeFixedI8(buf, pReq->enableReplay); tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta); + tlen += taosEncodeFixedI32(buf, pReq->sessionTimeoutMs); + tlen += taosEncodeFixedI32(buf, pReq->maxPollIntervalMs); return tlen; } -static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) { +static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq, int32_t len) { + void* start = buf; buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeStringTo(buf, pReq->cgroup); buf = taosDecodeStringTo(buf, pReq->clientId); @@ -2892,6 +2900,14 @@ static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeR buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg); buf = taosDecodeFixedI8(buf, &pReq->enableReplay); buf = taosDecodeFixedI8(buf, &pReq->enableBatchMeta); + if ((char*)buf - (char*)start < len) { + buf = taosDecodeFixedI32(buf, &pReq->sessionTimeoutMs); + buf = taosDecodeFixedI32(buf, &pReq->maxPollIntervalMs); + } else { + pReq->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT; + pReq->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL; + } + return 0; } @@ -4119,6 +4135,7 @@ typedef struct { int64_t consumerId; int32_t epoch; SArray* topics; + int8_t pollFlag; } SMqHbReq; typedef struct { diff --git a/include/util/tdef.h b/include/util/tdef.h index c9677845ac..39c65d2c71 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -221,6 +221,8 @@ typedef enum ELogicConditionType { #define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string #define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string #define TSDB_CGROUP_LEN 193 // it is a null-terminated string +#define TSDB_CLIENT_ID_LEN 256 // it is a null-terminated string +#define TSDB_CONSUMER_ID_LEN 32 // it is a null-terminated string #define TSDB_OFFSET_LEN 64 // it is a null-terminated string #define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string #define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a458edcad9..75c1eabe7e 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2920,8 +2920,10 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param .cbParam = pRequest, }; - if (TSDB_CODE_SUCCESS != schedulerFetchRows(pRequest->body.queryJob, &req)) { - tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->self); + int32_t code = schedulerFetchRows(pRequest->body.queryJob, &req); + if (TSDB_CODE_SUCCESS != code) { + tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId); + pRequest->body.fetchFp(param, pRequest, code); } } diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index 92a8fc3b29..4bb29f8d97 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -12,13 +12,13 @@ SRWLatch monitorLock; void* monitorTimer; SHashObj* monitorCounterHash; -int32_t slowLogFlag = -1; -int32_t monitorFlag = -1; +int32_t monitorFlag = 0; int32_t quitCnt = 0; tsem2_t monitorSem; STaosQueue* monitorQueue; SHashObj* monitorSlowLogHash; char tmpSlowLogPath[PATH_MAX] = {0}; +TdThread monitorThread; static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) { int ret = snprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir); @@ -113,11 +113,11 @@ static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) { tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId); } MonitorSlowLogData tmp = {.clusterId = p->clusterId, - .type = p->type, - .fileName = p->fileName, - .pFile = p->pFile, - .offset = p->offset, - .data = NULL}; + .type = p->type, + .fileName = p->fileName, + .pFile = p->pFile, + .offset = p->offset, + .data = NULL}; if (monitorPutData2MonitorQueue(tmp) == 0) { p->fileName = NULL; } @@ -164,7 +164,7 @@ static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITO int64_t transporterId = 0; return asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo); -FAILED: + FAILED: monitorFreeSlowLogDataEx(param); return TAOS_GET_TERRNO(TSDB_CODE_TSC_INTERNAL_ERROR); } @@ -276,12 +276,10 @@ void monitorCreateClient(int64_t clusterId) { tscInfo("[monitor] monitorCreateClient for %" PRIx64 "finished %p.", clusterId, pMonitor); } taosWUnLockLatch(&monitorLock); - if (-1 != atomic_val_compare_exchange_32(&monitorFlag, -1, 0)) { - tscDebug("[monitor] monitorFlag already is 0"); - } + return; -fail: + fail: destroyMonitorClient(&pMonitor); taosWUnLockLatch(&monitorLock); } @@ -301,7 +299,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* tscError("failed to add metric to collector"); (void)taos_counter_destroy(newCounter); goto end; -} + } if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) { tscError("failed to put counter to monitor"); (void)taos_counter_destroy(newCounter); @@ -310,7 +308,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, newCounter); -end: + end: taosWUnLockLatch(&monitorLock); } @@ -339,7 +337,7 @@ void monitorCounterInc(int64_t clusterId, const char* counterName, const char** } tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName); -end: + end: taosWUnLockLatch(&monitorLock); } @@ -348,8 +346,6 @@ const char* monitorResultStr(SQL_RESULT_CODE code) { return result_state[code]; } -static void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); } - static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpPath) { TdFilePtr pFile = NULL; void* tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES); @@ -693,20 +689,10 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) { static void* monitorThreadFunc(void* param) { setThreadName("client-monitor-slowlog"); - -#ifdef WINDOWS - if (taosCheckCurrentInDll()) { - atexit(monitorThreadFuncUnexpectedStopped); - } -#endif - - if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) { - return NULL; - } tscDebug("monitorThreadFunc start"); int64_t quitTime = 0; while (1) { - if (atomic_load_32(&slowLogFlag) > 0) { + if (atomic_load_32(&monitorFlag) == 1) { if (quitCnt == 0) { monitorSendAllSlowLogAtQuit(); if (quitCnt == 0) { @@ -752,7 +738,6 @@ static void* monitorThreadFunc(void* param) { } (void)tsem2_timewait(&monitorSem, 100); } - atomic_store_32(&slowLogFlag, -2); return NULL; } @@ -767,7 +752,6 @@ static int32_t tscMonitortInit() { return TSDB_CODE_TSC_INTERNAL_ERROR; } - TdThread monitorThread; if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) { tscError("failed to create monitor thread since %s", strerror(errno)); return TSDB_CODE_TSC_INTERNAL_ERROR; @@ -778,13 +762,9 @@ static int32_t tscMonitortInit() { } static void tscMonitorStop() { - if (atomic_val_compare_exchange_32(&slowLogFlag, 0, 1)) { - tscDebug("monitor thread already stopped"); - return; - } - - while (atomic_load_32(&slowLogFlag) > 0) { - taosMsleep(100); + if (taosCheckPthreadValid(monitorThread)) { + (void)taosThreadJoin(monitorThread, NULL); + (void)taosThreadClear(&monitorThread); } } @@ -842,10 +822,7 @@ int32_t monitorInit() { void monitorClose() { tscInfo("[monitor] tscMonitor close"); taosWLockLatch(&monitorLock); - - if (atomic_val_compare_exchange_32(&monitorFlag, 0, 1)) { - tscDebug("[monitor] monitorFlag is not 0"); - } + atomic_store_32(&monitorFlag, 1); tscMonitorStop(); sendAllCounter(); taosHashCleanup(monitorCounterHash); @@ -860,7 +837,7 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) { int32_t code = 0; MonitorSlowLogData* slowLogData = NULL; - if (atomic_load_32(&slowLogFlag) == -2) { + if (atomic_load_32(&monitorFlag) == 1) { tscError("[monitor] slow log thread is exiting"); return -1; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 46c3cf6622..cfcb68598f 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -37,6 +37,7 @@ struct SMqMgmt { static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT; // initialize only once volatile int32_t tmqInitRes = 0; // initialize rsp code static struct SMqMgmt tmqMgmt = {0}; +static int8_t pollFlag = 0; typedef struct { int32_t code; @@ -56,7 +57,7 @@ struct tmq_list_t { }; struct tmq_conf_t { - char clientId[256]; + char clientId[TSDB_CLIENT_ID_LEN]; char groupId[TSDB_CGROUP_LEN]; int8_t autoCommit; int8_t resetOffset; @@ -66,6 +67,9 @@ struct tmq_conf_t { int8_t sourceExcluded; // do not consume, bit uint16_t port; int32_t autoCommitInterval; + int32_t sessionTimeoutMs; + int32_t heartBeatIntervalMs; + int32_t maxPollIntervalMs; char* ip; char* user; char* pass; @@ -77,15 +81,18 @@ struct tmq_conf_t { struct tmq_t { int64_t refId; char groupId[TSDB_CGROUP_LEN]; - char clientId[256]; + char clientId[TSDB_CLIENT_ID_LEN]; int8_t withTbName; int8_t useSnapshot; int8_t autoCommit; int32_t autoCommitInterval; + int32_t sessionTimeoutMs; + int32_t heartBeatIntervalMs; + int32_t maxPollIntervalMs; int8_t resetOffsetCfg; int8_t replayEnable; int8_t sourceExcluded; // do not consume, bit - uint64_t consumerId; + int64_t consumerId; tmq_commit_cb* commitCb; void* commitCbUserParam; int8_t enableBatchMeta; @@ -266,6 +273,9 @@ tmq_conf_t* tmq_conf_new() { conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL; conf->resetOffset = TMQ_OFFSET__RESET_LATEST; conf->enableBatchMeta = false; + conf->heartBeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL; + conf->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL; + conf->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT; return conf; } @@ -295,7 +305,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } if (strcasecmp(key, "client.id") == 0) { - tstrncpy(conf->clientId, value, 256); + tstrncpy(conf->clientId, value, TSDB_CLIENT_ID_LEN); return TMQ_CONF_OK; } @@ -312,7 +322,38 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } if (strcasecmp(key, "auto.commit.interval.ms") == 0) { - conf->autoCommitInterval = taosStr2int64(value); + int64_t tmp = taosStr2int64(value); + if (tmp < 0 || EINVAL == errno || ERANGE == errno) { + return TMQ_CONF_INVALID; + } + conf->autoCommitInterval = (tmp > INT32_MAX ? INT32_MAX : tmp); + return TMQ_CONF_OK; + } + + if (strcasecmp(key, "session.timeout.ms") == 0) { + int64_t tmp = taosStr2int64(value); + if (tmp < 6000 || tmp > 1800000){ + return TMQ_CONF_INVALID; + } + conf->sessionTimeoutMs = tmp; + return TMQ_CONF_OK; + } + + if (strcasecmp(key, "heartbeat.interval.ms") == 0) { + int64_t tmp = taosStr2int64(value); + if (tmp < 1000 || tmp >= conf->sessionTimeoutMs){ + return TMQ_CONF_INVALID; + } + conf->heartBeatIntervalMs = tmp; + return TMQ_CONF_OK; + } + + if (strcasecmp(key, "max.poll.interval.ms") == 0) { + int64_t tmp = taosStr2int64(value); + if (tmp < 1000 || tmp > INT32_MAX){ + return TMQ_CONF_INVALID; + } + conf->maxPollIntervalMs = tmp; return TMQ_CONF_OK; } @@ -371,7 +412,12 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } if (strcasecmp(key, "td.connect.port") == 0) { - conf->port = taosStr2int64(value); + int64_t tmp = taosStr2int64(value); + if (tmp <= 0 || tmp > 65535) { + return TMQ_CONF_INVALID; + } + + conf->port = tmp; return TMQ_CONF_OK; } @@ -825,6 +871,7 @@ void tmqSendHbReq(void* param, void* tmrId) { SMqHbReq req = {0}; req.consumerId = tmq->consumerId; req.epoch = tmq->epoch; + req.pollFlag = atomic_load_8(&pollFlag); req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); if (req.topics == NULL){ return; @@ -906,10 +953,11 @@ void tmqSendHbReq(void* param, void* tmrId) { tscError("tmqSendHbReq asyncSendMsgToServer failed"); } + atomic_val_compare_exchange_8(&pollFlag, 1, 0); OVER: tDestroySMqHbReq(&req); - if (tmrId != NULL) { - (void)taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer); + if(tmrId != NULL){ + (void)taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer); } (void)taosReleaseRef(tmqMgmt.rsetId, refId); } @@ -1208,6 +1256,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->useSnapshot = conf->snapEnable; pTmq->autoCommit = conf->autoCommit; pTmq->autoCommitInterval = conf->autoCommitInterval; + pTmq->sessionTimeoutMs = conf->sessionTimeoutMs; + pTmq->heartBeatIntervalMs = conf->heartBeatIntervalMs; + pTmq->maxPollIntervalMs = conf->maxPollIntervalMs; pTmq->commitCb = conf->commitCb; pTmq->commitCbUserParam = conf->commitCbUserParam; pTmq->resetOffsetCfg = conf->resetOffset; @@ -1246,7 +1297,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { goto _failed; } - pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, (void*)pTmq->refId, tmqMgmt.timer); + pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, pTmq->heartBeatIntervalMs, (void*)pTmq->refId, tmqMgmt.timer); if (pTmq->hbLiveTimer == NULL) { SET_ERROR_MSG_TMQ("start heartbeat timer failed") goto _failed; @@ -1279,7 +1330,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { tscInfo("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz); req.consumerId = tmq->consumerId; - tstrncpy(req.clientId, tmq->clientId, 256); + tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN); tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN); req.topicNames = taosArrayInit(sz, sizeof(void*)); @@ -1291,6 +1342,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { req.withTbName = tmq->withTbName; req.autoCommit = tmq->autoCommit; req.autoCommitInterval = tmq->autoCommitInterval; + req.sessionTimeoutMs = tmq->sessionTimeoutMs; + req.maxPollIntervalMs = tmq->maxPollIntervalMs; req.resetOffsetCfg = tmq->resetOffsetCfg; req.enableReplay = tmq->replayEnable; req.enableBatchMeta = tmq->enableBatchMeta; @@ -2343,6 +2396,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { } } + atomic_val_compare_exchange_8(&pollFlag, 0, 1); + while (1) { tmqHandleAllDelayedTask(tmq); diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 0c0073b4a7..2d69a687a6 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -482,16 +482,16 @@ static const SSysDbTableSchema connectionsSchema[] = { static const SSysDbTableSchema consumerSchema[] = { - {.name = "consumer_id", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, - {.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, - {.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "consumer_id", .bytes = TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "client_id", .bytes = TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, /*{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},*/ {.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, - {.name = "parameters", .bytes = 64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "parameters", .bytes = 128 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, }; static const SSysDbTableSchema offsetSchema[] = { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 32915ba884..9777030953 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7083,6 +7083,7 @@ int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { } } + if (tEncodeI8(&encoder, pReq->pollFlag) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -7122,6 +7123,9 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { } } } + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI8(&decoder, &pReq->pollFlag) < 0) return -1; + } tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index e1a790867e..a773c61986 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -25,7 +25,7 @@ extern "C" { enum { MQ_CONSUMER_STATUS_REBALANCE = 1, MQ_CONSUMER_STATUS_READY, - MQ_CONSUMER_STATUS_LOST, +// MQ_CONSUMER_STATUS_LOST, }; int32_t mndInitConsumer(SMnode *pMnode); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 5319cc88eb..62e77867f6 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -596,11 +596,12 @@ typedef struct { typedef struct { int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; - char clientId[256]; + char clientId[TSDB_CLIENT_ID_LEN]; int8_t updateType; // used only for update int32_t epoch; int32_t status; int32_t hbStatus; // hbStatus is not applicable to serialization + int32_t pollStatus; // pollStatus is not applicable to serialization SRWLatch lock; // lock is used for topics update SArray* currentTopics; // SArray SArray* rebNewTopics; // SArray @@ -620,6 +621,8 @@ typedef struct { int8_t autoCommit; int32_t autoCommitInterval; int32_t resetOffsetCfg; + int32_t sessionTimeoutMs; + int32_t maxPollIntervalMs; } SMqConsumerObj; int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index fbbe9a742a..1a9f808688 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -25,7 +25,7 @@ #include "tcompare.h" #include "tname.h" -#define MND_CONSUMER_VER_NUMBER 2 +#define MND_CONSUMER_VER_NUMBER 3 #define MND_CONSUMER_RESERVE_SIZE 64 #define MND_MAX_GROUP_PER_TOPIC 100 @@ -55,7 +55,6 @@ int32_t mndInitConsumer(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq); - // mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer); @@ -238,11 +237,10 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer)); MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user)); atomic_store_32(&pConsumer->hbStatus, 0); - int32_t status = atomic_load_32(&pConsumer->status); - if (status == MQ_CONSUMER_STATUS_LOST) { - mInfo("try to recover consumer:0x%" PRIx64, consumerId); - MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info)); + if (req.pollFlag == 1){ + atomic_store_32(&pConsumer->pollStatus, 0); } + storeOffsetRows(pMnode, &req, pConsumer); code = buildMqHbRsp(pMsg, &rsp); @@ -389,11 +387,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { code = TSDB_CODE_MND_CONSUMER_NOT_EXIST; goto END; } - atomic_store_32(&pConsumer->hbStatus, 0); + + // 1. check consumer status int32_t status = atomic_load_32(&pConsumer->status); - if (status == MQ_CONSUMER_STATUS_LOST) { - MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info)); - } if (status != MQ_CONSUMER_STATUS_READY) { mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status)); code = TSDB_CODE_MND_CONSUMER_NOT_READY; @@ -566,7 +562,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { STrans *pTrans = NULL; SCMSubscribeReq subscribe = {0}; - MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe)); + MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen)); if(taosArrayGetSize(subscribe.topicNames) == 0){ SMqConsumerObj *pConsumerTmp = NULL; MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp)); @@ -701,17 +697,17 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { return 0; } -static void updateConsumerStatus(SMqConsumerObj *pConsumer) { - int32_t status = pConsumer->status; - - if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) { - if (status == MQ_CONSUMER_STATUS_REBALANCE) { - pConsumer->status = MQ_CONSUMER_STATUS_READY; - } else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) { - pConsumer->status = MQ_CONSUMER_STATUS_LOST; - } - } -} +//static void updateConsumerStatus(SMqConsumerObj *pConsumer) { +// int32_t status = pConsumer->status; +// +// if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) { +// if (status == MQ_CONSUMER_STATUS_REBALANCE) { +// pConsumer->status = MQ_CONSUMER_STATUS_READY; +// } else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) { +// pConsumer->status = MQ_CONSUMER_STATUS_LOST; +// } +// } +//} // remove from topic list static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) { @@ -757,21 +753,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, pOldConsumer->subscribeTime = taosGetTimestampMs(); pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId); - } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) { - int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); - for (int32_t i = 0; i < sz; i++) { - void * tmp = taosArrayGetP(pOldConsumer->assignedTopics, i); - if (tmp == NULL){ - return TSDB_CODE_TMQ_INVALID_MSG; - } - char *topic = taosStrdup(tmp); - if (taosArrayPush(pOldConsumer->rebNewTopics, &topic) == NULL) { - taosMemoryFree(topic); - return TSDB_CODE_TMQ_INVALID_MSG; - } - } - pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; - mInfo("consumer:0x%" PRIx64 " recover update", pOldConsumer->consumerId); } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) { (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1); pOldConsumer->rebalanceTime = taosGetTimestampMs(); @@ -796,7 +777,11 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, } int32_t status = pOldConsumer->status; - updateConsumerStatus(pOldConsumer); +// updateConsumerStatus(pOldConsumer); + if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) { + pOldConsumer->status = MQ_CONSUMER_STATUS_READY; + } + pOldConsumer->rebalanceTime = taosGetTimestampMs(); (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1); @@ -816,7 +801,10 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current"); int32_t status = pOldConsumer->status; - updateConsumerStatus(pOldConsumer); +// updateConsumerStatus(pOldConsumer); + if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) { + pOldConsumer->status = MQ_CONSUMER_STATUS_READY; + } pOldConsumer->rebalanceTime = taosGetTimestampMs(); (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1); @@ -852,6 +840,8 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * int32_t numOfRows = 0; SMqConsumerObj *pConsumer = NULL; int32_t code = 0; + char *parasStr = NULL; + char *status = NULL; while (numOfRows < rowsCapacity) { pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer); @@ -884,7 +874,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * int32_t cols = 0; // consumer id - char consumerIdHex[32] = {0}; + char consumerIdHex[TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE] = {0}; (void)sprintf(varDataVal(consumerIdHex), "0x%" PRIx64, pConsumer->consumerId); varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex))); @@ -901,7 +891,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false)); // client id - char clientId[256 + VARSTR_HEADER_SIZE] = {0}; + char clientId[TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(clientId, pConsumer->clientId); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -909,13 +899,15 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false)); // status - char status[20 + VARSTR_HEADER_SIZE] = {0}; const char *pStatusName = mndConsumerStatusName(pConsumer->status); + status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes); + MND_TMQ_NULL_CHECK(status); STR_TO_VARSTR(status, pStatusName); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); MND_TMQ_NULL_CHECK(pColInfo); MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)status, false)); + taosMemoryFreeClear(status); // one subscribed topic pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -948,7 +940,8 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg}; tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal); - char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0}; + parasStr = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes); + MND_TMQ_NULL_CHECK(parasStr); (void)sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName, pConsumer->autoCommit, pConsumer->autoCommitInterval, buf); varDataSetLen(parasStr, strlen(varDataVal(parasStr))); @@ -956,7 +949,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); MND_TMQ_NULL_CHECK(pColInfo); MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false)); - + taosMemoryFreeClear(parasStr); numOfRows++; } @@ -970,6 +963,8 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * return numOfRows; END: + taosMemoryFreeClear(status); + taosMemoryFreeClear(parasStr); return code; } @@ -982,8 +977,8 @@ const char *mndConsumerStatusName(int status) { switch (status) { case MQ_CONSUMER_STATUS_READY: return "ready"; - case MQ_CONSUMER_STATUS_LOST: - return "lost"; +// case MQ_CONSUMER_STATUS_LOST: +// return "lost"; case MQ_CONSUMER_STATUS_REBALANCE: return "rebalancing"; default: diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index a307e3557b..5a7831ac0e 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -791,12 +791,12 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, mndSetDefaultDbCfg(&dbObj.cfg); if ((code = mndCheckDbName(dbObj.name, pUser)) != 0) { - mError("db:%s, failed to create since %s", pCreate->db, terrstr()); + mError("db:%s, failed to create, check db name failed, since %s", pCreate->db, terrstr()); TAOS_RETURN(code); } if ((code = mndCheckDbCfg(pMnode, &dbObj.cfg)) != 0) { - mError("db:%s, failed to create since %s", pCreate->db, terrstr()); + mError("db:%s, failed to create, check db cfg failed, since %s", pCreate->db, terrstr()); TAOS_RETURN(code); } @@ -812,7 +812,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, SVgObj *pVgroups = NULL; if ((code = mndAllocVgroup(pMnode, &dbObj, &pVgroups)) != 0) { - mError("db:%s, failed to create since %s", pCreate->db, terrstr()); + mError("db:%s, failed to create, alloc vgroup failed, since %s", pCreate->db, terrstr()); TAOS_RETURN(code); } @@ -965,7 +965,7 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) { TAOS_CHECK_GOTO(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser), &lino, _OVER); - code = mndCreateDb(pMnode, pReq, &createReq, pUser); + TAOS_CHECK_GOTO(mndCreateDb(pMnode, pReq, &createReq, pUser), &lino, _OVER); if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; SName name = {0}; diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 6fdd77af98..695bf4d30d 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -288,6 +288,7 @@ int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, pConsumer->epoch = 0; pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; pConsumer->hbStatus = 0; + pConsumer->pollStatus = 0; taosInitRWLatch(&pConsumer->lock); pConsumer->createTime = taosGetTimestampMs(); @@ -322,6 +323,8 @@ int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, pConsumer->autoCommit = subscribe->autoCommit; pConsumer->autoCommitInterval = subscribe->autoCommitInterval; pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg; + pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs; + pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs; pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup); if (pConsumer->rebNewTopics == NULL){ @@ -424,6 +427,8 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit); tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval); tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg); + tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs); + tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs); return tlen; } @@ -495,6 +500,14 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval); buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg); } + if (sver > 2){ + buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs); + buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs); + } else{ + pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL; + pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT; + } + return (void *)buf; } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 7e072b8fe5..37a171e9a4 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -669,6 +669,13 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { } (void)memset(pMnode, 0, sizeof(SMnode)); + int32_t code = taosThreadRwlockInit(&pMnode->lock, NULL); + if (code != 0) { + taosMemoryFree(pMnode); + mError("failed to open mnode lock since %s", tstrerror(code)); + return NULL; + } + char timestr[24] = "1970-01-01 00:00:00.00"; (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); mndSetOptions(pMnode, pOption); @@ -682,7 +689,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { return NULL; } - int32_t code = mndCreateDir(pMnode, path); + code = mndCreateDir(pMnode, path); if (code != 0) { code = terrno; mError("failed to open mnode since %s", tstrerror(code)); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index d45346516b..585bf5527d 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -27,8 +27,7 @@ #define MND_SUBSCRIBE_VER_NUMBER 3 #define MND_SUBSCRIBE_RESERVE_SIZE 64 -#define MND_CONSUMER_LOST_HB_CNT 6 -#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200 +//#define MND_CONSUMER_LOST_HB_CNT 6 static int32_t mqRebInExecCnt = 0; @@ -331,6 +330,7 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) { int32_t code = 0; int32_t totalVgNum = 0; SVgObj *pVgroup = NULL; + SMqVgEp *pVgEp = NULL; void *pIter = NULL; SArray *newVgs = taosArrayInit(0, POINTER_BYTES); MND_TMQ_NULL_CHECK(newVgs); @@ -346,11 +346,12 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) { } totalVgNum++; - SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp)); + pVgEp = taosMemoryMalloc(sizeof(SMqVgEp)); MND_TMQ_NULL_CHECK(pVgEp); pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup); pVgEp->vgId = pVgroup->vgId; MND_TMQ_NULL_CHECK(taosArrayPush(newVgs, &pVgEp)); + pVgEp = NULL; sdbRelease(pMnode->pSdb, pVgroup); } @@ -361,13 +362,13 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) { SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; int32_t j = 0; while (j < taosArrayGetSize(pConsumerEp->vgs)) { - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); - MND_TMQ_NULL_CHECK(pVgEp); + SMqVgEp *pVgEpTmp = taosArrayGetP(pConsumerEp->vgs, j); + MND_TMQ_NULL_CHECK(pVgEpTmp); bool find = false; for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) { SMqVgEp *pnewVgEp = taosArrayGetP(newVgs, k); MND_TMQ_NULL_CHECK(pnewVgEp); - if (pVgEp->vgId == pnewVgEp->vgId) { + if (pVgEpTmp->vgId == pnewVgEp->vgId) { tDeleteSMqVgEp(pnewVgEp); taosArrayRemove(newVgs, k); find = true; @@ -375,8 +376,8 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) { } } if (!find) { - mInfo("[rebalance] processRemoveAddVgs old vgId:%d", pVgEp->vgId); - tDeleteSMqVgEp(pVgEp); + mInfo("[rebalance] processRemoveAddVgs old vgId:%d", pVgEpTmp->vgId); + tDeleteSMqVgEp(pVgEpTmp); taosArrayRemove(pConsumerEp->vgs, j); continue; } @@ -387,12 +388,16 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) { if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) { MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs)); mInfo("[rebalance] processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs)); - (void)taosArrayDestroy(newVgs); + taosArrayDestroy(newVgs); } else { - (void)taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp); + taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp); } return totalVgNum; + END: + sdbRelease(pMnode->pSdb, pVgroup); + taosMemoryFree(pVgEp); + taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp); return code; } @@ -758,32 +763,32 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) { } int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); + int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1); int32_t status = atomic_load_32(&pConsumer->status); mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 - ", hbstatus:%d", + ", hbstatus:%d, pollStatus:%d", pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, - pConsumer->createTime, hbStatus); + pConsumer->createTime, hbStatus, pollStatus); if (status == MQ_CONSUMER_STATUS_READY) { - if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close + if (taosArrayGetSize(pConsumer->currentTopics) == 0) { // unsubscribe or close MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info)); - } else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { + } else if (hbStatus * tsMqRebalanceInterval * 1000 >= pConsumer->sessionTimeoutMs || + pollStatus * tsMqRebalanceInterval * 1000 >= pConsumer->maxPollIntervalMs) { taosRLockLatch(&pConsumer->lock); MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer->cgroup, pConsumer->consumerId)); taosRUnLockLatch(&pConsumer->lock); } else { checkForVgroupSplit(pMnode, pConsumer, rebSubHash); } - } else if (status == MQ_CONSUMER_STATUS_LOST) { - if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day - MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info)); - } - } else { + } else if (status == MQ_CONSUMER_STATUS_REBALANCE) { taosRLockLatch(&pConsumer->lock); MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId)); MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId)); taosRUnLockLatch(&pConsumer->lock); + } else { + MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info)); } mndReleaseConsumer(pMnode, pConsumer); @@ -1013,37 +1018,37 @@ END: return code; } -static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) { - void *pIter = NULL; - SMqConsumerObj *pConsumer = NULL; - int code = 0; - while (1) { - pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); - if (pIter == NULL) { - break; - } - - // drop consumer in lost status, other consumers not in lost status already deleted by rebalance - if (pConsumer->status != MQ_CONSUMER_STATUS_LOST || strcmp(cgroup, pConsumer->cgroup) != 0) { - sdbRelease(pMnode->pSdb, pConsumer); - continue; - } - int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); - for (int32_t i = 0; i < sz; i++) { - char *name = taosArrayGetP(pConsumer->assignedTopics, i); - if (name && strcmp(topic, name) == 0) { - MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumer)); - } - } - - sdbRelease(pMnode->pSdb, pConsumer); - } - -END: - sdbRelease(pMnode->pSdb, pConsumer); - sdbCancelFetch(pMnode->pSdb, pIter); - return code; -} +//static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) { +// void *pIter = NULL; +// SMqConsumerObj *pConsumer = NULL; +// int code = 0; +// while (1) { +// pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); +// if (pIter == NULL) { +// break; +// } +// +// // drop consumer in lost status, other consumers not in lost status already deleted by rebalance +// if (pConsumer->status != MQ_CONSUMER_STATUS_LOST || strcmp(cgroup, pConsumer->cgroup) != 0) { +// sdbRelease(pMnode->pSdb, pConsumer); +// continue; +// } +// int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); +// for (int32_t i = 0; i < sz; i++) { +// char *name = taosArrayGetP(pConsumer->assignedTopics, i); +// if (name && strcmp(topic, name) == 0) { +// MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumer)); +// } +// } +// +// sdbRelease(pMnode->pSdb, pConsumer); +// } +// +//END: +// sdbRelease(pMnode->pSdb, pConsumer); +// sdbCancelFetch(pMnode->pSdb, pIter); +// return code; +//} static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; @@ -1079,7 +1084,6 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic); mndTransSetDbName(pTrans, pSub->dbName, dropReq.cgroup); MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans)); - MND_TMQ_RETURN_CHECK(mndDropConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic)); MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans)); MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub)); MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans)); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 930c4d66d5..46d5afc145 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -613,44 +613,44 @@ static bool checkTopic(SArray *topics, char *topicName){ return false; } -static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){ - int32_t code = 0; - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; - SMqConsumerObj *pConsumer = NULL; - while (1) { - pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); - if (pIter == NULL) { - break; - } - - bool found = checkTopic(pConsumer->assignedTopics, topicName); - if (found){ - if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) { - MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumer)); - sdbRelease(pSdb, pConsumer); - continue; - } - mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", - topicName, pConsumer->consumerId, pConsumer->cgroup); - code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; - goto END; - } - - if (checkTopic(pConsumer->rebNewTopics, topicName) || checkTopic(pConsumer->rebRemovedTopics, topicName)) { - code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; - mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", - topicName, pConsumer->consumerId, pConsumer->cgroup); - goto END; - } - sdbRelease(pSdb, pConsumer); - } - -END: - sdbRelease(pSdb, pConsumer); - sdbCancelFetch(pSdb, pIter); - return code; -} +//static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){ +// int32_t code = 0; +// SSdb *pSdb = pMnode->pSdb; +// void *pIter = NULL; +// SMqConsumerObj *pConsumer = NULL; +// while (1) { +// pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); +// if (pIter == NULL) { +// break; +// } +// +// bool found = checkTopic(pConsumer->assignedTopics, topicName); +// if (found){ +// if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) { +// MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumer)); +// sdbRelease(pSdb, pConsumer); +// continue; +// } +// mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", +// topicName, pConsumer->consumerId, pConsumer->cgroup); +// code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; +// goto END; +// } +// +// if (checkTopic(pConsumer->rebNewTopics, topicName) || checkTopic(pConsumer->rebRemovedTopics, topicName)) { +// code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; +// mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", +// topicName, pConsumer->consumerId, pConsumer->cgroup); +// goto END; +// } +// sdbRelease(pSdb, pConsumer); +// } +// +//END: +// sdbRelease(pSdb, pConsumer); +// sdbCancelFetch(pSdb, pIter); +// return code; +//} static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic){ // broadcast to all vnode @@ -722,9 +722,10 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { mndTransSetDbName(pTrans, pTopic->db, NULL); MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans)); mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); + MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic)); MND_TMQ_RETURN_CHECK(mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db)); - MND_TMQ_RETURN_CHECK(mndDropConsumerByTopic(pMnode, pTrans, dropReq.name)); +// MND_TMQ_RETURN_CHECK(mndDropConsumerByTopic(pMnode, pTrans, dropReq.name)); MND_TMQ_RETURN_CHECK(mndDropSubByTopic(pMnode, pTrans, dropReq.name)); if (pTopic->ntbUid != 0) { diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 57a7453eac..5cfc896a1c 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -877,7 +877,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { pVgroup->dbUid = pDb->uid; pVgroup->replica = pDb->cfg.replications; - if (mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray) != 0) { + if ((code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray)) != 0) { goto _OVER; } diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 51d134a463..2f6a5fa59b 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -596,6 +596,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "msg.consume.excluded", "1"); +// tmq_conf_set(conf, "max.poll.interval.ms", "20000"); if (g_conf.snapShot) { tmq_conf_set(conf, "experimental.snapshot.enable", "true");