Merge branch '3.0' into fix/create_tb

This commit is contained in:
Haojun Liao 2024-08-05 09:31:45 +08:00
commit 2731ffe2c6
31 changed files with 432 additions and 256 deletions

View File

@ -2811,6 +2811,9 @@ enum {
TOPIC_SUB_TYPE__COLUMN, TOPIC_SUB_TYPE__COLUMN,
}; };
#define DEFAULT_MAX_POLL_INTERVAL 3000000
#define DEFAULT_SESSION_TIMEOUT 12000
typedef struct { typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic
int8_t igExists; int8_t igExists;
@ -2833,7 +2836,7 @@ typedef struct {
typedef struct { typedef struct {
int64_t consumerId; int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
char clientId[256]; char clientId[TSDB_CLIENT_ID_LEN];
SArray* topicNames; // SArray<char**> SArray* topicNames; // SArray<char**>
int8_t withTbName; int8_t withTbName;
@ -2842,6 +2845,8 @@ typedef struct {
int8_t resetOffsetCfg; int8_t resetOffsetCfg;
int8_t enableReplay; int8_t enableReplay;
int8_t enableBatchMeta; int8_t enableBatchMeta;
int32_t sessionTimeoutMs;
int32_t maxPollIntervalMs;
} SCMSubscribeReq; } SCMSubscribeReq;
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
@ -2863,11 +2868,14 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg); tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg);
tlen += taosEncodeFixedI8(buf, pReq->enableReplay); tlen += taosEncodeFixedI8(buf, pReq->enableReplay);
tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta); tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta);
tlen += taosEncodeFixedI32(buf, pReq->sessionTimeoutMs);
tlen += taosEncodeFixedI32(buf, pReq->maxPollIntervalMs);
return tlen; return tlen;
} }
static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) { static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq, int32_t len) {
void* start = buf;
buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeFixedI64(buf, &pReq->consumerId);
buf = taosDecodeStringTo(buf, pReq->cgroup); buf = taosDecodeStringTo(buf, pReq->cgroup);
buf = taosDecodeStringTo(buf, pReq->clientId); buf = taosDecodeStringTo(buf, pReq->clientId);
@ -2893,6 +2901,14 @@ static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeR
buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg); buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg);
buf = taosDecodeFixedI8(buf, &pReq->enableReplay); buf = taosDecodeFixedI8(buf, &pReq->enableReplay);
buf = taosDecodeFixedI8(buf, &pReq->enableBatchMeta); buf = taosDecodeFixedI8(buf, &pReq->enableBatchMeta);
if ((char*)buf - (char*)start < len) {
buf = taosDecodeFixedI32(buf, &pReq->sessionTimeoutMs);
buf = taosDecodeFixedI32(buf, &pReq->maxPollIntervalMs);
} else {
pReq->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
pReq->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
}
return 0; return 0;
} }
@ -4120,6 +4136,7 @@ typedef struct {
int64_t consumerId; int64_t consumerId;
int32_t epoch; int32_t epoch;
SArray* topics; SArray* topics;
int8_t pollFlag;
} SMqHbReq; } SMqHbReq;
typedef struct { typedef struct {

View File

@ -221,6 +221,8 @@ typedef enum ELogicConditionType {
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string #define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string #define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string
#define TSDB_CGROUP_LEN 193 // it is a null-terminated string #define TSDB_CGROUP_LEN 193 // it is a null-terminated string
#define TSDB_CLIENT_ID_LEN 256 // it is a null-terminated string
#define TSDB_CONSUMER_ID_LEN 32 // it is a null-terminated string
#define TSDB_OFFSET_LEN 64 // it is a null-terminated string #define TSDB_OFFSET_LEN 64 // it is a null-terminated string
#define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string #define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string
#define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string #define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string

View File

@ -80,8 +80,8 @@ if [ -f %{_compiledir}/../../../explorer/target/taos-explorer.service ]; then
cp %{_compiledir}/../../../explorer/target/taos-explorer.service %{buildroot}%{homepath}/cfg ||: cp %{_compiledir}/../../../explorer/target/taos-explorer.service %{buildroot}%{homepath}/cfg ||:
fi fi
if [ -f %{_compiledir}/../../../explorer/server/example/explorer.toml ]; then if [ -f %{_compiledir}/../../../explorer/server/examples/explorer.toml ]; then
cp %{_compiledir}/../../../explorer/server/example/explorer.toml %{buildroot}%{homepath}/cfg ||: cp %{_compiledir}/../../../explorer/server/examples/explorer.toml %{buildroot}%{homepath}/cfg ||:
fi fi
#cp %{_compiledir}/../packaging/rpm/taosd %{buildroot}%{homepath}/init.d #cp %{_compiledir}/../packaging/rpm/taosd %{buildroot}%{homepath}/init.d

View File

@ -2920,8 +2920,10 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param
.cbParam = pRequest, .cbParam = pRequest,
}; };
if (TSDB_CODE_SUCCESS != schedulerFetchRows(pRequest->body.queryJob, &req)) { int32_t code = schedulerFetchRows(pRequest->body.queryJob, &req);
tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->self); if (TSDB_CODE_SUCCESS != code) {
tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
pRequest->body.fetchFp(param, pRequest, code);
} }
} }

View File

