From b28437aaba2bbc13c0030e129c71858866cc687b Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 10 Jul 2024 18:42:20 +0800 Subject: [PATCH 01/24] feat:[TS-4592] clear lost status for consumer --- include/common/tmsg.h | 21 ++- include/util/tdef.h | 2 + source/client/src/clientTmq.c | 73 +++++++- source/common/src/systable.c | 8 +- source/common/src/tmsg.c | 4 + source/dnode/mnode/impl/inc/mndDef.h | 5 +- source/dnode/mnode/impl/src/mndConsumer.c | 200 +++++++++++---------- source/dnode/mnode/impl/src/mndDef.c | 13 ++ source/dnode/mnode/impl/src/mndSubscribe.c | 33 ++-- 9 files changed, 228 insertions(+), 131 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a83aa4da44..c812138282 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2772,6 +2772,9 @@ enum { TOPIC_SUB_TYPE__COLUMN, }; +#define DEFAULT_MAX_POLL_INTERVAL 3000000 +#define DEFAULT_SESSION_TIMEOUT 10000 + typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic int8_t igExists; @@ -2794,7 +2797,7 @@ typedef struct { typedef struct { int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; - char clientId[256]; + char clientId[TSDB_CLIENT_ID_LEN]; SArray* topicNames; // SArray int8_t withTbName; @@ -2803,6 +2806,8 @@ typedef struct { int8_t resetOffsetCfg; int8_t enableReplay; int8_t enableBatchMeta; + int32_t sessionTimeoutMs; + int32_t maxPollIntervalMs; } SCMSubscribeReq; static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { @@ -2824,11 +2829,14 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg); tlen += taosEncodeFixedI8(buf, pReq->enableReplay); tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta); + tlen += taosEncodeFixedI32(buf, pReq->sessionTimeoutMs); + tlen += taosEncodeFixedI32(buf, pReq->maxPollIntervalMs); return tlen; } -static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) { +static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq, int32_t len) { + void* start = buf; buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeStringTo(buf, pReq->cgroup); buf = taosDecodeStringTo(buf, pReq->clientId); @@ -2849,6 +2857,14 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg); buf = taosDecodeFixedI8(buf, &pReq->enableReplay); buf = taosDecodeFixedI8(buf, &pReq->enableBatchMeta); + if (buf - start < len) { + buf = taosDecodeFixedI32(buf, &pReq->sessionTimeoutMs); + buf = taosDecodeFixedI32(buf, &pReq->maxPollIntervalMs); + } else { + pReq->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT; + pReq->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL; + } + return buf; } @@ -4060,6 +4076,7 @@ typedef struct { int64_t consumerId; int32_t epoch; SArray* topics; + int8_t pollFlag; } SMqHbReq; typedef struct { diff --git a/include/util/tdef.h b/include/util/tdef.h index 9c2858ed30..70358c861c 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -221,6 +221,8 @@ typedef enum ELogicConditionType { #define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string #define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string #define TSDB_CGROUP_LEN 193 // it is a null-terminated string +#define TSDB_CLIENT_ID_LEN 256 // it is a null-terminated string +#define TSDB_CONSUMER_ID_LEN 32 // it is a null-terminated string #define TSDB_OFFSET_LEN 64 // it is a null-terminated string #define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string #define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 21d1a528da..2921b7b333 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -37,6 +37,7 @@ struct SMqMgmt { static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT; // initialize only once volatile int32_t tmqInitRes = 0; // initialize rsp code static struct SMqMgmt tmqMgmt = {0}; +static int8_t pollFlag = 0; typedef struct { int32_t code; @@ -56,7 +57,7 @@ struct tmq_list_t { }; struct tmq_conf_t { - char clientId[256]; + char clientId[TSDB_CLIENT_ID_LEN]; char groupId[TSDB_CGROUP_LEN]; int8_t autoCommit; int8_t resetOffset; @@ -66,6 +67,9 @@ struct tmq_conf_t { int8_t sourceExcluded; // do not consume, bit uint16_t port; int32_t autoCommitInterval; + int32_t sessionTimeoutMs; + int32_t heartBeatIntervalMs; + int32_t maxPollIntervalMs; char* ip; char* user; char* pass; @@ -77,15 +81,18 @@ struct tmq_conf_t { struct tmq_t { int64_t refId; char groupId[TSDB_CGROUP_LEN]; - char clientId[256]; + char clientId[TSDB_CLIENT_ID_LEN]; int8_t withTbName; int8_t useSnapshot; int8_t autoCommit; int32_t autoCommitInterval; + int32_t sessionTimeoutMs; + int32_t heartBeatIntervalMs; + int32_t maxPollIntervalMs; int8_t resetOffsetCfg; int8_t replayEnable; int8_t sourceExcluded; // do not consume, bit - uint64_t consumerId; + int64_t consumerId; tmq_commit_cb* commitCb; void* commitCbUserParam; int8_t enableBatchMeta; @@ -272,6 +279,9 @@ tmq_conf_t* tmq_conf_new() { conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL; conf->resetOffset = TMQ_OFFSET__RESET_LATEST; conf->enableBatchMeta = false; + conf->heartBeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL; + conf->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL; + conf->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT; return conf; } @@ -301,7 +311,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } if (strcasecmp(key, "client.id") == 0) { - tstrncpy(conf->clientId, value, 256); + tstrncpy(conf->clientId, value, TSDB_CLIENT_ID_LEN); return TMQ_CONF_OK; } @@ -318,7 +328,38 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } if (strcasecmp(key, "auto.commit.interval.ms") == 0) { - conf->autoCommitInterval = taosStr2int64(value); + int64_t tmp = taosStr2int64(value); + if (tmp < 0 || EINVAL == errno || ERANGE == errno) { + return TMQ_CONF_INVALID; + } + conf->autoCommitInterval = (tmp > INT32_MAX ? INT32_MAX : tmp); + return TMQ_CONF_OK; + } + + if (strcasecmp(key, "session.timeout.ms") == 0) { + int64_t tmp = taosStr2int64(value); + if (tmp < 6000 || tmp > 1800000){ + return TMQ_CONF_INVALID; + } + conf->sessionTimeoutMs = tmp; + return TMQ_CONF_OK; + } + + if (strcasecmp(key, "heartbeat.interval.ms") == 0) { + int64_t tmp = taosStr2int64(value); + if (tmp < 1000 || tmp >= conf->sessionTimeoutMs){ + return TMQ_CONF_INVALID; + } + conf->heartBeatIntervalMs = tmp; + return TMQ_CONF_OK; + } + + if (strcasecmp(key, "max.poll.interval.ms") == 0) { + int64_t tmp = taosStr2int64(value); + if (tmp < 1000 || tmp > INT32_MAX){ + return TMQ_CONF_INVALID; + } + conf->maxPollIntervalMs = tmp; return TMQ_CONF_OK; } @@ -377,7 +418,12 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } if (strcasecmp(key, "td.connect.port") == 0) { - conf->port = taosStr2int64(value); + int64_t tmp = taosStr2int64(value); + if (tmp <= 0 || tmp > 65535) { + return TMQ_CONF_INVALID; + } + + conf->port = tmp; return TMQ_CONF_OK; } @@ -813,6 +859,7 @@ void tmqSendHbReq(void* param, void* tmrId) { SMqHbReq req = {0}; req.consumerId = tmq->consumerId; req.epoch = tmq->epoch; + req.pollFlag = atomic_load_8(&pollFlag); taosRLockLatch(&tmq->lock); req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { @@ -878,9 +925,10 @@ void tmqSendHbReq(void* param, void* tmrId) { tscError("tmqSendHbReq asyncSendMsgToServer failed"); } + atomic_val_compare_exchange_8(&pollFlag, 1, 0); OVER: tDestroySMqHbReq(&req); - taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer); + taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer); taosReleaseRef(tmqMgmt.rsetId, refId); } @@ -1134,6 +1182,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->useSnapshot = conf->snapEnable; pTmq->autoCommit = conf->autoCommit; pTmq->autoCommitInterval = conf->autoCommitInterval; + pTmq->sessionTimeoutMs = conf->sessionTimeoutMs; + pTmq->heartBeatIntervalMs = conf->heartBeatIntervalMs; + pTmq->maxPollIntervalMs = conf->maxPollIntervalMs; pTmq->commitCb = conf->commitCb; pTmq->commitCbUserParam = conf->commitCbUserParam; pTmq->resetOffsetCfg = conf->resetOffset; @@ -1173,7 +1224,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); *pRefId = pTmq->refId; - pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, pRefId, tmqMgmt.timer); + pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, pTmq->heartBeatIntervalMs, pRefId, tmqMgmt.timer); char buf[TSDB_OFFSET_LEN] = {0}; STqOffsetVal offset = {.type = pTmq->resetOffsetCfg}; @@ -1203,7 +1254,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { tscInfo("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz); req.consumerId = tmq->consumerId; - tstrncpy(req.clientId, tmq->clientId, 256); + tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN); tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN); req.topicNames = taosArrayInit(sz, sizeof(void*)); @@ -1215,6 +1266,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { req.withTbName = tmq->withTbName; req.autoCommit = tmq->autoCommit; req.autoCommitInterval = tmq->autoCommitInterval; + req.sessionTimeoutMs = tmq->sessionTimeoutMs; + req.maxPollIntervalMs = tmq->maxPollIntervalMs; req.resetOffsetCfg = tmq->resetOffsetCfg; req.enableReplay = tmq->replayEnable; req.enableBatchMeta = tmq->enableBatchMeta; @@ -2207,6 +2260,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { } } + atomic_val_compare_exchange_8(&pollFlag, 0, 1); + while (1) { tmqHandleAllDelayedTask(tmq); diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 0c0073b4a7..2d69a687a6 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -482,16 +482,16 @@ static const SSysDbTableSchema connectionsSchema[] = { static const SSysDbTableSchema consumerSchema[] = { - {.name = "consumer_id", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, - {.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, - {.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "consumer_id", .bytes = TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "client_id", .bytes = TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, /*{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},*/ {.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, - {.name = "parameters", .bytes = 64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "parameters", .bytes = 128 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, }; static const SSysDbTableSchema offsetSchema[] = { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 10719674f5..3612e6553c 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7002,6 +7002,7 @@ int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { } } + if (tEncodeI8(&encoder, pReq->pollFlag) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -7041,6 +7042,9 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { } } } + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI8(&decoder, &pReq->pollFlag) < 0) return -1; + } tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 089c4a10b3..b700c440a5 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -596,11 +596,12 @@ typedef struct { typedef struct { int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; - char clientId[256]; + char clientId[TSDB_CLIENT_ID_LEN]; int8_t updateType; // used only for update int32_t epoch; int32_t status; int32_t hbStatus; // hbStatus is not applicable to serialization + int32_t pollStatus; // pollStatus is not applicable to serialization SRWLatch lock; // lock is used for topics update SArray* currentTopics; // SArray SArray* rebNewTopics; // SArray @@ -620,6 +621,8 @@ typedef struct { int8_t autoCommit; int32_t autoCommitInterval; int32_t resetOffsetCfg; + int32_t sessionTimeoutMs; + int32_t maxPollIntervalMs; } SMqConsumerObj; SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 9a7a8155ec..c37739252c 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -25,7 +25,7 @@ #include "tcompare.h" #include "tname.h" -#define MND_CONSUMER_VER_NUMBER 2 +#define MND_CONSUMER_VER_NUMBER 3 #define MND_CONSUMER_RESERVE_SIZE 64 #define MND_MAX_GROUP_PER_TOPIC 100 @@ -40,7 +40,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg); static int32_t mndProcessAskEpReq(SRpcMsg *pMsg); static int32_t mndProcessMqHbReq(SRpcMsg *pMsg); static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg); -static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg); +//static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg); int32_t mndInitConsumer(SMnode *pMnode) { SSdbTable table = { @@ -57,7 +57,7 @@ int32_t mndInitConsumer(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq); // mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg); - mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg); +// mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer); @@ -144,56 +144,56 @@ FAILED: return code; } -static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { - int32_t code = 0; - SMnode *pMnode = pMsg->info.node; - SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont; - SMqConsumerObj *pConsumerNew = NULL; - STrans *pTrans = NULL; - SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId); - if (pConsumer == NULL) { - mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId); - code = -1; - goto END; - } - - mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId, - pConsumer->status, mndConsumerStatusName(pConsumer->status)); - - if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { - terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; - code = -1; - goto END; - } - - pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_UPDATE_REC, NULL, NULL); - if (pConsumerNew == NULL){ - code = -1; - goto END; - } - - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm"); - if (pTrans == NULL) { - code = -1; - goto END; - } - code = validateTopics(pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false); - if (code != 0) { - goto END; - } - - code = mndSetConsumerCommitLogs(pTrans, pConsumerNew); - if (code != 0) { - goto END; - } - - code = mndTransPrepare(pMnode, pTrans); -END: - mndReleaseConsumer(pMnode, pConsumer); - tDeleteSMqConsumerObj(pConsumerNew); - mndTransDrop(pTrans); - return code; -} +//static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { +// int32_t code = 0; +// SMnode *pMnode = pMsg->info.node; +// SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont; +// SMqConsumerObj *pConsumerNew = NULL; +// STrans *pTrans = NULL; +// SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId); +// if (pConsumer == NULL) { +// mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId); +// code = -1; +// goto END; +// } +// +// mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId, +// pConsumer->status, mndConsumerStatusName(pConsumer->status)); +// +// if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { +// terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; +// code = -1; +// goto END; +// } +// +// pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_UPDATE_REC, NULL, NULL); +// if (pConsumerNew == NULL){ +// code = -1; +// goto END; +// } +// +// pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm"); +// if (pTrans == NULL) { +// code = -1; +// goto END; +// } +// code = validateTopics(pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false); +// if (code != 0) { +// goto END; +// } +// +// code = mndSetConsumerCommitLogs(pTrans, pConsumerNew); +// if (code != 0) { +// goto END; +// } +// +// code = mndTransPrepare(pMnode, pTrans); +//END: +// mndReleaseConsumer(pMnode, pConsumer); +// tDeleteSMqConsumerObj(pConsumerNew); +// mndTransDrop(pTrans); +// return code; +//} static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { int32_t code = 0; @@ -328,13 +328,15 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { } atomic_store_32(&pConsumer->hbStatus, 0); - - int32_t status = atomic_load_32(&pConsumer->status); - - if (status == MQ_CONSUMER_STATUS_LOST) { - mInfo("try to recover consumer:0x%" PRIx64, consumerId); - mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info); + if (req.pollFlag == 1){ + atomic_store_32(&pConsumer->pollStatus, 0); } +// int32_t status = atomic_load_32(&pConsumer->status); +// +// if (status == MQ_CONSUMER_STATUS_LOST) { +// mInfo("try to recover consumer:0x%" PRIx64, consumerId); +// mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info); +// } storeOffsetRows(pMnode, &req, pConsumer); code = buildMqHbRsp(pMsg, &rsp); @@ -480,14 +482,12 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { goto END; } - atomic_store_32(&pConsumer->hbStatus, 0); - // 1. check consumer status int32_t status = atomic_load_32(&pConsumer->status); - - if (status == MQ_CONSUMER_STATUS_LOST) { - mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info); - } +// +// if (status == MQ_CONSUMER_STATUS_LOST) { +// mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info); +// } if (status != MQ_CONSUMER_STATUS_READY) { mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status)); @@ -652,7 +652,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t code = 0; SCMSubscribeReq subscribe = {0}; - tDeserializeSCMSubscribeReq(msgStr, &subscribe); + tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen); SMqConsumerObj *pConsumerNew = NULL; STrans *pTrans = NULL; @@ -806,17 +806,17 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { return 0; } -static void updateConsumerStatus(SMqConsumerObj *pConsumer) { - int32_t status = pConsumer->status; - - if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) { - if (status == MQ_CONSUMER_STATUS_REBALANCE) { - pConsumer->status = MQ_CONSUMER_STATUS_READY; - } else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) { - pConsumer->status = MQ_CONSUMER_STATUS_LOST; - } - } -} +//static void updateConsumerStatus(SMqConsumerObj *pConsumer) { +// int32_t status = pConsumer->status; +// +// if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) { +// if (status == MQ_CONSUMER_STATUS_REBALANCE) { +// pConsumer->status = MQ_CONSUMER_STATUS_READY; +// } else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) { +// pConsumer->status = MQ_CONSUMER_STATUS_LOST; +// } +// } +//} // remove from topic list static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) { @@ -863,14 +863,14 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, pOldConsumer->subscribeTime = taosGetTimestampMs(); pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId); - } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) { - int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); - for (int32_t i = 0; i < sz; i++) { - char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i)); - taosArrayPush(pOldConsumer->rebNewTopics, &topic); - } - pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; - mInfo("consumer:0x%" PRIx64 " recover update", pOldConsumer->consumerId); +// } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) { +// int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); +// for (int32_t i = 0; i < sz; i++) { +// char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i)); +// taosArrayPush(pOldConsumer->rebNewTopics, &topic); +// } +// pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; +// mInfo("consumer:0x%" PRIx64 " recover update", pOldConsumer->consumerId); } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) { atomic_add_fetch_32(&pOldConsumer->epoch, 1); @@ -889,7 +889,11 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, } int32_t status = pOldConsumer->status; - updateConsumerStatus(pOldConsumer); +// updateConsumerStatus(pOldConsumer); + if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) { + pOldConsumer->status = MQ_CONSUMER_STATUS_READY; + } + pOldConsumer->rebalanceTime = taosGetTimestampMs(); atomic_add_fetch_32(&pOldConsumer->epoch, 1); @@ -906,7 +910,10 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current"); int32_t status = pOldConsumer->status; - updateConsumerStatus(pOldConsumer); +// updateConsumerStatus(pOldConsumer); + if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) { + pOldConsumer->status = MQ_CONSUMER_STATUS_READY; + } pOldConsumer->rebalanceTime = taosGetTimestampMs(); atomic_add_fetch_32(&pOldConsumer->epoch, 1); @@ -973,7 +980,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * int32_t cols = 0; // consumer id - char consumerIdHex[32] = {0}; + char consumerIdHex[TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE] = {0}; sprintf(varDataVal(consumerIdHex), "0x%" PRIx64, pConsumer->consumerId); varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex))); @@ -988,19 +995,20 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false); // client id - char clientId[256 + VARSTR_HEADER_SIZE] = {0}; + char clientId[TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(clientId, pConsumer->clientId); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false); // status - char status[20 + VARSTR_HEADER_SIZE] = {0}; const char *pStatusName = mndConsumerStatusName(pConsumer->status); + char *status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes); STR_TO_VARSTR(status, pStatusName); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)status, false); + taosMemoryFree(status); // one subscribed topic pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -1033,14 +1041,14 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg}; tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal); - char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0}; - sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName, - pConsumer->autoCommit, pConsumer->autoCommitInterval, buf); + char *parasStr = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes); + sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s,maxPoll:%d,timeout:%d", pConsumer->withTbName, + pConsumer->autoCommit, pConsumer->autoCommitInterval, buf, pConsumer->maxPollIntervalMs, pConsumer->sessionTimeoutMs); varDataSetLen(parasStr, strlen(varDataVal(parasStr))); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false); - + taosMemoryFree(parasStr); numOfRows++; } @@ -1063,8 +1071,8 @@ const char *mndConsumerStatusName(int status) { switch (status) { case MQ_CONSUMER_STATUS_READY: return "ready"; - case MQ_CONSUMER_STATUS_LOST: - return "lost"; +// case MQ_CONSUMER_STATUS_LOST: +// return "lost"; case MQ_CONSUMER_STATUS_REBALANCE: return "rebalancing"; default: diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 5164557184..1373691cdd 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -266,6 +266,7 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t upda pConsumer->epoch = 0; pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; pConsumer->hbStatus = 0; + pConsumer->pollStatus = 0; taosInitRWLatch(&pConsumer->lock); pConsumer->createTime = taosGetTimestampMs(); @@ -294,6 +295,8 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t upda pConsumer->autoCommit = subscribe->autoCommit; pConsumer->autoCommitInterval = subscribe->autoCommitInterval; pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg; + pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs; + pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs; pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup); @@ -396,6 +399,8 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit); tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval); tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg); + tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs); + tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs); return tlen; } @@ -456,6 +461,14 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval); buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg); } + if (sver > 2){ + buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs); + buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs); + } else{ + pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL; + pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT; + } + return (void *)buf; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index e2bedc258a..d2615d7b2f 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -27,8 +27,7 @@ #define MND_SUBSCRIBE_VER_NUMBER 3 #define MND_SUBSCRIBE_RESERVE_SIZE 64 -#define MND_CONSUMER_LOST_HB_CNT 6 -#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200 +//#define MND_CONSUMER_LOST_HB_CNT 6 static int32_t mqRebInExecCnt = 0; @@ -234,7 +233,7 @@ static void processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, c int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers); int32_t actualRemoved = 0; for (int32_t i = 0; i < numOfRemoved; i++) { - uint64_t consumerId = *(uint64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i); + int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i); SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); if (pConsumerEp == NULL) { continue; @@ -378,12 +377,10 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) { } } - if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) { + if (taosArrayGetSize(newVgs) != 0) { taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs); mInfo("[rebalance] processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs)); taosArrayDestroy(newVgs); - } else { - taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp); } return totalVgNum; } @@ -678,7 +675,7 @@ static void freeRebalanceItem(void *param) { static void buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, char *group, int64_t consumerId) { int32_t topicNum = taosArrayGetSize(topicList); for (int32_t i = 0; i < topicNum; i++) { - char key[TSDB_SUBSCRIBE_KEY_LEN]; + char key[TSDB_SUBSCRIBE_KEY_LEN] = {0}; char *removedTopic = taosArrayGetP(topicList, i); mndMakeSubscribeKey(key, group, removedTopic); SMqRebInfo *pRebSub = mndGetOrCreateRebSub(rebSubHash, key); @@ -707,7 +704,7 @@ static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHash SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId); if (!pVgroup) { - char key[TSDB_SUBSCRIBE_KEY_LEN]; + char key[TSDB_SUBSCRIBE_KEY_LEN] = {0}; mndMakeSubscribeKey(key, pConsumer->cgroup, topic); mndGetOrCreateRebSub(rebSubHash, key); mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId); @@ -733,27 +730,25 @@ static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) { } int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); + int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1); int32_t status = atomic_load_32(&pConsumer->status); mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 - ", hbstatus:%d", + ", hbstatus:%d, pollStatus:%d", pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, - pConsumer->createTime, hbStatus); + pConsumer->createTime, hbStatus, pollStatus); if (status == MQ_CONSUMER_STATUS_READY) { - if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close + if (taosArrayGetSize(pConsumer->currentTopics) == 0) { // unsubscribe or close mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info); - } else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { + } else if (hbStatus * tsMqRebalanceInterval * 1000 >= pConsumer->sessionTimeoutMs || + pollStatus * tsMqRebalanceInterval * 1000 >= pConsumer->maxPollIntervalMs) { taosRLockLatch(&pConsumer->lock); buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer->cgroup, pConsumer->consumerId); taosRUnLockLatch(&pConsumer->lock); } else { checkForVgroupSplit(pMnode, pConsumer, rebSubHash); } - } else if (status == MQ_CONSUMER_STATUS_LOST) { - if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day - mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info); - } } else { taosRLockLatch(&pConsumer->lock); buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId); @@ -832,8 +827,8 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu if (pSub == NULL) { // split sub key and extract topic - char topic[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CGROUP_LEN]; + char topic[TSDB_TOPIC_FNAME_LEN] = {0}; + char cgroup[TSDB_CGROUP_LEN] = {0}; mndSplitSubscribeKey(key, topic, cgroup, true); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); @@ -878,7 +873,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; mDebug("[rebalance] start to process mq timer") - if (!mndRebTryStart()) { + if (!mndRebTryStart()) { mInfo("[rebalance] mq rebalance already in progress, do nothing") return code; } From 4ef2c3964968aa6baf49cb89fd607b6594402df6 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 11 Jul 2024 14:56:25 +0800 Subject: [PATCH 02/24] fix:[TS-4592]remove lost status --- include/common/tmsg.h | 2 +- source/client/src/clientEnv.c | 2 +- source/dnode/mnode/impl/inc/mndConsumer.h | 2 +- source/dnode/mnode/impl/src/mndSubscribe.c | 82 ++++++++++---------- source/dnode/mnode/impl/src/mndTopic.c | 90 +++++++++++----------- source/dnode/mnode/sdb/src/sdbFile.c | 2 + utils/test/c/tmq_taosx_ci.c | 1 + 7 files changed, 93 insertions(+), 88 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c812138282..be7accd0da 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2773,7 +2773,7 @@ enum { }; #define DEFAULT_MAX_POLL_INTERVAL 3000000 -#define DEFAULT_SESSION_TIMEOUT 10000 +#define DEFAULT_SESSION_TIMEOUT 12000 typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 3a821768f8..1336372ae2 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -127,7 +127,7 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_ cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code))); cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType)); cJSON_AddItemToObject(json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows)); - if(strlen(pRequest->sqlstr) > pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen){ + if(pRequest->sqlstr != NULL && strlen(pRequest->sqlstr) > pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen){ char tmp = pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen]; pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = '\0'; cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)); diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index 5184ad0eca..7308343d1c 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -25,7 +25,7 @@ extern "C" { enum { MQ_CONSUMER_STATUS_REBALANCE = 1, MQ_CONSUMER_STATUS_READY, - MQ_CONSUMER_STATUS_LOST, +// MQ_CONSUMER_STATUS_LOST, }; int32_t mndInitConsumer(SMnode *pMnode); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index d2615d7b2f..e03eee07a1 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -749,11 +749,13 @@ static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) { } else { checkForVgroupSplit(pMnode, pConsumer, rebSubHash); } - } else { + } else if (status == MQ_CONSUMER_STATUS_REBALANCE) { taosRLockLatch(&pConsumer->lock); buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId); buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId); taosRUnLockLatch(&pConsumer->lock); + } else { + mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info); } mndReleaseConsumer(pMnode, pConsumer); @@ -974,41 +976,41 @@ END: return ret; } -static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) { - void *pIter = NULL; - SMqConsumerObj *pConsumer = NULL; - int ret = 0; - while (1) { - pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); - if (pIter == NULL) { - break; - } - - // drop consumer in lost status, other consumers not in lost status already deleted by rebalance - if (pConsumer->status != MQ_CONSUMER_STATUS_LOST || strcmp(cgroup, pConsumer->cgroup) != 0) { - sdbRelease(pMnode->pSdb, pConsumer); - continue; - } - int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); - for (int32_t i = 0; i < sz; i++) { - char *name = taosArrayGetP(pConsumer->assignedTopics, i); - if (strcmp(topic, name) == 0) { - int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer); - if (code != 0) { - ret = code; - goto END; - } - } - } - - sdbRelease(pMnode->pSdb, pConsumer); - } - -END: - sdbRelease(pMnode->pSdb, pConsumer); - sdbCancelFetch(pMnode->pSdb, pIter); - return ret; -} +//static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) { +// void *pIter = NULL; +// SMqConsumerObj *pConsumer = NULL; +// int ret = 0; +// while (1) { +// pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); +// if (pIter == NULL) { +// break; +// } +// +// // drop consumer in lost status, other consumers not in lost status already deleted by rebalance +// if (pConsumer->status != MQ_CONSUMER_STATUS_LOST || strcmp(cgroup, pConsumer->cgroup) != 0) { +// sdbRelease(pMnode->pSdb, pConsumer); +// continue; +// } +// int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); +// for (int32_t i = 0; i < sz; i++) { +// char *name = taosArrayGetP(pConsumer->assignedTopics, i); +// if (strcmp(topic, name) == 0) { +// int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer); +// if (code != 0) { +// ret = code; +// goto END; +// } +// } +// } +// +// sdbRelease(pMnode->pSdb, pConsumer); +// } +// +//END: +// sdbRelease(pMnode->pSdb, pConsumer); +// sdbCancelFetch(pMnode->pSdb, pIter); +// return ret; +//} static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; @@ -1055,10 +1057,10 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { goto end; } - code = mndDropConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic); - if (code != 0) { - goto end; - } +// code = mndDropConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic); +// if (code != 0) { +// goto end; +// } code = sendDeleteSubToVnode(pMnode, pSub, pTrans); if (code != 0) { diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index bcb38a3902..9ca0fed08a 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -668,47 +668,47 @@ static bool checkTopic(SArray *topics, char *topicName){ return false; } -static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){ - int32_t code = 0; - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; - SMqConsumerObj *pConsumer = NULL; - while (1) { - pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); - if (pIter == NULL) { - break; - } - - bool found = checkTopic(pConsumer->assignedTopics, topicName); - if (found){ - if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) { - code = mndSetConsumerDropLogs(pTrans, pConsumer); - if (code != 0) { - goto end; - } - sdbRelease(pSdb, pConsumer); - continue; - } - mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", - topicName, pConsumer->consumerId, pConsumer->cgroup); - code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; - goto end; - } - - if (checkTopic(pConsumer->rebNewTopics, topicName) || checkTopic(pConsumer->rebRemovedTopics, topicName)) { - code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; - mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", - topicName, pConsumer->consumerId, pConsumer->cgroup); - goto end; - } - sdbRelease(pSdb, pConsumer); - } - -end: - sdbRelease(pSdb, pConsumer); - sdbCancelFetch(pSdb, pIter); - return code; -} +//static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){ +// int32_t code = 0; +// SSdb *pSdb = pMnode->pSdb; +// void *pIter = NULL; +// SMqConsumerObj *pConsumer = NULL; +// while (1) { +// pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); +// if (pIter == NULL) { +// break; +// } +// +// bool found = checkTopic(pConsumer->assignedTopics, topicName); +// if (found){ +// if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) { +// code = mndSetConsumerDropLogs(pTrans, pConsumer); +// if (code != 0) { +// goto end; +// } +// sdbRelease(pSdb, pConsumer); +// continue; +// } +// mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", +// topicName, pConsumer->consumerId, pConsumer->cgroup); +// code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; +// goto end; +// } +// +// if (checkTopic(pConsumer->rebNewTopics, topicName) || checkTopic(pConsumer->rebRemovedTopics, topicName)) { +// code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; +// mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", +// topicName, pConsumer->consumerId, pConsumer->cgroup); +// goto end; +// } +// sdbRelease(pSdb, pConsumer); +// } +// +//end: +// sdbRelease(pSdb, pConsumer); +// sdbCancelFetch(pSdb, pIter); +// return code; +//} static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic){ // broadcast to all vnode @@ -804,10 +804,10 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { goto end; } - code = mndDropConsumerByTopic(pMnode, pTrans, dropReq.name); - if (code != 0) { - goto end; - } +// code = mndDropConsumerByTopic(pMnode, pTrans, dropReq.name); +// if (code != 0) { +// goto end; +// } code = mndDropSubByTopic(pMnode, pTrans, dropReq.name); if (code < 0) { diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index d94650695c..ab928a4edc 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -339,6 +339,8 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { code = sdbWriteWithoutFree(pSdb, pRaw); if (code != 0) { mError("failed to read sdb file:%s since %s", file, terrstr()); + code = sdbWriteWithoutFree(pSdb, pRaw); + goto _OVER; } } diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 51d134a463..2f6a5fa59b 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -596,6 +596,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "msg.consume.excluded", "1"); +// tmq_conf_set(conf, "max.poll.interval.ms", "20000"); if (g_conf.snapShot) { tmq_conf_set(conf, "experimental.snapshot.enable", "true"); From ea9819744eab2b5655bdade5f1f61d1c57e35c02 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 11 Jul 2024 15:08:59 +0800 Subject: [PATCH 03/24] fix:[TS-4592]remove lost status --- source/client/src/clientEnv.c | 2 +- source/dnode/mnode/sdb/src/sdbFile.c | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 18907ede5d..ecfa1e3392 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -127,7 +127,7 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_ cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code))); cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType)); cJSON_AddItemToObject(json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows)); - if(pRequest->sqlstr != NULL && strlen(pRequest->sqlstr) > pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen){ + if(strlen(pRequest->sqlstr) > pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen){ char tmp = pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen]; pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = '\0'; cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)); diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index ab928a4edc..d94650695c 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -339,8 +339,6 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { code = sdbWriteWithoutFree(pSdb, pRaw); if (code != 0) { mError("failed to read sdb file:%s since %s", file, terrstr()); - code = sdbWriteWithoutFree(pSdb, pRaw); - goto _OVER; } } From 44027f7978f7780ba5371ccdead6f60de80277a2 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 30 Jul 2024 01:25:37 +0800 Subject: [PATCH 04/24] feat:[TS-4592]remove lost status for consumer --- include/common/tmsg.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4058805d02..5a6f13d1c7 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2886,7 +2886,7 @@ static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeR buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg); buf = taosDecodeFixedI8(buf, &pReq->enableReplay); buf = taosDecodeFixedI8(buf, &pReq->enableBatchMeta); - if (buf - start < len) { + if ((char*)buf - (char*)start < len) { buf = taosDecodeFixedI32(buf, &pReq->sessionTimeoutMs); buf = taosDecodeFixedI32(buf, &pReq->maxPollIntervalMs); } else { From f3698adebfed4c4580315e47daff3248c2bb6184 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Fri, 12 Jul 2024 09:58:48 +0800 Subject: [PATCH 05/24] fix project with bool condition node --- source/libs/executor/src/executil.c | 13 +++++++++++++ tests/system-test/2-query/project_group.py | 2 ++ 2 files changed, 15 insertions(+) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index d810cf2428..853b2865bb 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1777,6 +1777,19 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCaseNode->node.aliasName); pExp->pExpr->_optrRoot.pRootNode = pNode; + } else if (type == QUERY_NODE_LOGIC_CONDITION) { + pExp->pExpr->nodeType = QUERY_NODE_OPERATOR; + SLogicConditionNode* pCond = (SLogicConditionNode*)pNode; + pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); + if (!pExp->base.pParam) { + code = terrno; + } + if (TSDB_CODE_SUCCESS == code) { + pExp->base.numOfParams = 1; + SDataType* pType = &pCond->node.resType; + pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName); + pExp->pExpr->_optrRoot.pRootNode = pNode; + } } else { ASSERT(0); } diff --git a/tests/system-test/2-query/project_group.py b/tests/system-test/2-query/project_group.py index 44943e5088..19fe8b1cf0 100644 --- a/tests/system-test/2-query/project_group.py +++ b/tests/system-test/2-query/project_group.py @@ -57,6 +57,8 @@ class TDTestCase: tdSql.query("select * from (select ts, col1 from sta partition by tbname) limit 2"); tdSql.checkRows(2) + tdSql.query('select col1 > 0 and col2 > 0 from stb') + tdSql.checkRows(12) def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) From 1d426f40d2f719eb55e82ccb5a2ed57b3c10d5b9 Mon Sep 17 00:00:00 2001 From: dmchen Date: Tue, 30 Jul 2024 09:38:58 +0000 Subject: [PATCH 06/24] fix/TD-31140 --- source/dnode/mnode/impl/src/mndDb.c | 8 ++++---- source/dnode/mnode/impl/src/mndVgroup.c | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index a307e3557b..5a7831ac0e 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -791,12 +791,12 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, mndSetDefaultDbCfg(&dbObj.cfg); if ((code = mndCheckDbName(dbObj.name, pUser)) != 0) { - mError("db:%s, failed to create since %s", pCreate->db, terrstr()); + mError("db:%s, failed to create, check db name failed, since %s", pCreate->db, terrstr()); TAOS_RETURN(code); } if ((code = mndCheckDbCfg(pMnode, &dbObj.cfg)) != 0) { - mError("db:%s, failed to create since %s", pCreate->db, terrstr()); + mError("db:%s, failed to create, check db cfg failed, since %s", pCreate->db, terrstr()); TAOS_RETURN(code); } @@ -812,7 +812,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, SVgObj *pVgroups = NULL; if ((code = mndAllocVgroup(pMnode, &dbObj, &pVgroups)) != 0) { - mError("db:%s, failed to create since %s", pCreate->db, terrstr()); + mError("db:%s, failed to create, alloc vgroup failed, since %s", pCreate->db, terrstr()); TAOS_RETURN(code); } @@ -965,7 +965,7 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) { TAOS_CHECK_GOTO(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser), &lino, _OVER); - code = mndCreateDb(pMnode, pReq, &createReq, pUser); + TAOS_CHECK_GOTO(mndCreateDb(pMnode, pReq, &createReq, pUser), &lino, _OVER); if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; SName name = {0}; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 57a7453eac..5cfc896a1c 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -877,7 +877,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { pVgroup->dbUid = pDb->uid; pVgroup->replica = pDb->cfg.replications; - if (mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray) != 0) { + if ((code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray)) != 0) { goto _OVER; } From 9c2dae3613e5d45f770192ef7ca1277bff3ebded Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 30 Jul 2024 19:27:33 +0800 Subject: [PATCH 07/24] feat:[TS-4592]remove lost status for consumer --- source/dnode/mnode/impl/src/mndSubscribe.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index a74200472a..b2a866979c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -362,13 +362,13 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) { SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; int32_t j = 0; while (j < taosArrayGetSize(pConsumerEp->vgs)) { - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); - MND_TMQ_NULL_CHECK(pVgEp); + SMqVgEp *pVgEpTmp = taosArrayGetP(pConsumerEp->vgs, j); + MND_TMQ_NULL_CHECK(pVgEpTmp); bool find = false; for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) { SMqVgEp *pnewVgEp = taosArrayGetP(newVgs, k); MND_TMQ_NULL_CHECK(pnewVgEp); - if (pVgEp->vgId == pnewVgEp->vgId) { + if (pVgEpTmp->vgId == pnewVgEp->vgId) { tDeleteSMqVgEp(pnewVgEp); taosArrayRemove(newVgs, k); find = true; @@ -376,8 +376,8 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) { } } if (!find) { - mInfo("[rebalance] processRemoveAddVgs old vgId:%d", pVgEp->vgId); - tDeleteSMqVgEp(pVgEp); + mInfo("[rebalance] processRemoveAddVgs old vgId:%d", pVgEpTmp->vgId); + tDeleteSMqVgEp(pVgEpTmp); taosArrayRemove(pConsumerEp->vgs, j); continue; } @@ -385,7 +385,7 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) { } } - if (taosArrayGetSize(newVgs) != 0) { + if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) { MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs)); mInfo("[rebalance] processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs)); taosArrayDestroy(newVgs); From a81d8261e5b4159756c8d7f41c5e22a8d2fa320f Mon Sep 17 00:00:00 2001 From: sima Date: Wed, 31 Jul 2024 11:30:53 +0800 Subject: [PATCH 08/24] fix:[TD-31113] return 0.0 instead of -0.0 when using round() and ceil() --- source/libs/scalar/src/sclfunc.c | 4 ++-- tests/system-test/2-query/ceil.py | 5 ++++- tests/system-test/2-query/round.py | 5 ++++- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 3e5471700c..23cc7324f0 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -262,7 +262,7 @@ static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarP colDataSetNULL(pOutputData, i); continue; } - out[i] = f1(in[i]); + out[i] = f1(in[i]) + 0; } break; } @@ -276,7 +276,7 @@ static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarP colDataSetNULL(pOutputData, i); continue; } - out[i] = d1(in[i]); + out[i] = d1(in[i]) + 0; } break; } diff --git a/tests/system-test/2-query/ceil.py b/tests/system-test/2-query/ceil.py index aabc716a74..e719d819d8 100644 --- a/tests/system-test/2-query/ceil.py +++ b/tests/system-test/2-query/ceil.py @@ -57,7 +57,7 @@ class TDTestCase: ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a ) ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a ) ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a ) - ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a ) + ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, -0.444, 44.44, 1, "binary4", "nchar4", now()+4a ) ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a ) ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a ) @@ -223,6 +223,9 @@ class TDTestCase: tdSql.checkData(3, 4, 33) tdSql.checkData(5, 5, None) + tdSql.query(f"select ceil(c5) from {dbname}.t1") + tdSql.checkData(4 , 0, 0) + self.check_result_auto( f"select c1, c2, c3 , c4, c5 from {dbname}.t1", f"select (c1), ceil(c2) ,ceil(c3), ceil(c4), ceil(c5) from {dbname}.t1") # used for sub table diff --git a/tests/system-test/2-query/round.py b/tests/system-test/2-query/round.py index d647f516ae..f87f234fa3 100644 --- a/tests/system-test/2-query/round.py +++ b/tests/system-test/2-query/round.py @@ -53,7 +53,7 @@ class TDTestCase: ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a ) ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a ) ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a ) - ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a ) + ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, -0.444, 44.44, 1, "binary4", "nchar4", now()+4a ) ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a ) ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a ) @@ -232,6 +232,9 @@ class TDTestCase: tdSql.checkData(3, 4, 33) tdSql.checkData(5, 5, None) + tdSql.query(f"select round(c5) from {dbname}.t1") + tdSql.checkData(4 , 0, 0) + self.check_result_auto( f"select c1, c2, c3 , c4, c5 from {dbname}.t1", f"select (c1), round(c2) ,round(c3), round(c4), round(c5) from {dbname}.t1") # used for sub table From 7298feac146582bc5fd403d36c350707d13e6211 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 31 Jul 2024 14:11:45 +0800 Subject: [PATCH 09/24] fix:[TD-31146] invalid read if tmq is freed --- source/client/src/clientTmq.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 46c3cf6622..84cc251b46 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -240,7 +240,7 @@ typedef struct { SMqCommitCbParamSet* params; char topicName[TSDB_TOPIC_FNAME_LEN]; int32_t vgId; - tmq_t* pTmq; + int64_t consumerId; } SMqCommitCbParam; typedef struct SSyncCommitInfo { @@ -439,7 +439,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pEpSet); - return commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId); + return commitRspCountDown(pParamSet, pParam->consumerId, pParam->topicName, pParam->vgId); } static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, @@ -483,7 +483,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse pParam->params = pParamSet; pParam->vgId = vgId; - pParam->pTmq = tmq; + pParam->consumerId = tmq->consumerId; tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName)); From 6c8303297c8d8a20d7e5756eb2856cf10baeb78f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 31 Jul 2024 14:36:05 +0800 Subject: [PATCH 10/24] fix:[TD-31015]monitor close first before slow log thread exit --- source/client/src/clientMonitor.c | 61 ++++++++++--------------------- 1 file changed, 19 insertions(+), 42 deletions(-) diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index 92a8fc3b29..4bb29f8d97 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -12,13 +12,13 @@ SRWLatch monitorLock; void* monitorTimer; SHashObj* monitorCounterHash; -int32_t slowLogFlag = -1; -int32_t monitorFlag = -1; +int32_t monitorFlag = 0; int32_t quitCnt = 0; tsem2_t monitorSem; STaosQueue* monitorQueue; SHashObj* monitorSlowLogHash; char tmpSlowLogPath[PATH_MAX] = {0}; +TdThread monitorThread; static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) { int ret = snprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir); @@ -113,11 +113,11 @@ static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) { tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId); } MonitorSlowLogData tmp = {.clusterId = p->clusterId, - .type = p->type, - .fileName = p->fileName, - .pFile = p->pFile, - .offset = p->offset, - .data = NULL}; + .type = p->type, + .fileName = p->fileName, + .pFile = p->pFile, + .offset = p->offset, + .data = NULL}; if (monitorPutData2MonitorQueue(tmp) == 0) { p->fileName = NULL; } @@ -164,7 +164,7 @@ static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITO int64_t transporterId = 0; return asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo); -FAILED: + FAILED: monitorFreeSlowLogDataEx(param); return TAOS_GET_TERRNO(TSDB_CODE_TSC_INTERNAL_ERROR); } @@ -276,12 +276,10 @@ void monitorCreateClient(int64_t clusterId) { tscInfo("[monitor] monitorCreateClient for %" PRIx64 "finished %p.", clusterId, pMonitor); } taosWUnLockLatch(&monitorLock); - if (-1 != atomic_val_compare_exchange_32(&monitorFlag, -1, 0)) { - tscDebug("[monitor] monitorFlag already is 0"); - } + return; -fail: + fail: destroyMonitorClient(&pMonitor); taosWUnLockLatch(&monitorLock); } @@ -301,7 +299,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* tscError("failed to add metric to collector"); (void)taos_counter_destroy(newCounter); goto end; -} + } if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) { tscError("failed to put counter to monitor"); (void)taos_counter_destroy(newCounter); @@ -310,7 +308,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, newCounter); -end: + end: taosWUnLockLatch(&monitorLock); } @@ -339,7 +337,7 @@ void monitorCounterInc(int64_t clusterId, const char* counterName, const char** } tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName); -end: + end: taosWUnLockLatch(&monitorLock); } @@ -348,8 +346,6 @@ const char* monitorResultStr(SQL_RESULT_CODE code) { return result_state[code]; } -static void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); } - static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpPath) { TdFilePtr pFile = NULL; void* tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES); @@ -693,20 +689,10 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) { static void* monitorThreadFunc(void* param) { setThreadName("client-monitor-slowlog"); - -#ifdef WINDOWS - if (taosCheckCurrentInDll()) { - atexit(monitorThreadFuncUnexpectedStopped); - } -#endif - - if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) { - return NULL; - } tscDebug("monitorThreadFunc start"); int64_t quitTime = 0; while (1) { - if (atomic_load_32(&slowLogFlag) > 0) { + if (atomic_load_32(&monitorFlag) == 1) { if (quitCnt == 0) { monitorSendAllSlowLogAtQuit(); if (quitCnt == 0) { @@ -752,7 +738,6 @@ static void* monitorThreadFunc(void* param) { } (void)tsem2_timewait(&monitorSem, 100); } - atomic_store_32(&slowLogFlag, -2); return NULL; } @@ -767,7 +752,6 @@ static int32_t tscMonitortInit() { return TSDB_CODE_TSC_INTERNAL_ERROR; } - TdThread monitorThread; if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) { tscError("failed to create monitor thread since %s", strerror(errno)); return TSDB_CODE_TSC_INTERNAL_ERROR; @@ -778,13 +762,9 @@ static int32_t tscMonitortInit() { } static void tscMonitorStop() { - if (atomic_val_compare_exchange_32(&slowLogFlag, 0, 1)) { - tscDebug("monitor thread already stopped"); - return; - } - - while (atomic_load_32(&slowLogFlag) > 0) { - taosMsleep(100); + if (taosCheckPthreadValid(monitorThread)) { + (void)taosThreadJoin(monitorThread, NULL); + (void)taosThreadClear(&monitorThread); } } @@ -842,10 +822,7 @@ int32_t monitorInit() { void monitorClose() { tscInfo("[monitor] tscMonitor close"); taosWLockLatch(&monitorLock); - - if (atomic_val_compare_exchange_32(&monitorFlag, 0, 1)) { - tscDebug("[monitor] monitorFlag is not 0"); - } + atomic_store_32(&monitorFlag, 1); tscMonitorStop(); sendAllCounter(); taosHashCleanup(monitorCounterHash); @@ -860,7 +837,7 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) { int32_t code = 0; MonitorSlowLogData* slowLogData = NULL; - if (atomic_load_32(&slowLogFlag) == -2) { + if (atomic_load_32(&monitorFlag) == 1) { tscError("[monitor] slow log thread is exiting"); return -1; } From 8e23cbfa2e785335674cd89f812e6bca3881c705 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 31 Jul 2024 15:37:21 +0800 Subject: [PATCH 11/24] fix: fetch row failed issue --- source/client/src/clientImpl.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a458edcad9..75c1eabe7e 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2920,8 +2920,10 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param .cbParam = pRequest, }; - if (TSDB_CODE_SUCCESS != schedulerFetchRows(pRequest->body.queryJob, &req)) { - tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->self); + int32_t code = schedulerFetchRows(pRequest->body.queryJob, &req); + if (TSDB_CODE_SUCCESS != code) { + tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId); + pRequest->body.fetchFp(param, pRequest, code); } } From ffba28b6c3f7875feddafa7d764173c87e71958a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 31 Jul 2024 23:36:16 +0800 Subject: [PATCH 12/24] feat:[TD-31097]init lock to avoid error in mac os --- source/dnode/mnode/impl/src/mndMain.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 7e072b8fe5..37a171e9a4 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -669,6 +669,13 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { } (void)memset(pMnode, 0, sizeof(SMnode)); + int32_t code = taosThreadRwlockInit(&pMnode->lock, NULL); + if (code != 0) { + taosMemoryFree(pMnode); + mError("failed to open mnode lock since %s", tstrerror(code)); + return NULL; + } + char timestr[24] = "1970-01-01 00:00:00.00"; (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); mndSetOptions(pMnode, pOption); @@ -682,7 +689,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { return NULL; } - int32_t code = mndCreateDir(pMnode, path); + code = mndCreateDir(pMnode, path); if (code != 0) { code = terrno; mError("failed to open mnode since %s", tstrerror(code)); From 75efea55513161a182eec91a84d61436fed7c57a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 31 Jul 2024 23:44:20 +0800 Subject: [PATCH 13/24] feat:[TS-4592]remove lost status for consumer --- source/dnode/mnode/impl/src/mndConsumer.c | 2 ++ source/dnode/mnode/impl/src/mndSubscribe.c | 5 ++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 37527325db..1a9f808688 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -901,6 +901,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * // status const char *pStatusName = mndConsumerStatusName(pConsumer->status); status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes); + MND_TMQ_NULL_CHECK(status); STR_TO_VARSTR(status, pStatusName); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -940,6 +941,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal); parasStr = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes); + MND_TMQ_NULL_CHECK(parasStr); (void)sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName, pConsumer->autoCommit, pConsumer->autoCommitInterval, buf); varDataSetLen(parasStr, strlen(varDataVal(parasStr))); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index b2a866979c..8bc3c9064e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -396,7 +396,6 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) { END: taosMemoryFree(pVgEp); - sdbRelease(pMnode->pSdb, pVgroup); taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp); return code; } @@ -773,7 +772,7 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) { if (status == MQ_CONSUMER_STATUS_READY) { if (taosArrayGetSize(pConsumer->currentTopics) == 0) { // unsubscribe or close - mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info); + MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info)); } else if (hbStatus * tsMqRebalanceInterval * 1000 >= pConsumer->sessionTimeoutMs || pollStatus * tsMqRebalanceInterval * 1000 >= pConsumer->maxPollIntervalMs) { taosRLockLatch(&pConsumer->lock); @@ -788,7 +787,7 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) { MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId)); taosRUnLockLatch(&pConsumer->lock); } else { - mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info); + MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info)); } mndReleaseConsumer(pMnode, pConsumer); From 86b06d0a7a91a61ff4564ae01b2670cab00b5bf2 Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Thu, 1 Aug 2024 09:42:55 +0800 Subject: [PATCH 14/24] fix: PkOrder Mem --- source/libs/executor/src/tsort.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index fa7d59e137..896b4db7cd 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1699,6 +1699,7 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { taosArrayDestroy(pHandle->pSortInfo); pHandle->pSortInfo = pOrderInfoList; + pHandle->cmpParam.pPkOrder = (pHandle->bSortPk) ? taosArrayGet(pHandle->pSortInfo, 1) : NULL; return TSDB_CODE_SUCCESS; } From ff1778220c27b3634c397c358a5bf62af1ac285e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 1 Aug 2024 10:24:43 +0800 Subject: [PATCH 15/24] feat:[TS-4592]remove lost status for consumer --- source/dnode/mnode/impl/src/mndSubscribe.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 8bc3c9064e..9a64387e82 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -395,6 +395,7 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) { return totalVgNum; END: + sdbRelease(pMnode->pSdb, pVgroup); taosMemoryFree(pVgEp); taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp); return code; From b93e283945053dcbd8f0c36ae21f16d876aea18d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 1 Aug 2024 10:30:37 +0800 Subject: [PATCH 16/24] fix(stream): check existence for dst stable. --- source/dnode/vnode/src/tq/tqSink.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index d9e39ad6f5..3b375f7f82 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -957,6 +957,12 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK); code = metaReaderGetTableEntryByUid(&mer1, pOutputInfo->tbSink.stbUid); + if (code != TSDB_CODE_SUCCESS) { + tqError("s-task:%s vgId:%d failed to get the dst stable, failed to sink results", id, vgId); + metaReaderClear(&mer1); + return; + } + pOutputInfo->tbSink.pTagSchema = tCloneSSchemaWrapper(&mer1.me.stbEntry.schemaTag); metaReaderClear(&mer1); From c75281b9ff93df4436820d775c06b3f62f89849a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 1 Aug 2024 11:29:21 +0800 Subject: [PATCH 17/24] fix:[TD-31146] invalid read if tmq is freed --- source/client/src/clientTmq.c | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 84cc251b46..c3867f2821 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1452,22 +1452,22 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tmq_t* tmq = NULL; SMqPollCbParam* pParam = (SMqPollCbParam*)param; if (pParam == NULL || pMsg == NULL) { - goto FAIL2; + return TSDB_CODE_TSC_INTERNAL_ERROR; } int64_t refId = pParam->refId; int32_t vgId = pParam->vgId; uint64_t requestId = pParam->requestId; tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { - code = TSDB_CODE_TMQ_CONSUMER_CLOSED; - goto FAIL2; + return TSDB_CODE_TMQ_CONSUMER_CLOSED; } SMqPollRspWrapper* pRspWrapper = NULL; - code = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper); - if (code) { + int32_t ret = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper); + if (ret) { + code = ret; tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId); - goto FAIL1; + goto END; } if (code != 0) { @@ -1550,25 +1550,23 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } END: - pRspWrapper->code = code; - pRspWrapper->vgId = vgId; - (void)strcpy(pRspWrapper->topicName, pParam->topicName); - code = taosWriteQitem(tmq->mqueue, pRspWrapper); - if(code != 0){ - tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code); + if (pRspWrapper){ + pRspWrapper->code = code; + pRspWrapper->vgId = vgId; + (void)strcpy(pRspWrapper->topicName, pParam->topicName); + code = taosWriteQitem(tmq->mqueue, pRspWrapper); + if(code != 0){ + tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code); + } } - int32_t total = taosQueueItemSize(tmq->mqueue); tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64, tmq->consumerId, rspType, vgId, total, requestId); -FAIL1: - (void)taosReleaseRef(tmqMgmt.rsetId, refId); - -FAIL2: if (tmq) (void)tsem2_post(&tmq->rspSem); if (pMsg) taosMemoryFreeClear(pMsg->pData); if (pMsg) taosMemoryFreeClear(pMsg->pEpSet); + (void)taosReleaseRef(tmqMgmt.rsetId, refId); return code; } From db47d2f8ab3d0ac772f22f9b0711606c7a5ade70 Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Thu, 1 Aug 2024 14:05:28 +0800 Subject: [PATCH 18/24] fix: udfd close --- source/libs/function/src/tudf.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 66e503fd89..ad9e5ce7d4 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -229,6 +229,7 @@ static void udfWatchUdfd(void *args) { if(uv_loop_close(&pData->loop) != 0) { fnError("udfd loop close failed, lino:%d", __LINE__); } + return; _exit: if (terrno != 0) { From c482e83fbe77842343b176cc88e40173614993a0 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 1 Aug 2024 15:50:21 +0800 Subject: [PATCH 19/24] fix issue --- source/libs/executor/src/streamfilloperator.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index c6bf13dabd..480814f6a0 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -470,6 +470,7 @@ static int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t gr SWinKey key = {.groupId = groupId, .ts = ts}; if (tSimpleHashGet(pFillSup->pResMap, &key, sizeof(SWinKey)) != NULL) { (*pRes) = false; + goto _end; } code = tSimpleHashPut(pFillSup->pResMap, &key, sizeof(SWinKey), NULL, 0); QUERY_CHECK_CODE(code, lino, _end); From d6ac07d574661d961d70e25bc9e27ae461161a45 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 1 Aug 2024 15:55:09 +0800 Subject: [PATCH 20/24] fix: query timeline based on function issue --- source/libs/parser/src/parTranslater.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 879f527a85..85d9ddde85 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2608,7 +2608,8 @@ static int32_t calcSelectFuncNum(SFunctionNode* pFunc, int32_t currSelectFuncNum : 1); } -static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) { +static void setFuncClassification(STranslateContext* pCxt, SFunctionNode* pFunc) { + SNode* pCurrStmt = pCxt->pCurrStmt; if (NULL != pCurrStmt && QUERY_NODE_SELECT_STMT == nodeType(pCurrStmt)) { SSelectStmt* pSelect = (SSelectStmt*)pCurrStmt; pSelect->hasAggFuncs = pSelect->hasAggFuncs ? true : fmIsAggFunc(pFunc->funcId); @@ -2641,7 +2642,9 @@ static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) { pSelect->hasLastFunc = pSelect->hasLastFunc ? true : (FUNCTION_TYPE_LAST == pFunc->funcType); pSelect->hasTimeLineFunc = pSelect->hasTimeLineFunc ? true : fmIsTimelineFunc(pFunc->funcId); pSelect->hasUdaf = pSelect->hasUdaf ? true : fmIsUserDefinedFunc(pFunc->funcId) && fmIsAggFunc(pFunc->funcId); - pSelect->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc ? fmIsKeepOrderFunc(pFunc->funcId) : false; + if (SQL_CLAUSE_SELECT == pCxt->currClause) { + pSelect->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc ? fmIsKeepOrderFunc(pFunc->funcId) : false; + } } } @@ -2903,7 +2906,7 @@ static int32_t translateNormalFunction(STranslateContext* pCxt, SNode** ppNode) code = translateBlockDistFunc(pCxt, pFunc); } if (TSDB_CODE_SUCCESS == code) { - setFuncClassification(pCxt->pCurrStmt, pFunc); + setFuncClassification(pCxt, pFunc); } return code; } From 2075bfd0f6cdda5bf17b4886006856bf0e7d3bbc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 1 Aug 2024 17:17:32 +0800 Subject: [PATCH 21/24] fix(stream): check return value. --- source/dnode/mnode/impl/src/mndStream.c | 97 +++++++++++++++++++------ source/libs/executor/src/sortoperator.c | 1 + source/util/src/tcompression.c | 4 +- 3 files changed, 77 insertions(+), 25 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b7ab76984a..de10e991d3 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -290,7 +290,7 @@ static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrap pWrapper->nCols = taosArrayGetSize(pFields); pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema)); if (NULL == pWrapper->pSchema) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } SNode *pNode; @@ -328,15 +328,18 @@ static bool hasDestPrimaryKey(SSchemaWrapper *pWrapper) { static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) { SNode *pAst = NULL; SQueryPlan *pPlan = NULL; + int32_t code = 0; mInfo("stream:%s to create", pCreate->name); memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN); pObj->createTime = taosGetTimestampMs(); pObj->updateTime = pObj->createTime; pObj->version = 1; + if (pCreate->smaId > 0) { pObj->subTableWithoutMd5 = 1; } + pObj->smaId = pCreate->smaId; pObj->indexForMultiAggBalance = -1; @@ -360,8 +363,10 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB); if (pSourceDb == NULL) { mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, terrstr()); - return terrno; + code = terrno; + goto FAIL; } + pObj->sourceDbUid = pSourceDb->uid; mndReleaseDb(pMnode, pSourceDb); @@ -369,9 +374,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName); if (pTargetDb == NULL) { - mInfo("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, terrstr()); - return terrno; + mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, terrstr()); + code = terrno; + goto FAIL; } + tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN); if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) { @@ -389,12 +396,12 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, pCreate->ast = NULL; // deserialize ast - if (nodesStringToNode(pObj->ast, &pAst) < 0) { + if ((code = nodesStringToNode(pObj->ast, &pAst)) < 0) { goto FAIL; } // create output schema - if (createSchemaByFields(pCreate->pCols, &pObj->outputSchema) != TSDB_CODE_SUCCESS) { + if ((code = createSchemaByFields(pCreate->pCols, &pObj->outputSchema)) != TSDB_CODE_SUCCESS) { goto FAIL; } @@ -403,6 +410,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, pObj->outputSchema.nCols += numOfNULL; SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema)); if (!pFullSchema) { + code = terrno; goto FAIL; } @@ -410,6 +418,10 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, int32_t dataIndex = 0; for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) { SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex); + if (pos == NULL) { + continue; + } + if (nullIndex >= numOfNULL || i < pos->slotId) { pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes; pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId; @@ -444,22 +456,31 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, }; // using ast and param to build physical plan - if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { + if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) < 0) { goto FAIL; } // save physcial plan - if (nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL) != 0) { + if ((code = nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL)) != 0) { goto FAIL; } pObj->tagSchema.nCols = pCreate->numOfTags; if (pCreate->numOfTags) { pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema)); + if (pObj->tagSchema.pSchema == NULL) { + code = terrno; + goto FAIL; + } } + /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/ for (int32_t i = 0; i < pCreate->numOfTags; i++) { SField *pField = taosArrayGet(pCreate->pTags, i); + if (pField == NULL) { + continue; + } + pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1; pObj->tagSchema.pSchema[i].bytes = pField->bytes; pObj->tagSchema.pSchema[i].flags = pField->flags; @@ -470,7 +491,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, FAIL: if (pAst != NULL) nodesDestroyNode(pAst); if (pPlan != NULL) qDestroyQueryPlan(pPlan); - return 0; + return code; } int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { @@ -575,12 +596,15 @@ int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) { static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) { SStbObj *pStb = NULL; SDbObj *pDb = NULL; + int32_t code = 0; + int32_t lino = 0; SMCreateStbReq createReq = {0}; tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); createReq.numOfColumns = pStream->outputSchema.nCols; createReq.numOfTags = 1; // group id createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns); + TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno); // build fields for (int32_t i = 0; i < createReq.numOfColumns; i++) { @@ -595,6 +619,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre if (pStream->tagSchema.nCols == 0) { createReq.numOfTags = 1; createReq.pTags = taosArrayInit_s(sizeof(SField), 1); + TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno); + // build tags SField *pField = taosArrayGet(createReq.pTags, 0); strcpy(pField->name, "group_id"); @@ -604,6 +630,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre } else { createReq.numOfTags = pStream->tagSchema.nCols; createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags); + TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno); + for (int32_t i = 0; i < createReq.numOfTags; i++) { SField *pField = taosArrayGet(createReq.pTags, i); pField->bytes = pStream->tagSchema.pSchema[i].bytes; @@ -657,7 +685,7 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre mndReleaseStb(pMnode, pStb); mndReleaseDb(pMnode, pDb); mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols); - return 0; + return code; _OVER: tFreeSMCreateStbReq(&createReq); @@ -665,7 +693,7 @@ _OVER: mndReleaseDb(pMnode, pDb); mDebug("stream:%s failed to create dst stable:%s, code:%s", pStream->name, pStream->targetSTbName, tstrerror(terrno)); - return -1; + return code; } // 1. stream number check @@ -709,9 +737,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { char *sql = NULL; int32_t sqlLen = 0; const char *pMsg = "create stream tasks on dnodes"; - int32_t code = 0; - terrno = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; + terrno = TSDB_CODE_SUCCESS; SCMCreateStreamReq createReq = {0}; if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq) != 0) { code = TSDB_CODE_INVALID_MSG; @@ -749,6 +777,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { if (createReq.sql != NULL) { sqlLen = strlen(createReq.sql); sql = taosMemoryMalloc(sqlLen + 1); + if (sql == NULL) { + code = terrno; + goto _OVER; + } + memset(sql, 0, sqlLen + 1); memcpy(sql, createReq.sql, sqlLen); } @@ -942,8 +975,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int void *buf = taosMemoryMalloc(tlen); if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno; } void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); @@ -1150,7 +1182,11 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { SArray *pInvalidList = taosArrayInit(4, sizeof(STaskId)); for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { - STaskId *p = taosArrayGet(execInfo.pTaskList, i); + STaskId *p = taosArrayGet(execInfo.pTaskList, i); + if (p == NULL) { + continue; + } + STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); if (pEntry == NULL) { continue; @@ -1159,8 +1195,12 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { if (pEntry->status == TASK_STATUS__STOP) { for (int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) { STaskId *pId = taosArrayGet(pInvalidList, j); + if (pId == NULL) { + continue; + } + if (pEntry->id.streamId == pId->streamId) { - void* px = taosArrayPush(pInvalidList, &pEntry->id); + void *px = taosArrayPush(pInvalidList, &pEntry->id); if (px == NULL) { mError("failed to put stream into invalid list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); } @@ -1243,6 +1283,10 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { } SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval)); + if (pList == NULL) { + return -1; + } + int64_t now = taosGetTimestampMs(); while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { @@ -2472,14 +2516,15 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); if (pReqTaskList == NULL) { SArray *pList = taosArrayInit(4, sizeof(STaskChkptInfo)); - doAddReportStreamTask(pList, &req); + if (pList != NULL) { + doAddReportStreamTask(pList, &req); + code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES); + if (code) { + mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId); + } - code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES); - if (code) { - mError("stream:0x%"PRIx64 " failed to put into checkpoint stream", req.streamId); + pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); } - - pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); } else { doAddReportStreamTask(*pReqTaskList, &req); } @@ -2545,6 +2590,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; int64_t now = taosGetTimestampMs(); SArray *pStreamList = taosArrayInit(4, sizeof(int64_t)); + if (pStreamList == NULL) { + return terrno; + } mDebug("start to process consensus-checkpointId in tmr"); @@ -2572,6 +2620,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { int64_t streamId = -1; int32_t num = taosArrayGetSize(pInfo->pTaskList); SArray *pList = taosArrayInit(4, sizeof(int32_t)); + if (pList == NULL) { + continue; + } SStreamObj *pStream = NULL; code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 7dfe88fe85..a0c56df49c 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -365,6 +365,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) { ps->onlyRef = true; code = tsortAddSource(pInfo->pSortHandle, ps); if (code) { + taosMemoryFree(ps); return code; } diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 8402d2a658..bec9a5797d 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -252,7 +252,7 @@ int32_t l2ComressInitImpl_xz(char *lossyColumns, float fPrecision, double dPreci } int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, const char type, int8_t lvl) { - size_t len = FL2_compress(output + 1, outputSize - 1, input, inputSize, lvl); + size_t len = 0;//FL2_compress(output + 1, outputSize - 1, input, inputSize, lvl); if (len > inputSize) { output[0] = 0; memcpy(output + 1, input, inputSize); @@ -264,7 +264,7 @@ int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char int32_t l2DecompressImpl_xz(const char *const input, const int32_t compressedSize, char *const output, int32_t outputSize, const char type) { if (input[0] == 1) { - return FL2_decompress(output, outputSize, input + 1, compressedSize - 1); + return 0;//FL2_decompress(output, outputSize, input + 1, compressedSize - 1); } else if (input[0] == 0) { memcpy(output, input + 1, compressedSize - 1); return compressedSize - 1; From fcaaf0609ae0c1abd3905553f8be86a8917d891d Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Thu, 1 Aug 2024 17:49:29 +0800 Subject: [PATCH 22/24] fix TD-31182 --- packaging/rpm/tdengine.spec | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packaging/rpm/tdengine.spec b/packaging/rpm/tdengine.spec index 000a82b6b4..3e23e29a40 100644 --- a/packaging/rpm/tdengine.spec +++ b/packaging/rpm/tdengine.spec @@ -80,8 +80,8 @@ if [ -f %{_compiledir}/../../../explorer/target/taos-explorer.service ]; then cp %{_compiledir}/../../../explorer/target/taos-explorer.service %{buildroot}%{homepath}/cfg ||: fi -if [ -f %{_compiledir}/../../../explorer/server/example/explorer.toml ]; then - cp %{_compiledir}/../../../explorer/server/example/explorer.toml %{buildroot}%{homepath}/cfg ||: +if [ -f %{_compiledir}/../../../explorer/server/examples/explorer.toml ]; then + cp %{_compiledir}/../../../explorer/server/examples/explorer.toml %{buildroot}%{homepath}/cfg ||: fi #cp %{_compiledir}/../packaging/rpm/taosd %{buildroot}%{homepath}/init.d From e4ccf44302ccd5526d861c5793b395f349cd4924 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 1 Aug 2024 18:35:18 +0800 Subject: [PATCH 23/24] fix(util): fix error. --- source/util/src/tcompression.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index bec9a5797d..8402d2a658 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -252,7 +252,7 @@ int32_t l2ComressInitImpl_xz(char *lossyColumns, float fPrecision, double dPreci } int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, const char type, int8_t lvl) { - size_t len = 0;//FL2_compress(output + 1, outputSize - 1, input, inputSize, lvl); + size_t len = FL2_compress(output + 1, outputSize - 1, input, inputSize, lvl); if (len > inputSize) { output[0] = 0; memcpy(output + 1, input, inputSize); @@ -264,7 +264,7 @@ int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char int32_t l2DecompressImpl_xz(const char *const input, const int32_t compressedSize, char *const output, int32_t outputSize, const char type) { if (input[0] == 1) { - return 0;//FL2_decompress(output, outputSize, input + 1, compressedSize - 1); + return FL2_decompress(output, outputSize, input + 1, compressedSize - 1); } else if (input[0] == 0) { memcpy(output, input + 1, compressedSize - 1); return compressedSize - 1; From 106f3eabfb979548fec78cd1f44116a3a184c7e5 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 31 Jul 2024 15:17:23 +0800 Subject: [PATCH 24/24] common/cos: make 503 retry with 1~3s --- source/common/src/cos.c | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 96309ece86..335c654acd 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -1178,6 +1178,13 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, &getObjectDataCallback}; TS3SizeCBD cbd = {0}; + int retryCount = 0; + static int maxRetryCount = 5; + static int minRetryInterval = 1000; // ms + static int maxRetryInterval = 3000; // ms + +_retry: + (void)memset(&cbd, 0, sizeof(cbd)); cbd.content_length = size; cbd.buf_pos = 0; do { @@ -1185,6 +1192,11 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, } while (S3_status_is_retryable(cbd.status) && should_retry()); if (cbd.status != S3StatusOK) { + if (S3StatusErrorSlowDown == cbd.status && retryCount++ < maxRetryCount) { + taosMsleep(taosRand() % (maxRetryInterval - minRetryInterval + 1) + minRetryInterval); + uInfo("%s: %d/%s(%s) retry get object", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg); + goto _retry; + } uError("%s: %d/%s(%s)", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg); TAOS_RETURN(TAOS_SYSTEM_ERROR(EIO));