@ -12,13 +12,13 @@
SRWLatch monitorLock; SRWLatch monitorLock;
void* monitorTimer; void* monitorTimer;
SHashObj* monitorCounterHash; SHashObj* monitorCounterHash;
int32_t slowLogFlag = -1; int32_t monitorFlag = 0;
int32_t monitorFlag = -1;
int32_t quitCnt = 0; int32_t quitCnt = 0;
tsem2_t monitorSem; tsem2_t monitorSem;
STaosQueue* monitorQueue; STaosQueue* monitorQueue;
SHashObj* monitorSlowLogHash; SHashObj* monitorSlowLogHash;
char tmpSlowLogPath[PATH_MAX] = {0}; char tmpSlowLogPath[PATH_MAX] = {0};
TdThread monitorThread;
static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) { static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) {
int ret = snprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir); int ret = snprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir);
@ -113,11 +113,11 @@ static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId); tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId);
} }
MonitorSlowLogData tmp = {.clusterId = p->clusterId, MonitorSlowLogData tmp = {.clusterId = p->clusterId,
.type = p->type, .type = p->type,
.fileName = p->fileName, .fileName = p->fileName,
.pFile = p->pFile, .pFile = p->pFile,
.offset = p->offset, .offset = p->offset,
.data = NULL}; .data = NULL};
if (monitorPutData2MonitorQueue(tmp) == 0) { if (monitorPutData2MonitorQueue(tmp) == 0) {
p->fileName = NULL; p->fileName = NULL;
} }
@ -164,7 +164,7 @@ static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITO
int64_t transporterId = 0; int64_t transporterId = 0;
return asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo); return asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo);
FAILED: FAILED:
monitorFreeSlowLogDataEx(param); monitorFreeSlowLogDataEx(param);
return TAOS_GET_TERRNO(TSDB_CODE_TSC_INTERNAL_ERROR); return TAOS_GET_TERRNO(TSDB_CODE_TSC_INTERNAL_ERROR);
} }
@ -276,12 +276,10 @@ void monitorCreateClient(int64_t clusterId) {
tscInfo("[monitor] monitorCreateClient for %" PRIx64 "finished %p.", clusterId, pMonitor); tscInfo("[monitor] monitorCreateClient for %" PRIx64 "finished %p.", clusterId, pMonitor);
} }
taosWUnLockLatch(&monitorLock); taosWUnLockLatch(&monitorLock);
if (-1 != atomic_val_compare_exchange_32(&monitorFlag, -1, 0)) {
tscDebug("[monitor] monitorFlag already is 0");
}
return; return;
fail: fail:
destroyMonitorClient(&pMonitor); destroyMonitorClient(&pMonitor);
taosWUnLockLatch(&monitorLock); taosWUnLockLatch(&monitorLock);
} }
@ -301,7 +299,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char*
tscError("failed to add metric to collector"); tscError("failed to add metric to collector");
(void)taos_counter_destroy(newCounter); (void)taos_counter_destroy(newCounter);
goto end; goto end;
} }
if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) { if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) {
tscError("failed to put counter to monitor"); tscError("failed to put counter to monitor");
(void)taos_counter_destroy(newCounter); (void)taos_counter_destroy(newCounter);
@ -310,7 +308,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char*
tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name,
newCounter); newCounter);
end: end:
taosWUnLockLatch(&monitorLock); taosWUnLockLatch(&monitorLock);
} }
@ -339,7 +337,7 @@ void monitorCounterInc(int64_t clusterId, const char* counterName, const char**
} }
tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName); tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName);
end: end:
taosWUnLockLatch(&monitorLock); taosWUnLockLatch(&monitorLock);
} }
@ -348,8 +346,6 @@ const char* monitorResultStr(SQL_RESULT_CODE code) {
return result_state[code]; return result_state[code];
} }
static void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); }
static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpPath) { static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpPath) {
TdFilePtr pFile = NULL; TdFilePtr pFile = NULL;
void* tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES); void* tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES);
@ -693,20 +689,10 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
static void* monitorThreadFunc(void* param) { static void* monitorThreadFunc(void* param) {
setThreadName("client-monitor-slowlog"); setThreadName("client-monitor-slowlog");
#ifdef WINDOWS
if (taosCheckCurrentInDll()) {
atexit(monitorThreadFuncUnexpectedStopped);
}
#endif
if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) {
return NULL;
}
tscDebug("monitorThreadFunc start"); tscDebug("monitorThreadFunc start");
int64_t quitTime = 0; int64_t quitTime = 0;
while (1) { while (1) {
if (atomic_load_32(&slowLogFlag) > 0) { if (atomic_load_32(&monitorFlag) == 1) {
if (quitCnt == 0) { if (quitCnt == 0) {
monitorSendAllSlowLogAtQuit(); monitorSendAllSlowLogAtQuit();
if (quitCnt == 0) { if (quitCnt == 0) {
@ -752,7 +738,6 @@ static void* monitorThreadFunc(void* param) {
} }
(void)tsem2_timewait(&monitorSem, 100); (void)tsem2_timewait(&monitorSem, 100);
} }
atomic_store_32(&slowLogFlag, -2);
return NULL; return NULL;
} }
@ -767,7 +752,6 @@ static int32_t tscMonitortInit() {
return TSDB_CODE_TSC_INTERNAL_ERROR; return TSDB_CODE_TSC_INTERNAL_ERROR;
} }
TdThread monitorThread;
if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) { if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) {
tscError("failed to create monitor thread since %s", strerror(errno)); tscError("failed to create monitor thread since %s", strerror(errno));
return TSDB_CODE_TSC_INTERNAL_ERROR; return TSDB_CODE_TSC_INTERNAL_ERROR;
@ -778,13 +762,9 @@ static int32_t tscMonitortInit() {
} }
static void tscMonitorStop() { static void tscMonitorStop() {
if (atomic_val_compare_exchange_32(&slowLogFlag, 0, 1)) { if (taosCheckPthreadValid(monitorThread)) {
tscDebug("monitor thread already stopped"); (void)taosThreadJoin(monitorThread, NULL);
return; (void)taosThreadClear(&monitorThread);
}
while (atomic_load_32(&slowLogFlag) > 0) {
taosMsleep(100);
} }
} }
@ -842,10 +822,7 @@ int32_t monitorInit() {
void monitorClose() { void monitorClose() {
tscInfo("[monitor] tscMonitor close"); tscInfo("[monitor] tscMonitor close");
taosWLockLatch(&monitorLock); taosWLockLatch(&monitorLock);
atomic_store_32(&monitorFlag, 1);
if (atomic_val_compare_exchange_32(&monitorFlag, 0, 1)) {
tscDebug("[monitor] monitorFlag is not 0");
}
tscMonitorStop(); tscMonitorStop();
sendAllCounter(); sendAllCounter();
taosHashCleanup(monitorCounterHash); taosHashCleanup(monitorCounterHash);
@ -860,7 +837,7 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) {
int32_t code = 0; int32_t code = 0;
MonitorSlowLogData* slowLogData = NULL; MonitorSlowLogData* slowLogData = NULL;
if (atomic_load_32(&slowLogFlag) == -2) { if (atomic_load_32(&monitorFlag) == 1) {
tscError("[monitor] slow log thread is exiting"); tscError("[monitor] slow log thread is exiting");
return -1; return -1;
} }

View File

@ -37,6 +37,7 @@ struct SMqMgmt {
static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT; // initialize only once static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT; // initialize only once
volatile int32_t tmqInitRes = 0; // initialize rsp code volatile int32_t tmqInitRes = 0; // initialize rsp code
static struct SMqMgmt tmqMgmt = {0}; static struct SMqMgmt tmqMgmt = {0};
static int8_t pollFlag = 0;
typedef struct { typedef struct {
int32_t code; int32_t code;
@ -56,7 +57,7 @@ struct tmq_list_t {
}; };
struct tmq_conf_t { struct tmq_conf_t {
char clientId[256]; char clientId[TSDB_CLIENT_ID_LEN];
char groupId[TSDB_CGROUP_LEN]; char groupId[TSDB_CGROUP_LEN];
int8_t autoCommit; int8_t autoCommit;
int8_t resetOffset; int8_t resetOffset;
@ -66,6 +67,9 @@ struct tmq_conf_t {
int8_t sourceExcluded; // do not consume, bit int8_t sourceExcluded; // do not consume, bit
uint16_t port; uint16_t port;
int32_t autoCommitInterval; int32_t autoCommitInterval;
int32_t sessionTimeoutMs;
int32_t heartBeatIntervalMs;
int32_t maxPollIntervalMs;
char* ip; char* ip;
char* user; char* user;
char* pass; char* pass;
@ -77,15 +81,18 @@ struct tmq_conf_t {
struct tmq_t { struct tmq_t {
int64_t refId; int64_t refId;
char groupId[TSDB_CGROUP_LEN]; char groupId[TSDB_CGROUP_LEN];
char clientId[256]; char clientId[TSDB_CLIENT_ID_LEN];
int8_t withTbName; int8_t withTbName;
int8_t useSnapshot; int8_t useSnapshot;
int8_t autoCommit; int8_t autoCommit;
int32_t autoCommitInterval; int32_t autoCommitInterval;
int32_t sessionTimeoutMs;
int32_t heartBeatIntervalMs;
int32_t maxPollIntervalMs;
int8_t resetOffsetCfg; int8_t resetOffsetCfg;
int8_t replayEnable; int8_t replayEnable;
int8_t sourceExcluded; // do not consume, bit int8_t sourceExcluded; // do not consume, bit
uint64_t consumerId; int64_t consumerId;
tmq_commit_cb* commitCb; tmq_commit_cb* commitCb;
void* commitCbUserParam; void* commitCbUserParam;
int8_t enableBatchMeta; int8_t enableBatchMeta;
@ -240,7 +247,7 @@ typedef struct {
SMqCommitCbParamSet* params; SMqCommitCbParamSet* params;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
int32_t vgId; int32_t vgId;
tmq_t* pTmq; int64_t consumerId;
} SMqCommitCbParam; } SMqCommitCbParam;
typedef struct SSyncCommitInfo { typedef struct SSyncCommitInfo {
@ -266,6 +273,9 @@ tmq_conf_t* tmq_conf_new() {
conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL; conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
conf->resetOffset = TMQ_OFFSET__RESET_LATEST; conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
conf->enableBatchMeta = false; conf->enableBatchMeta = false;
conf->heartBeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL;
conf->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
conf->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
return conf; return conf;
} }
@ -295,7 +305,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
} }
if (strcasecmp(key, "client.id") == 0) { if (strcasecmp(key, "client.id") == 0) {
tstrncpy(conf->clientId, value, 256); tstrncpy(conf->clientId, value, TSDB_CLIENT_ID_LEN);
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
@ -312,7 +322,38 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
} }
if (strcasecmp(key, "auto.commit.interval.ms") == 0) { if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
conf->autoCommitInterval = taosStr2int64(value); int64_t tmp = taosStr2int64(value);
if (tmp < 0 || EINVAL == errno || ERANGE == errno) {
return TMQ_CONF_INVALID;
}
conf->autoCommitInterval = (tmp > INT32_MAX ? INT32_MAX : tmp);
return TMQ_CONF_OK;
}
if (strcasecmp(key, "session.timeout.ms") == 0) {
int64_t tmp = taosStr2int64(value);
if (tmp < 6000 || tmp > 1800000){
return TMQ_CONF_INVALID;
}
conf->sessionTimeoutMs = tmp;
return TMQ_CONF_OK;
}
if (strcasecmp(key, "heartbeat.interval.ms") == 0) {
int64_t tmp = taosStr2int64(value);
if (tmp < 1000 || tmp >= conf->sessionTimeoutMs){
return TMQ_CONF_INVALID;
}
conf->heartBeatIntervalMs = tmp;
return TMQ_CONF_OK;
}
if (strcasecmp(key, "max.poll.interval.ms") == 0) {
int64_t tmp = taosStr2int64(value);
if (tmp < 1000 || tmp > INT32_MAX){
return TMQ_CONF_INVALID;
}
conf->maxPollIntervalMs = tmp;
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
@ -371,7 +412,12 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
} }
if (strcasecmp(key, "td.connect.port") == 0) { if (strcasecmp(key, "td.connect.port") == 0) {
conf->port = taosStr2int64(value); int64_t tmp = taosStr2int64(value);
if (tmp <= 0 || tmp > 65535) {
return TMQ_CONF_INVALID;
}
conf->port = tmp;
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
@ -439,7 +485,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pData);
taosMemoryFree(pBuf->pEpSet); taosMemoryFree(pBuf->pEpSet);
return commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId); return commitRspCountDown(pParamSet, pParam->consumerId, pParam->topicName, pParam->vgId);
} }
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName,
@ -483,7 +529,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
pParam->params = pParamSet; pParam->params = pParamSet;
pParam->vgId = vgId; pParam->vgId = vgId;
pParam->pTmq = tmq; pParam->consumerId = tmq->consumerId;
tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName)); tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));
@ -825,6 +871,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
SMqHbReq req = {0}; SMqHbReq req = {0};
req.consumerId = tmq->consumerId; req.consumerId = tmq->consumerId;
req.epoch = tmq->epoch; req.epoch = tmq->epoch;
req.pollFlag = atomic_load_8(&pollFlag);
req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
if (req.topics == NULL){ if (req.topics == NULL){
return; return;
@ -906,10 +953,11 @@ void tmqSendHbReq(void* param, void* tmrId) {
tscError("tmqSendHbReq asyncSendMsgToServer failed"); tscError("tmqSendHbReq asyncSendMsgToServer failed");
} }
atomic_val_compare_exchange_8(&pollFlag, 1, 0);
OVER: OVER:
tDestroySMqHbReq(&req); tDestroySMqHbReq(&req);
if (tmrId != NULL) { if(tmrId != NULL){
(void)taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer); (void)taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
} }
(void)taosReleaseRef(tmqMgmt.rsetId, refId); (void)taosReleaseRef(tmqMgmt.rsetId, refId);
} }
@ -1208,6 +1256,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->useSnapshot = conf->snapEnable; pTmq->useSnapshot = conf->snapEnable;
pTmq->autoCommit = conf->autoCommit; pTmq->autoCommit = conf->autoCommit;
pTmq->autoCommitInterval = conf->autoCommitInterval; pTmq->autoCommitInterval = conf->autoCommitInterval;
pTmq->sessionTimeoutMs = conf->sessionTimeoutMs;
pTmq->heartBeatIntervalMs = conf->heartBeatIntervalMs;
pTmq->maxPollIntervalMs = conf->maxPollIntervalMs;
pTmq->commitCb = conf->commitCb; pTmq->commitCb = conf->commitCb;
pTmq->commitCbUserParam = conf->commitCbUserParam; pTmq->commitCbUserParam = conf->commitCbUserParam;
pTmq->resetOffsetCfg = conf->resetOffset; pTmq->resetOffsetCfg = conf->resetOffset;
@ -1246,7 +1297,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
goto _failed; goto _failed;
} }
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, (void*)pTmq->refId, tmqMgmt.timer); pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, pTmq->heartBeatIntervalMs, (void*)pTmq->refId, tmqMgmt.timer);
if (pTmq->hbLiveTimer == NULL) { if (pTmq->hbLiveTimer == NULL) {
SET_ERROR_MSG_TMQ("start heartbeat timer failed") SET_ERROR_MSG_TMQ("start heartbeat timer failed")
goto _failed; goto _failed;
@ -1279,7 +1330,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
tscInfo("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz); tscInfo("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);
req.consumerId = tmq->consumerId; req.consumerId = tmq->consumerId;
tstrncpy(req.clientId, tmq->clientId, 256); tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN);
tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN); tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
req.topicNames = taosArrayInit(sz, sizeof(void*)); req.topicNames = taosArrayInit(sz, sizeof(void*));
@ -1291,6 +1342,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
req.withTbName = tmq->withTbName; req.withTbName = tmq->withTbName;
req.autoCommit = tmq->autoCommit; req.autoCommit = tmq->autoCommit;
req.autoCommitInterval = tmq->autoCommitInterval; req.autoCommitInterval = tmq->autoCommitInterval;
req.sessionTimeoutMs = tmq->sessionTimeoutMs;
req.maxPollIntervalMs = tmq->maxPollIntervalMs;
req.resetOffsetCfg = tmq->resetOffsetCfg; req.resetOffsetCfg = tmq->resetOffsetCfg;
req.enableReplay = tmq->replayEnable; req.enableReplay = tmq->replayEnable;
req.enableBatchMeta = tmq->enableBatchMeta; req.enableBatchMeta = tmq->enableBatchMeta;
@ -1452,22 +1505,22 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tmq_t* tmq = NULL; tmq_t* tmq = NULL;
SMqPollCbParam* pParam = (SMqPollCbParam*)param; SMqPollCbParam* pParam = (SMqPollCbParam*)param;
if (pParam == NULL || pMsg == NULL) { if (pParam == NULL || pMsg == NULL) {
goto FAIL2; return TSDB_CODE_TSC_INTERNAL_ERROR;
} }
int64_t refId = pParam->refId; int64_t refId = pParam->refId;
int32_t vgId = pParam->vgId; int32_t vgId = pParam->vgId;
uint64_t requestId = pParam->requestId; uint64_t requestId = pParam->requestId;
tmq = taosAcquireRef(tmqMgmt.rsetId, refId); tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) { if (tmq == NULL) {
code = TSDB_CODE_TMQ_CONSUMER_CLOSED; return TSDB_CODE_TMQ_CONSUMER_CLOSED;
goto FAIL2;
} }
SMqPollRspWrapper* pRspWrapper = NULL; SMqPollRspWrapper* pRspWrapper = NULL;
code = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper); int32_t ret = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper);
if (code) { if (ret) {
code = ret;
tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId); tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
goto FAIL1; goto END;
} }
if (code != 0) { if (code != 0) {
@ -1550,25 +1603,23 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
} }
END: END:
pRspWrapper->code = code; if (pRspWrapper){
pRspWrapper->vgId = vgId; pRspWrapper->code = code;
(void)strcpy(pRspWrapper->topicName, pParam->topicName); pRspWrapper->vgId = vgId;
code = taosWriteQitem(tmq->mqueue, pRspWrapper); (void)strcpy(pRspWrapper->topicName, pParam->topicName);
if(code != 0){ code = taosWriteQitem(tmq->mqueue, pRspWrapper);
tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code); if(code != 0){
tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
}
} }
int32_t total = taosQueueItemSize(tmq->mqueue); int32_t total = taosQueueItemSize(tmq->mqueue);
tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64, tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
tmq->consumerId, rspType, vgId, total, requestId); tmq->consumerId, rspType, vgId, total, requestId);
FAIL1:
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
FAIL2:
if (tmq) (void)tsem2_post(&tmq->rspSem); if (tmq) (void)tsem2_post(&tmq->rspSem);
if (pMsg) taosMemoryFreeClear(pMsg->pData); if (pMsg) taosMemoryFreeClear(pMsg->pData);
if (pMsg) taosMemoryFreeClear(pMsg->pEpSet); if (pMsg) taosMemoryFreeClear(pMsg->pEpSet);
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
return code; return code;
} }
@ -2343,6 +2394,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
} }
} }
atomic_val_compare_exchange_8(&pollFlag, 0, 1);
while (1) { while (1) {
tmqHandleAllDelayedTask(tmq); tmqHandleAllDelayedTask(tmq);

View File

@ -1178,6 +1178,13 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
&getObjectDataCallback}; &getObjectDataCallback};
TS3SizeCBD cbd = {0}; TS3SizeCBD cbd = {0};
int retryCount = 0;
static int maxRetryCount = 5;
static int minRetryInterval = 1000; // ms
static int maxRetryInterval = 3000; // ms
_retry:
(void)memset(&cbd, 0, sizeof(cbd));
cbd.content_length = size; cbd.content_length = size;
cbd.buf_pos = 0; cbd.buf_pos = 0;
do { do {
@ -1185,6 +1192,11 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
} while (S3_status_is_retryable(cbd.status) && should_retry()); } while (S3_status_is_retryable(cbd.status) && should_retry());
if (cbd.status != S3StatusOK) { if (cbd.status != S3StatusOK) {
if (S3StatusErrorSlowDown == cbd.status && retryCount++ < maxRetryCount) {
taosMsleep(taosRand() % (maxRetryInterval - minRetryInterval + 1) + minRetryInterval);
uInfo("%s: %d/%s(%s) retry get object", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg);
goto _retry;
}
uError("%s: %d/%s(%s)", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg); uError("%s: %d/%s(%s)", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg);
TAOS_RETURN(TAOS_SYSTEM_ERROR(EIO)); TAOS_RETURN(TAOS_SYSTEM_ERROR(EIO));

View File

@ -482,16 +482,16 @@ static const SSysDbTableSchema connectionsSchema[] = {
static const SSysDbTableSchema consumerSchema[] = { static const SSysDbTableSchema consumerSchema[] = {
{.name = "consumer_id", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "consumer_id", .bytes = TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "client_id", .bytes = TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
/*{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},*/ /*{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},*/
{.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "parameters", .bytes = 64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "parameters", .bytes = 128 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
}; };
static const SSysDbTableSchema offsetSchema[] = { static const SSysDbTableSchema offsetSchema[] = {

View File

@ -7091,6 +7091,7 @@ int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
} }
} }
if (tEncodeI8(&encoder, pReq->pollFlag) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
@ -7130,6 +7131,9 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
} }
} }
} }
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI8(&decoder, &pReq->pollFlag) < 0) return -1;
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);

View File

@ -25,7 +25,7 @@ extern "C" {
enum { enum {
MQ_CONSUMER_STATUS_REBALANCE = 1, MQ_CONSUMER_STATUS_REBALANCE = 1,
MQ_CONSUMER_STATUS_READY, MQ_CONSUMER_STATUS_READY,
MQ_CONSUMER_STATUS_LOST, // MQ_CONSUMER_STATUS_LOST,
}; };
int32_t mndInitConsumer(SMnode *pMnode); int32_t mndInitConsumer(SMnode *pMnode);

View File

@ -596,11 +596,12 @@ typedef struct {
typedef struct { typedef struct {
int64_t consumerId; int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
char clientId[256]; char clientId[TSDB_CLIENT_ID_LEN];
int8_t updateType; // used only for update int8_t updateType; // used only for update
int32_t epoch; int32_t epoch;
int32_t status; int32_t status;
int32_t hbStatus; // hbStatus is not applicable to serialization int32_t hbStatus; // hbStatus is not applicable to serialization
int32_t pollStatus; // pollStatus is not applicable to serialization
SRWLatch lock; // lock is used for topics update SRWLatch lock; // lock is used for topics update
SArray* currentTopics; // SArray<char*> SArray* currentTopics; // SArray<char*>
SArray* rebNewTopics; // SArray<char*> SArray* rebNewTopics; // SArray<char*>
@ -620,6 +621,8 @@ typedef struct {
int8_t autoCommit; int8_t autoCommit;
int32_t autoCommitInterval; int32_t autoCommitInterval;
int32_t resetOffsetCfg; int32_t resetOffsetCfg;
int32_t sessionTimeoutMs;
int32_t maxPollIntervalMs;
} SMqConsumerObj; } SMqConsumerObj;
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType,

View File

@ -25,7 +25,7 @@
#include "tcompare.h" #include "tcompare.h"
#include "tname.h" #include "tname.h"
#define MND_CONSUMER_VER_NUMBER 2 #define MND_CONSUMER_VER_NUMBER 3
#define MND_CONSUMER_RESERVE_SIZE 64 #define MND_CONSUMER_RESERVE_SIZE 64
#define MND_MAX_GROUP_PER_TOPIC 100 #define MND_MAX_GROUP_PER_TOPIC 100
@ -55,7 +55,6 @@ int32_t mndInitConsumer(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
// mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
@ -238,11 +237,10 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer)); MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user)); MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user));
atomic_store_32(&pConsumer->hbStatus, 0); atomic_store_32(&pConsumer->hbStatus, 0);
int32_t status = atomic_load_32(&pConsumer->status); if (req.pollFlag == 1){
if (status == MQ_CONSUMER_STATUS_LOST) { atomic_store_32(&pConsumer->pollStatus, 0);
mInfo("try to recover consumer:0x%" PRIx64, consumerId);
MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info));
} }
storeOffsetRows(pMnode, &req, pConsumer); storeOffsetRows(pMnode, &req, pConsumer);
code = buildMqHbRsp(pMsg, &rsp); code = buildMqHbRsp(pMsg, &rsp);
@ -389,11 +387,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
code = TSDB_CODE_MND_CONSUMER_NOT_EXIST; code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
goto END; goto END;
} }
atomic_store_32(&pConsumer->hbStatus, 0);
// 1. check consumer status
int32_t status = atomic_load_32(&pConsumer->status); int32_t status = atomic_load_32(&pConsumer->status);
if (status == MQ_CONSUMER_STATUS_LOST) {
MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info));
}
if (status != MQ_CONSUMER_STATUS_READY) { if (status != MQ_CONSUMER_STATUS_READY) {
mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status)); mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
code = TSDB_CODE_MND_CONSUMER_NOT_READY; code = TSDB_CODE_MND_CONSUMER_NOT_READY;
@ -566,7 +562,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
STrans *pTrans = NULL; STrans *pTrans = NULL;
SCMSubscribeReq subscribe = {0}; SCMSubscribeReq subscribe = {0};
MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe)); MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen));
if(taosArrayGetSize(subscribe.topicNames) == 0){ if(taosArrayGetSize(subscribe.topicNames) == 0){
SMqConsumerObj *pConsumerTmp = NULL; SMqConsumerObj *pConsumerTmp = NULL;
MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp)); MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp));
@ -701,17 +697,17 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
return 0; return 0;
} }
static void updateConsumerStatus(SMqConsumerObj *pConsumer) { //static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
int32_t status = pConsumer->status; // int32_t status = pConsumer->status;
//
if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) { // if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
if (status == MQ_CONSUMER_STATUS_REBALANCE) { // if (status == MQ_CONSUMER_STATUS_REBALANCE) {
pConsumer->status = MQ_CONSUMER_STATUS_READY; // pConsumer->status = MQ_CONSUMER_STATUS_READY;
} else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) { // } else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) {
pConsumer->status = MQ_CONSUMER_STATUS_LOST; // pConsumer->status = MQ_CONSUMER_STATUS_LOST;
} // }
} // }
} //}
// remove from topic list // remove from topic list
static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) { static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) {
@ -757,21 +753,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->subscribeTime = taosGetTimestampMs(); pOldConsumer->subscribeTime = taosGetTimestampMs();
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId); mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId);
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) {
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) {
void * tmp = taosArrayGetP(pOldConsumer->assignedTopics, i);
if (tmp == NULL){
return TSDB_CODE_TMQ_INVALID_MSG;
}
char *topic = taosStrdup(tmp);
if (taosArrayPush(pOldConsumer->rebNewTopics, &topic) == NULL) {
taosMemoryFree(topic);
return TSDB_CODE_TMQ_INVALID_MSG;
}
}
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
mInfo("consumer:0x%" PRIx64 " recover update", pOldConsumer->consumerId);
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
(void)atomic_add_fetch_32(&pOldConsumer->epoch, 1); (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
pOldConsumer->rebalanceTime = taosGetTimestampMs(); pOldConsumer->rebalanceTime = taosGetTimestampMs();
@ -796,7 +777,11 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
} }
int32_t status = pOldConsumer->status; int32_t status = pOldConsumer->status;
updateConsumerStatus(pOldConsumer); // updateConsumerStatus(pOldConsumer);
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
}
pOldConsumer->rebalanceTime = taosGetTimestampMs(); pOldConsumer->rebalanceTime = taosGetTimestampMs();
(void)atomic_add_fetch_32(&pOldConsumer->epoch, 1); (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
@ -816,7 +801,10 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current"); removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current");
int32_t status = pOldConsumer->status; int32_t status = pOldConsumer->status;
updateConsumerStatus(pOldConsumer); // updateConsumerStatus(pOldConsumer);
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
}
pOldConsumer->rebalanceTime = taosGetTimestampMs(); pOldConsumer->rebalanceTime = taosGetTimestampMs();
(void)atomic_add_fetch_32(&pOldConsumer->epoch, 1); (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
@ -852,6 +840,8 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
int32_t numOfRows = 0; int32_t numOfRows = 0;
SMqConsumerObj *pConsumer = NULL; SMqConsumerObj *pConsumer = NULL;
int32_t code = 0; int32_t code = 0;
char *parasStr = NULL;
char *status = NULL;
while (numOfRows < rowsCapacity) { while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer); pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
@ -884,7 +874,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
int32_t cols = 0; int32_t cols = 0;
// consumer id // consumer id
char consumerIdHex[32] = {0}; char consumerIdHex[TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE] = {0};
(void)sprintf(varDataVal(consumerIdHex), "0x%" PRIx64, pConsumer->consumerId); (void)sprintf(varDataVal(consumerIdHex), "0x%" PRIx64, pConsumer->consumerId);
varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex))); varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
@ -901,7 +891,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false)); MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false));
// client id // client id
char clientId[256 + VARSTR_HEADER_SIZE] = {0}; char clientId[TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(clientId, pConsumer->clientId); STR_TO_VARSTR(clientId, pConsumer->clientId);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -909,13 +899,15 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false)); MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false));
// status // status
char status[20 + VARSTR_HEADER_SIZE] = {0};
const char *pStatusName = mndConsumerStatusName(pConsumer->status); const char *pStatusName = mndConsumerStatusName(pConsumer->status);
status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
MND_TMQ_NULL_CHECK(status);
STR_TO_VARSTR(status, pStatusName); STR_TO_VARSTR(status, pStatusName);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
MND_TMQ_NULL_CHECK(pColInfo); MND_TMQ_NULL_CHECK(pColInfo);
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)status, false)); MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)status, false));
taosMemoryFreeClear(status);
// one subscribed topic // one subscribed topic
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -948,7 +940,8 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg}; STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal); tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);
char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0}; parasStr = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
MND_TMQ_NULL_CHECK(parasStr);
(void)sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName, (void)sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName,
pConsumer->autoCommit, pConsumer->autoCommitInterval, buf); pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
varDataSetLen(parasStr, strlen(varDataVal(parasStr))); varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
@ -956,7 +949,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
MND_TMQ_NULL_CHECK(pColInfo); MND_TMQ_NULL_CHECK(pColInfo);
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false)); MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false));
taosMemoryFreeClear(parasStr);
numOfRows++; numOfRows++;
} }
@ -970,6 +963,8 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
return numOfRows; return numOfRows;
END: END:
taosMemoryFreeClear(status);
taosMemoryFreeClear(parasStr);
return code; return code;
} }
@ -982,8 +977,8 @@ const char *mndConsumerStatusName(int status) {
switch (status) { switch (status) {
case MQ_CONSUMER_STATUS_READY: case MQ_CONSUMER_STATUS_READY:
return "ready"; return "ready";
case MQ_CONSUMER_STATUS_LOST: // case MQ_CONSUMER_STATUS_LOST:
return "lost"; // return "lost";
case MQ_CONSUMER_STATUS_REBALANCE: case MQ_CONSUMER_STATUS_REBALANCE:
return "rebalancing"; return "rebalancing";
default: default:

View File

@ -791,12 +791,12 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
mndSetDefaultDbCfg(&dbObj.cfg); mndSetDefaultDbCfg(&dbObj.cfg);
if ((code = mndCheckDbName(dbObj.name, pUser)) != 0) { if ((code = mndCheckDbName(dbObj.name, pUser)) != 0) {
mError("db:%s, failed to create since %s", pCreate->db, terrstr()); mError("db:%s, failed to create, check db name failed, since %s", pCreate->db, terrstr());
TAOS_RETURN(code); TAOS_RETURN(code);
} }
if ((code = mndCheckDbCfg(pMnode, &dbObj.cfg)) != 0) { if ((code = mndCheckDbCfg(pMnode, &dbObj.cfg)) != 0) {
mError("db:%s, failed to create since %s", pCreate->db, terrstr()); mError("db:%s, failed to create, check db cfg failed, since %s", pCreate->db, terrstr());
TAOS_RETURN(code); TAOS_RETURN(code);
} }
@ -812,7 +812,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
SVgObj *pVgroups = NULL; SVgObj *pVgroups = NULL;
if ((code = mndAllocVgroup(pMnode, &dbObj, &pVgroups)) != 0) { if ((code = mndAllocVgroup(pMnode, &dbObj, &pVgroups)) != 0) {
mError("db:%s, failed to create since %s", pCreate->db, terrstr()); mError("db:%s, failed to create, alloc vgroup failed, since %s", pCreate->db, terrstr());
TAOS_RETURN(code); TAOS_RETURN(code);
} }
@ -965,7 +965,7 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
TAOS_CHECK_GOTO(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser), &lino, _OVER); TAOS_CHECK_GOTO(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser), &lino, _OVER);
code = mndCreateDb(pMnode, pReq, &createReq, pUser); TAOS_CHECK_GOTO(mndCreateDb(pMnode, pReq, &createReq, pUser), &lino, _OVER);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
SName name = {0}; SName name = {0};

View File

@ -288,6 +288,7 @@ int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType,
pConsumer->epoch = 0; pConsumer->epoch = 0;
pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
pConsumer->hbStatus = 0; pConsumer->hbStatus = 0;
pConsumer->pollStatus = 0;
taosInitRWLatch(&pConsumer->lock); taosInitRWLatch(&pConsumer->lock);
pConsumer->createTime = taosGetTimestampMs(); pConsumer->createTime = taosGetTimestampMs();
@ -322,6 +323,8 @@ int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType,
pConsumer->autoCommit = subscribe->autoCommit; pConsumer->autoCommit = subscribe->autoCommit;
pConsumer->autoCommitInterval = subscribe->autoCommitInterval; pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg; pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup); pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
if (pConsumer->rebNewTopics == NULL){ if (pConsumer->rebNewTopics == NULL){
@ -424,6 +427,8 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit); tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval); tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg); tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
return tlen; return tlen;
} }
@ -495,6 +500,14 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval); buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg); buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
} }
if (sver > 2){
buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
} else{
pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
}
return (void *)buf; return (void *)buf;
} }

View File

@ -669,6 +669,13 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
} }
(void)memset(pMnode, 0, sizeof(SMnode)); (void)memset(pMnode, 0, sizeof(SMnode));
int32_t code = taosThreadRwlockInit(&pMnode->lock, NULL);
if (code != 0) {
taosMemoryFree(pMnode);
mError("failed to open mnode lock since %s", tstrerror(code));
return NULL;
}
char timestr[24] = "1970-01-01 00:00:00.00"; char timestr[24] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
mndSetOptions(pMnode, pOption); mndSetOptions(pMnode, pOption);
@ -682,7 +689,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
return NULL; return NULL;
} }
int32_t code = mndCreateDir(pMnode, path); code = mndCreateDir(pMnode, path);
if (code != 0) { if (code != 0) {
code = terrno; code = terrno;
mError("failed to open mnode since %s", tstrerror(code)); mError("failed to open mnode since %s", tstrerror(code));

View File

@ -290,7 +290,7 @@ static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrap
pWrapper->nCols = taosArrayGetSize(pFields); pWrapper->nCols = taosArrayGetSize(pFields);
pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema)); pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema));
if (NULL == pWrapper->pSchema) { if (NULL == pWrapper->pSchema) {
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
SNode *pNode; SNode *pNode;
@ -328,15 +328,18 @@ static bool hasDestPrimaryKey(SSchemaWrapper *pWrapper) {
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) { static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
SNode *pAst = NULL; SNode *pAst = NULL;
SQueryPlan *pPlan = NULL; SQueryPlan *pPlan = NULL;
int32_t code = 0;
mInfo("stream:%s to create", pCreate->name); mInfo("stream:%s to create", pCreate->name);
memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN); memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
pObj->createTime = taosGetTimestampMs(); pObj->createTime = taosGetTimestampMs();
pObj->updateTime = pObj->createTime; pObj->updateTime = pObj->createTime;
pObj->version = 1; pObj->version = 1;
if (pCreate->smaId > 0) { if (pCreate->smaId > 0) {
pObj->subTableWithoutMd5 = 1; pObj->subTableWithoutMd5 = 1;
} }
pObj->smaId = pCreate->smaId; pObj->smaId = pCreate->smaId;
pObj->indexForMultiAggBalance = -1; pObj->indexForMultiAggBalance = -1;
@ -360,8 +363,10 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB); SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
if (pSourceDb == NULL) { if (pSourceDb == NULL) {
mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, terrstr()); mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, terrstr());
return terrno; code = terrno;
goto FAIL;
} }
pObj->sourceDbUid = pSourceDb->uid; pObj->sourceDbUid = pSourceDb->uid;
mndReleaseDb(pMnode, pSourceDb); mndReleaseDb(pMnode, pSourceDb);
@ -369,9 +374,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName); SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
if (pTargetDb == NULL) { if (pTargetDb == NULL) {
mInfo("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, terrstr()); mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, terrstr());
return terrno; code = terrno;
goto FAIL;
} }
tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN); tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) { if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
@ -389,12 +396,12 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pCreate->ast = NULL; pCreate->ast = NULL;
// deserialize ast // deserialize ast
if (nodesStringToNode(pObj->ast, &pAst) < 0) { if ((code = nodesStringToNode(pObj->ast, &pAst)) < 0) {
goto FAIL; goto FAIL;
} }
// create output schema // create output schema
if (createSchemaByFields(pCreate->pCols, &pObj->outputSchema) != TSDB_CODE_SUCCESS) { if ((code = createSchemaByFields(pCreate->pCols, &pObj->outputSchema)) != TSDB_CODE_SUCCESS) {
goto FAIL; goto FAIL;
} }
@ -403,6 +410,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->outputSchema.nCols += numOfNULL; pObj->outputSchema.nCols += numOfNULL;
SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema)); SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
if (!pFullSchema) { if (!pFullSchema) {
code = terrno;
goto FAIL; goto FAIL;
} }
@ -410,6 +418,10 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
int32_t dataIndex = 0; int32_t dataIndex = 0;
for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) { for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) {
SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex); SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
if (pos == NULL) {
continue;
}
if (nullIndex >= numOfNULL || i < pos->slotId) { if (nullIndex >= numOfNULL || i < pos->slotId) {
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes; pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId; pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
@ -444,22 +456,31 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
}; };
// using ast and param to build physical plan // using ast and param to build physical plan
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) < 0) {
goto FAIL; goto FAIL;
} }
// save physcial plan // save physcial plan
if (nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL) != 0) { if ((code = nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL)) != 0) {
goto FAIL; goto FAIL;
} }
pObj->tagSchema.nCols = pCreate->numOfTags; pObj->tagSchema.nCols = pCreate->numOfTags;
if (pCreate->numOfTags) { if (pCreate->numOfTags) {
pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema)); pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
if (pObj->tagSchema.pSchema == NULL) {
code = terrno;
goto FAIL;
}
} }
/*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/ /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/
for (int32_t i = 0; i < pCreate->numOfTags; i++) { for (int32_t i = 0; i < pCreate->numOfTags; i++) {
SField *pField = taosArrayGet(pCreate->pTags, i); SField *pField = taosArrayGet(pCreate->pTags, i);
if (pField == NULL) {
continue;
}
pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1; pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
pObj->tagSchema.pSchema[i].bytes = pField->bytes; pObj->tagSchema.pSchema[i].bytes = pField->bytes;
pObj->tagSchema.pSchema[i].flags = pField->flags; pObj->tagSchema.pSchema[i].flags = pField->flags;
@ -470,7 +491,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
FAIL: FAIL:
if (pAst != NULL) nodesDestroyNode(pAst); if (pAst != NULL) nodesDestroyNode(pAst);
if (pPlan != NULL) qDestroyQueryPlan(pPlan); if (pPlan != NULL) qDestroyQueryPlan(pPlan);
return 0; return code;
} }
int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
@ -575,12 +596,15 @@ int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) {
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) { static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
int32_t code = 0;
int32_t lino = 0;
SMCreateStbReq createReq = {0}; SMCreateStbReq createReq = {0};
tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
createReq.numOfColumns = pStream->outputSchema.nCols; createReq.numOfColumns = pStream->outputSchema.nCols;
createReq.numOfTags = 1; // group id createReq.numOfTags = 1; // group id
createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns); createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);
// build fields // build fields
for (int32_t i = 0; i < createReq.numOfColumns; i++) { for (int32_t i = 0; i < createReq.numOfColumns; i++) {
@ -595,6 +619,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
if (pStream->tagSchema.nCols == 0) { if (pStream->tagSchema.nCols == 0) {
createReq.numOfTags = 1; createReq.numOfTags = 1;
createReq.pTags = taosArrayInit_s(sizeof(SField), 1); createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
// build tags // build tags
SField *pField = taosArrayGet(createReq.pTags, 0); SField *pField = taosArrayGet(createReq.pTags, 0);
strcpy(pField->name, "group_id"); strcpy(pField->name, "group_id");
@ -604,6 +630,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
} else { } else {
createReq.numOfTags = pStream->tagSchema.nCols; createReq.numOfTags = pStream->tagSchema.nCols;
createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags); createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
for (int32_t i = 0; i < createReq.numOfTags; i++) { for (int32_t i = 0; i < createReq.numOfTags; i++) {
SField *pField = taosArrayGet(createReq.pTags, i); SField *pField = taosArrayGet(createReq.pTags, i);
pField->bytes = pStream->tagSchema.pSchema[i].bytes; pField->bytes = pStream->tagSchema.pSchema[i].bytes;
@ -657,7 +685,7 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols); mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols);
return 0; return code;
_OVER: _OVER:
tFreeSMCreateStbReq(&createReq); tFreeSMCreateStbReq(&createReq);
@ -665,7 +693,7 @@ _OVER:
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
mDebug("stream:%s failed to create dst stable:%s, code:%s", pStream->name, pStream->targetSTbName, tstrerror(terrno)); mDebug("stream:%s failed to create dst stable:%s, code:%s", pStream->name, pStream->targetSTbName, tstrerror(terrno));
return -1; return code;
} }
// 1. stream number check // 1. stream number check
@ -709,9 +737,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
char *sql = NULL; char *sql = NULL;
int32_t sqlLen = 0; int32_t sqlLen = 0;
const char *pMsg = "create stream tasks on dnodes"; const char *pMsg = "create stream tasks on dnodes";
int32_t code = 0; int32_t code = TSDB_CODE_SUCCESS;
terrno = TSDB_CODE_SUCCESS;
terrno = TSDB_CODE_SUCCESS;
SCMCreateStreamReq createReq = {0}; SCMCreateStreamReq createReq = {0};
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq) != 0) { if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
@ -749,6 +777,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
if (createReq.sql != NULL) { if (createReq.sql != NULL) {
sqlLen = strlen(createReq.sql); sqlLen = strlen(createReq.sql);
sql = taosMemoryMalloc(sqlLen + 1); sql = taosMemoryMalloc(sqlLen + 1);
if (sql == NULL) {
code = terrno;
goto _OVER;
}
memset(sql, 0, sqlLen + 1); memset(sql, 0, sqlLen + 1);
memcpy(sql, createReq.sql, sqlLen); memcpy(sql, createReq.sql, sqlLen);
} }
@ -942,8 +975,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int
void *buf = taosMemoryMalloc(tlen); void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno;
return -1;
} }
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
@ -1150,7 +1182,11 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
SArray *pInvalidList = taosArrayInit(4, sizeof(STaskId)); SArray *pInvalidList = taosArrayInit(4, sizeof(STaskId));
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId *p = taosArrayGet(execInfo.pTaskList, i); STaskId *p = taosArrayGet(execInfo.pTaskList, i);
if (p == NULL) {
continue;
}
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (pEntry == NULL) { if (pEntry == NULL) {
continue; continue;
@ -1159,8 +1195,12 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
if (pEntry->status == TASK_STATUS__STOP) { if (pEntry->status == TASK_STATUS__STOP) {
for (int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) { for (int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) {
STaskId *pId = taosArrayGet(pInvalidList, j); STaskId *pId = taosArrayGet(pInvalidList, j);
if (pId == NULL) {
continue;
}
if (pEntry->id.streamId == pId->streamId) { if (pEntry->id.streamId == pId->streamId) {
void* px = taosArrayPush(pInvalidList, &pEntry->id); void *px = taosArrayPush(pInvalidList, &pEntry->id);
if (px == NULL) { if (px == NULL) {
mError("failed to put stream into invalid list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); mError("failed to put stream into invalid list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
} }
@ -1243,6 +1283,10 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
} }
SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval)); SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));
if (pList == NULL) {
return -1;
}
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
@ -2472,14 +2516,15 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
if (pReqTaskList == NULL) { if (pReqTaskList == NULL) {
SArray *pList = taosArrayInit(4, sizeof(STaskChkptInfo)); SArray *pList = taosArrayInit(4, sizeof(STaskChkptInfo));
doAddReportStreamTask(pList, &req); if (pList != NULL) {
doAddReportStreamTask(pList, &req);
code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES);
if (code) {
mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId);
}
code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES); pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
if (code) {
mError("stream:0x%"PRIx64 " failed to put into checkpoint stream", req.streamId);
} }
pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
} else { } else {
doAddReportStreamTask(*pReqTaskList, &req); doAddReportStreamTask(*pReqTaskList, &req);
} }
@ -2545,6 +2590,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
SArray *pStreamList = taosArrayInit(4, sizeof(int64_t)); SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
if (pStreamList == NULL) {
return terrno;
}
mDebug("start to process consensus-checkpointId in tmr"); mDebug("start to process consensus-checkpointId in tmr");
@ -2572,6 +2620,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
int64_t streamId = -1; int64_t streamId = -1;
int32_t num = taosArrayGetSize(pInfo->pTaskList); int32_t num = taosArrayGetSize(pInfo->pTaskList);
SArray *pList = taosArrayInit(4, sizeof(int32_t)); SArray *pList = taosArrayInit(4, sizeof(int32_t));
if (pList == NULL) {
continue;
}
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream); code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);

View File

@ -27,8 +27,7 @@
#define MND_SUBSCRIBE_VER_NUMBER 3 #define MND_SUBSCRIBE_VER_NUMBER 3
#define MND_SUBSCRIBE_RESERVE_SIZE 64 #define MND_SUBSCRIBE_RESERVE_SIZE 64
#define MND_CONSUMER_LOST_HB_CNT 6 //#define MND_CONSUMER_LOST_HB_CNT 6
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
static int32_t mqRebInExecCnt = 0; static int32_t mqRebInExecCnt = 0;
@ -331,6 +330,7 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
int32_t code = 0; int32_t code = 0;
int32_t totalVgNum = 0; int32_t totalVgNum = 0;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
SMqVgEp *pVgEp = NULL;
void *pIter = NULL; void *pIter = NULL;
SArray *newVgs = taosArrayInit(0, POINTER_BYTES); SArray *newVgs = taosArrayInit(0, POINTER_BYTES);
MND_TMQ_NULL_CHECK(newVgs); MND_TMQ_NULL_CHECK(newVgs);
@ -346,11 +346,12 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
} }
totalVgNum++; totalVgNum++;
SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp)); pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
MND_TMQ_NULL_CHECK(pVgEp); MND_TMQ_NULL_CHECK(pVgEp);
pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup); pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
pVgEp->vgId = pVgroup->vgId; pVgEp->vgId = pVgroup->vgId;
MND_TMQ_NULL_CHECK(taosArrayPush(newVgs, &pVgEp)); MND_TMQ_NULL_CHECK(taosArrayPush(newVgs, &pVgEp));
pVgEp = NULL;
sdbRelease(pMnode->pSdb, pVgroup); sdbRelease(pMnode->pSdb, pVgroup);
} }
@ -361,13 +362,13 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
int32_t j = 0; int32_t j = 0;
while (j < taosArrayGetSize(pConsumerEp->vgs)) { while (j < taosArrayGetSize(pConsumerEp->vgs)) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); SMqVgEp *pVgEpTmp = taosArrayGetP(pConsumerEp->vgs, j);
MND_TMQ_NULL_CHECK(pVgEp); MND_TMQ_NULL_CHECK(pVgEpTmp);
bool find = false; bool find = false;
for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) { for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) {
SMqVgEp *pnewVgEp = taosArrayGetP(newVgs, k); SMqVgEp *pnewVgEp = taosArrayGetP(newVgs, k);
MND_TMQ_NULL_CHECK(pnewVgEp); MND_TMQ_NULL_CHECK(pnewVgEp);
if (pVgEp->vgId == pnewVgEp->vgId) { if (pVgEpTmp->vgId == pnewVgEp->vgId) {
tDeleteSMqVgEp(pnewVgEp); tDeleteSMqVgEp(pnewVgEp);
taosArrayRemove(newVgs, k); taosArrayRemove(newVgs, k);
find = true; find = true;
@ -375,8 +376,8 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
} }
} }
if (!find) { if (!find) {
mInfo("[rebalance] processRemoveAddVgs old vgId:%d", pVgEp->vgId); mInfo("[rebalance] processRemoveAddVgs old vgId:%d", pVgEpTmp->vgId);
tDeleteSMqVgEp(pVgEp); tDeleteSMqVgEp(pVgEpTmp);
taosArrayRemove(pConsumerEp->vgs, j); taosArrayRemove(pConsumerEp->vgs, j);
continue; continue;
} }
@ -387,12 +388,16 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) { if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) {
MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs)); MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs));
mInfo("[rebalance] processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs)); mInfo("[rebalance] processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
(void)taosArrayDestroy(newVgs); taosArrayDestroy(newVgs);
} else { } else {
(void)taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp); taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
} }
return totalVgNum; return totalVgNum;
END: END:
sdbRelease(pMnode->pSdb, pVgroup);
taosMemoryFree(pVgEp);
taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
return code; return code;
} }
@ -758,32 +763,32 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
} }
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
int32_t status = atomic_load_32(&pConsumer->status); int32_t status = atomic_load_32(&pConsumer->status);
mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
", hbstatus:%d", ", hbstatus:%d, pollStatus:%d",
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
pConsumer->createTime, hbStatus); pConsumer->createTime, hbStatus, pollStatus);
if (status == MQ_CONSUMER_STATUS_READY) { if (status == MQ_CONSUMER_STATUS_READY) {
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close if (taosArrayGetSize(pConsumer->currentTopics) == 0) { // unsubscribe or close
MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info)); MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
} else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { } else if (hbStatus * tsMqRebalanceInterval * 1000 >= pConsumer->sessionTimeoutMs ||
pollStatus * tsMqRebalanceInterval * 1000 >= pConsumer->maxPollIntervalMs) {
taosRLockLatch(&pConsumer->lock); taosRLockLatch(&pConsumer->lock);
MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer->cgroup, pConsumer->consumerId)); MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer->cgroup, pConsumer->consumerId));
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
} else { } else {
checkForVgroupSplit(pMnode, pConsumer, rebSubHash); checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
} }
} else if (status == MQ_CONSUMER_STATUS_LOST) { } else if (status == MQ_CONSUMER_STATUS_REBALANCE) {
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day
MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
}
} else {
taosRLockLatch(&pConsumer->lock); taosRLockLatch(&pConsumer->lock);
MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId)); MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId));
MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId)); MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId));
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
} else {
MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
} }
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
@ -1013,37 +1018,37 @@ END:
return code; return code;
} }
static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) { //static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) {
void *pIter = NULL; // void *pIter = NULL;
SMqConsumerObj *pConsumer = NULL; // SMqConsumerObj *pConsumer = NULL;
int code = 0; // int code = 0;
while (1) { // while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); // pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) { // if (pIter == NULL) {
break; // break;
} // }
//
// drop consumer in lost status, other consumers not in lost status already deleted by rebalance // // drop consumer in lost status, other consumers not in lost status already deleted by rebalance
if (pConsumer->status != MQ_CONSUMER_STATUS_LOST || strcmp(cgroup, pConsumer->cgroup) != 0) { // if (pConsumer->status != MQ_CONSUMER_STATUS_LOST || strcmp(cgroup, pConsumer->cgroup) != 0) {
sdbRelease(pMnode->pSdb, pConsumer); // sdbRelease(pMnode->pSdb, pConsumer);
continue; // continue;
} // }
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); // int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) { // for (int32_t i = 0; i < sz; i++) {
char *name = taosArrayGetP(pConsumer->assignedTopics, i); // char *name = taosArrayGetP(pConsumer->assignedTopics, i);
if (name && strcmp(topic, name) == 0) { // if (name && strcmp(topic, name) == 0) {
MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumer)); // MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumer));
} // }
} // }
//
sdbRelease(pMnode->pSdb, pConsumer); // sdbRelease(pMnode->pSdb, pConsumer);
} // }
//
END: //END:
sdbRelease(pMnode->pSdb, pConsumer); // sdbRelease(pMnode->pSdb, pConsumer);
sdbCancelFetch(pMnode->pSdb, pIter); // sdbCancelFetch(pMnode->pSdb, pIter);
return code; // return code;
} //}
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
@ -1079,7 +1084,6 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic); mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
mndTransSetDbName(pTrans, pSub->dbName, dropReq.cgroup); mndTransSetDbName(pTrans, pSub->dbName, dropReq.cgroup);
MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans)); MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
MND_TMQ_RETURN_CHECK(mndDropConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic));
MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans)); MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub)); MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans)); MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

View File

@ -613,44 +613,44 @@ static bool checkTopic(SArray *topics, char *topicName){
return false; return false;
} }
static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){ //static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){
int32_t code = 0; // int32_t code = 0;
SSdb *pSdb = pMnode->pSdb; // SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL; // void *pIter = NULL;
SMqConsumerObj *pConsumer = NULL; // SMqConsumerObj *pConsumer = NULL;
while (1) { // while (1) {
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); // pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) { // if (pIter == NULL) {
break; // break;
} // }
//
bool found = checkTopic(pConsumer->assignedTopics, topicName); // bool found = checkTopic(pConsumer->assignedTopics, topicName);
if (found){ // if (found){
if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) { // if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) {
MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumer)); // MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumer));
sdbRelease(pSdb, pConsumer); // sdbRelease(pSdb, pConsumer);
continue; // continue;
} // }
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", // mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
topicName, pConsumer->consumerId, pConsumer->cgroup); // topicName, pConsumer->consumerId, pConsumer->cgroup);
code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; // code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
goto END; // goto END;
} // }
//
if (checkTopic(pConsumer->rebNewTopics, topicName) || checkTopic(pConsumer->rebRemovedTopics, topicName)) { // if (checkTopic(pConsumer->rebNewTopics, topicName) || checkTopic(pConsumer->rebRemovedTopics, topicName)) {
code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; // code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", // mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
topicName, pConsumer->consumerId, pConsumer->cgroup); // topicName, pConsumer->consumerId, pConsumer->cgroup);
goto END; // goto END;
} // }
sdbRelease(pSdb, pConsumer); // sdbRelease(pSdb, pConsumer);
} // }
//
END: //END:
sdbRelease(pSdb, pConsumer); // sdbRelease(pSdb, pConsumer);
sdbCancelFetch(pSdb, pIter); // sdbCancelFetch(pSdb, pIter);
return code; // return code;
} //}
static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic){ static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic){
// broadcast to all vnode // broadcast to all vnode
@ -722,9 +722,10 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
mndTransSetDbName(pTrans, pTopic->db, NULL); mndTransSetDbName(pTrans, pTopic->db, NULL);
MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans)); MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic)); MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic));
MND_TMQ_RETURN_CHECK(mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db)); MND_TMQ_RETURN_CHECK(mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db));
MND_TMQ_RETURN_CHECK(mndDropConsumerByTopic(pMnode, pTrans, dropReq.name)); // MND_TMQ_RETURN_CHECK(mndDropConsumerByTopic(pMnode, pTrans, dropReq.name));
MND_TMQ_RETURN_CHECK(mndDropSubByTopic(pMnode, pTrans, dropReq.name)); MND_TMQ_RETURN_CHECK(mndDropSubByTopic(pMnode, pTrans, dropReq.name));
if (pTopic->ntbUid != 0) { if (pTopic->ntbUid != 0) {

View File

@ -877,7 +877,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
pVgroup->dbUid = pDb->uid; pVgroup->dbUid = pDb->uid;
pVgroup->replica = pDb->cfg.replications; pVgroup->replica = pDb->cfg.replications;
if (mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray) != 0) { if ((code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray)) != 0) {
goto _OVER; goto _OVER;
} }

View File

@ -957,6 +957,12 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK); metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK);
code = metaReaderGetTableEntryByUid(&mer1, pOutputInfo->tbSink.stbUid); code = metaReaderGetTableEntryByUid(&mer1, pOutputInfo->tbSink.stbUid);
if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s vgId:%d failed to get the dst stable, failed to sink results", id, vgId);
metaReaderClear(&mer1);
return;
}
pOutputInfo->tbSink.pTagSchema = tCloneSSchemaWrapper(&mer1.me.stbEntry.schemaTag); pOutputInfo->tbSink.pTagSchema = tCloneSSchemaWrapper(&mer1.me.stbEntry.schemaTag);
metaReaderClear(&mer1); metaReaderClear(&mer1);

View File

@ -1777,6 +1777,19 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
pExp->base.resSchema = pExp->base.resSchema =
createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCaseNode->node.aliasName); createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCaseNode->node.aliasName);
pExp->pExpr->_optrRoot.pRootNode = pNode; pExp->pExpr->_optrRoot.pRootNode = pNode;
} else if (type == QUERY_NODE_LOGIC_CONDITION) {
pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
SLogicConditionNode* pCond = (SLogicConditionNode*)pNode;
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
if (!pExp->base.pParam) {
code = terrno;
}
if (TSDB_CODE_SUCCESS == code) {
pExp->base.numOfParams = 1;
SDataType* pType = &pCond->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName);
pExp->pExpr->_optrRoot.pRootNode = pNode;
}
} else { } else {
ASSERT(0); ASSERT(0);
} }

View File

@ -365,6 +365,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
ps->onlyRef = true; ps->onlyRef = true;
code = tsortAddSource(pInfo->pSortHandle, ps); code = tsortAddSource(pInfo->pSortHandle, ps);
if (code) { if (code) {
taosMemoryFree(ps);
return code; return code;
} }

View File

@ -470,6 +470,7 @@ static int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t gr
SWinKey key = {.groupId = groupId, .ts = ts}; SWinKey key = {.groupId = groupId, .ts = ts};
if (tSimpleHashGet(pFillSup->pResMap, &key, sizeof(SWinKey)) != NULL) { if (tSimpleHashGet(pFillSup->pResMap, &key, sizeof(SWinKey)) != NULL) {
(*pRes) = false; (*pRes) = false;
goto _end;
} }
code = tSimpleHashPut(pFillSup->pResMap, &key, sizeof(SWinKey), NULL, 0); code = tSimpleHashPut(pFillSup->pResMap, &key, sizeof(SWinKey), NULL, 0);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);

View File

@ -1699,6 +1699,7 @@ static int32_t initRowIdSort(SSortHandle* pHandle) {
taosArrayDestroy(pHandle->pSortInfo); taosArrayDestroy(pHandle->pSortInfo);
pHandle->pSortInfo = pOrderInfoList; pHandle->pSortInfo = pOrderInfoList;
pHandle->cmpParam.pPkOrder = (pHandle->bSortPk) ? taosArrayGet(pHandle->pSortInfo, 1) : NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -229,6 +229,7 @@ static void udfWatchUdfd(void *args) {
if(uv_loop_close(&pData->loop) != 0) { if(uv_loop_close(&pData->loop) != 0) {
fnError("udfd loop close failed, lino:%d", __LINE__); fnError("udfd loop close failed, lino:%d", __LINE__);
} }
return;
_exit: _exit:
if (terrno != 0) { if (terrno != 0) {

View File

@ -2608,7 +2608,8 @@ static int32_t calcSelectFuncNum(SFunctionNode* pFunc, int32_t currSelectFuncNum
: 1); : 1);
} }
static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) { static void setFuncClassification(STranslateContext* pCxt, SFunctionNode* pFunc) {
SNode* pCurrStmt = pCxt->pCurrStmt;
if (NULL != pCurrStmt && QUERY_NODE_SELECT_STMT == nodeType(pCurrStmt)) { if (NULL != pCurrStmt && QUERY_NODE_SELECT_STMT == nodeType(pCurrStmt)) {
SSelectStmt* pSelect = (SSelectStmt*)pCurrStmt; SSelectStmt* pSelect = (SSelectStmt*)pCurrStmt;
pSelect->hasAggFuncs = pSelect->hasAggFuncs ? true : fmIsAggFunc(pFunc->funcId); pSelect->hasAggFuncs = pSelect->hasAggFuncs ? true : fmIsAggFunc(pFunc->funcId);
@ -2641,7 +2642,9 @@ static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) {
pSelect->hasLastFunc = pSelect->hasLastFunc ? true : (FUNCTION_TYPE_LAST == pFunc->funcType); pSelect->hasLastFunc = pSelect->hasLastFunc ? true : (FUNCTION_TYPE_LAST == pFunc->funcType);
pSelect->hasTimeLineFunc = pSelect->hasTimeLineFunc ? true : fmIsTimelineFunc(pFunc->funcId); pSelect->hasTimeLineFunc = pSelect->hasTimeLineFunc ? true : fmIsTimelineFunc(pFunc->funcId);
pSelect->hasUdaf = pSelect->hasUdaf ? true : fmIsUserDefinedFunc(pFunc->funcId) && fmIsAggFunc(pFunc->funcId); pSelect->hasUdaf = pSelect->hasUdaf ? true : fmIsUserDefinedFunc(pFunc->funcId) && fmIsAggFunc(pFunc->funcId);
pSelect->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc ? fmIsKeepOrderFunc(pFunc->funcId) : false; if (SQL_CLAUSE_SELECT == pCxt->currClause) {
pSelect->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc ? fmIsKeepOrderFunc(pFunc->funcId) : false;
}
} }
} }
@ -2903,7 +2906,7 @@ static int32_t translateNormalFunction(STranslateContext* pCxt, SNode** ppNode)
code = translateBlockDistFunc(pCxt, pFunc); code = translateBlockDistFunc(pCxt, pFunc);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
setFuncClassification(pCxt->pCurrStmt, pFunc); setFuncClassification(pCxt, pFunc);
} }
return code; return code;
} }

View File

@ -262,7 +262,7 @@ static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
colDataSetNULL(pOutputData, i); colDataSetNULL(pOutputData, i);
continue; continue;
} }
out[i] = f1(in[i]); out[i] = f1(in[i]) + 0;
} }
break; break;
} }
@ -276,7 +276,7 @@ static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
colDataSetNULL(pOutputData, i); colDataSetNULL(pOutputData, i);
continue; continue;
} }
out[i] = d1(in[i]); out[i] = d1(in[i]) + 0;
} }
break; break;
} }

View File

@ -57,7 +57,7 @@ class TDTestCase:
( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a ) ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a )
( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a ) ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a )
( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a ) ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a )
( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a ) ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, -0.444, 44.44, 1, "binary4", "nchar4", now()+4a )
( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a ) ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a )
( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a ) ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a )
@ -223,6 +223,9 @@ class TDTestCase:
tdSql.checkData(3, 4, 33) tdSql.checkData(3, 4, 33)
tdSql.checkData(5, 5, None) tdSql.checkData(5, 5, None)
tdSql.query(f"select ceil(c5) from {dbname}.t1")
tdSql.checkData(4 , 0, 0)
self.check_result_auto( f"select c1, c2, c3 , c4, c5 from {dbname}.t1", f"select (c1), ceil(c2) ,ceil(c3), ceil(c4), ceil(c5) from {dbname}.t1") self.check_result_auto( f"select c1, c2, c3 , c4, c5 from {dbname}.t1", f"select (c1), ceil(c2) ,ceil(c3), ceil(c4), ceil(c5) from {dbname}.t1")
# used for sub table # used for sub table

View File

@ -57,6 +57,8 @@ class TDTestCase:
tdSql.query("select * from (select ts, col1 from sta partition by tbname) limit 2"); tdSql.query("select * from (select ts, col1 from sta partition by tbname) limit 2");
tdSql.checkRows(2) tdSql.checkRows(2)
tdSql.query('select col1 > 0 and col2 > 0 from stb')
tdSql.checkRows(12)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success("%s successfully executed" % __file__) tdLog.success("%s successfully executed" % __file__)

View File

@ -53,7 +53,7 @@ class TDTestCase:
( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a ) ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a )
( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a ) ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a )
( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a ) ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a )
( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a ) ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, -0.444, 44.44, 1, "binary4", "nchar4", now()+4a )
( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a ) ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a )
( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a ) ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a )
@ -232,6 +232,9 @@ class TDTestCase:
tdSql.checkData(3, 4, 33) tdSql.checkData(3, 4, 33)
tdSql.checkData(5, 5, None) tdSql.checkData(5, 5, None)
tdSql.query(f"select round(c5) from {dbname}.t1")
tdSql.checkData(4 , 0, 0)
self.check_result_auto( f"select c1, c2, c3 , c4, c5 from {dbname}.t1", f"select (c1), round(c2) ,round(c3), round(c4), round(c5) from {dbname}.t1") self.check_result_auto( f"select c1, c2, c3 , c4, c5 from {dbname}.t1", f"select (c1), round(c2) ,round(c3), round(c4), round(c5) from {dbname}.t1")
# used for sub table # used for sub table

View File

@ -596,6 +596,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.consume.excluded", "1"); tmq_conf_set(conf, "msg.consume.excluded", "1");
// tmq_conf_set(conf, "max.poll.interval.ms", "20000");
if (g_conf.snapShot) { if (g_conf.snapShot) {
tmq_conf_set(conf, "experimental.snapshot.enable", "true"); tmq_conf_set(conf, "experimental.snapshot.enable", "true");