Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/tsdb_optimize
This commit is contained in:
commit
56dfeadd50
|
@ -2036,7 +2036,6 @@ typedef struct {
|
|||
SArray* topicNames; // SArray<char**>
|
||||
|
||||
int8_t withTbName;
|
||||
int8_t useSnapshot;
|
||||
int8_t autoCommit;
|
||||
int32_t autoCommitInterval;
|
||||
int8_t resetOffsetCfg;
|
||||
|
@ -2056,7 +2055,6 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
|
|||
}
|
||||
|
||||
tlen += taosEncodeFixedI8(buf, pReq->withTbName);
|
||||
tlen += taosEncodeFixedI8(buf, pReq->useSnapshot);
|
||||
tlen += taosEncodeFixedI8(buf, pReq->autoCommit);
|
||||
tlen += taosEncodeFixedI32(buf, pReq->autoCommitInterval);
|
||||
tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg);
|
||||
|
@ -2080,7 +2078,6 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
|
|||
}
|
||||
|
||||
buf = taosDecodeFixedI8(buf, &pReq->withTbName);
|
||||
buf = taosDecodeFixedI8(buf, &pReq->useSnapshot);
|
||||
buf = taosDecodeFixedI8(buf, &pReq->autoCommit);
|
||||
buf = taosDecodeFixedI32(buf, &pReq->autoCommitInterval);
|
||||
buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg);
|
||||
|
|
|
@ -77,6 +77,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
}
|
||||
|
||||
if ((code = taosCheckVersionCompatibleFromStr(version, connectRsp.sVer, 3)) != 0) {
|
||||
tscError("version not compatible. client version: %s, server version: %s", version, connectRsp.sVer);
|
||||
setErrno(pRequest, code);
|
||||
tsem_post(&pRequest->body.rspSem);
|
||||
goto End;
|
||||
|
|
|
@ -358,7 +358,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
return TMQ_CONF_OK;
|
||||
}
|
||||
|
||||
if (strcasecmp(key, "enable.heartbeat.background") == 0) {
|
||||
// if (strcasecmp(key, "enable.heartbeat.background") == 0) {
|
||||
// if (strcasecmp(value, "true") == 0) {
|
||||
// conf->hbBgEnable = true;
|
||||
// return TMQ_CONF_OK;
|
||||
|
@ -366,10 +366,10 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
// conf->hbBgEnable = false;
|
||||
// return TMQ_CONF_OK;
|
||||
// } else {
|
||||
tscError("the default value of enable.heartbeat.background is true, can not be seted");
|
||||
return TMQ_CONF_INVALID;
|
||||
// tscError("the default value of enable.heartbeat.background is true, can not be seted");
|
||||
// return TMQ_CONF_INVALID;
|
||||
// }
|
||||
}
|
||||
// }
|
||||
|
||||
if (strcasecmp(key, "td.connect.ip") == 0) {
|
||||
conf->ip = taosStrdup(value);
|
||||
|
@ -423,30 +423,30 @@ char** tmq_list_to_c_array(const tmq_list_t* list) {
|
|||
return container->pData;
|
||||
}
|
||||
|
||||
static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index,
|
||||
int32_t* numOfVgroups) {
|
||||
int32_t numOfTopics = taosArrayGetSize(pTopicList);
|
||||
*index = -1;
|
||||
*numOfVgroups = 0;
|
||||
|
||||
for (int32_t i = 0; i < numOfTopics; ++i) {
|
||||
SMqClientTopic* pTopic = taosArrayGet(pTopicList, i);
|
||||
if (strcmp(pTopic->topicName, pName) != 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
*numOfVgroups = taosArrayGetSize(pTopic->vgs);
|
||||
for (int32_t j = 0; j < (*numOfVgroups); ++j) {
|
||||
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
|
||||
if (pClientVg->vgId == vgId) {
|
||||
*index = j;
|
||||
return pClientVg;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
//static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index,
|
||||
// int32_t* numOfVgroups) {
|
||||
// int32_t numOfTopics = taosArrayGetSize(pTopicList);
|
||||
// *index = -1;
|
||||
// *numOfVgroups = 0;
|
||||
//
|
||||
// for (int32_t i = 0; i < numOfTopics; ++i) {
|
||||
// SMqClientTopic* pTopic = taosArrayGet(pTopicList, i);
|
||||
// if (strcmp(pTopic->topicName, pName) != 0) {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// *numOfVgroups = taosArrayGetSize(pTopic->vgs);
|
||||
// for (int32_t j = 0; j < (*numOfVgroups); ++j) {
|
||||
// SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
|
||||
// if (pClientVg->vgId == vgId) {
|
||||
// *index = j;
|
||||
// return pClientVg;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// return NULL;
|
||||
//}
|
||||
|
||||
// Two problems do not need to be addressed here
|
||||
// 1. update to of epset. the response of poll request will automatically handle this problem
|
||||
|
@ -573,7 +573,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
|
|||
|
||||
char commitBuf[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
|
||||
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64,
|
||||
tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64,
|
||||
tmq->consumerId, pOffset->offset.subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1,
|
||||
totalVgroups, pMsgSendInfo->requestId);
|
||||
|
||||
|
@ -811,7 +811,9 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
|||
offRows->vgId = pVg->vgId;
|
||||
offRows->rows = pVg->numOfRows;
|
||||
offRows->offset = pVg->offsetInfo.committedOffset;
|
||||
tscDebug("report offset: %d", offRows->offset.type);
|
||||
char buf[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
|
||||
tscInfo("report offset: vgId:%d, offset:%s, rows:%"PRId64, offRows->vgId, buf, offRows->rows);
|
||||
}
|
||||
}
|
||||
tmq->needReportOffsetRows = false;
|
||||
|
@ -862,7 +864,7 @@ OVER:
|
|||
|
||||
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
|
||||
if (code != 0) {
|
||||
tscDebug("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
|
||||
tscError("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1161,7 +1163,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
SCMSubscribeReq req = {0};
|
||||
int32_t code = 0;
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);
|
||||
tscInfo("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);
|
||||
|
||||
req.consumerId = tmq->consumerId;
|
||||
tstrncpy(req.clientId, tmq->clientId, 256);
|
||||
|
@ -1174,7 +1176,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
}
|
||||
|
||||
req.withTbName = tmq->withTbName;
|
||||
req.useSnapshot = tmq->useSnapshot;
|
||||
req.autoCommit = tmq->autoCommit;
|
||||
req.autoCommitInterval = tmq->autoCommitInterval;
|
||||
req.resetOffsetCfg = tmq->resetOffsetCfg;
|
||||
|
@ -1190,7 +1191,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
}
|
||||
|
||||
tNameExtractFullName(&name, topicFName);
|
||||
tscDebug("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);
|
||||
tscInfo("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);
|
||||
|
||||
taosArrayPush(req.topicNames, &topicFName);
|
||||
}
|
||||
|
@ -1251,7 +1252,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
goto FAIL;
|
||||
}
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
|
||||
tscInfo("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
|
||||
taosMsleep(500);
|
||||
}
|
||||
|
||||
|
@ -1478,7 +1479,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
|||
tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
|
||||
tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
|
||||
tscInfo("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
|
||||
pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
|
||||
|
||||
for (int32_t j = 0; j < vgNumGet; j++) {
|
||||
|
@ -1531,7 +1532,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
|
|||
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
|
||||
|
||||
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
|
||||
tscDebug("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
|
||||
tscInfo("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
|
||||
tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur);
|
||||
if (epoch <= tmq->epoch) {
|
||||
return false;
|
||||
|
@ -1554,14 +1555,14 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
|
|||
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
|
||||
if (pTopicCur->vgs) {
|
||||
int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
|
||||
tscDebug("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur);
|
||||
tscInfo("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur);
|
||||
for (int32_t j = 0; j < vgNumCur; j++) {
|
||||
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
|
||||
makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId);
|
||||
|
||||
char buf[TSDB_OFFSET_LEN];
|
||||
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.currentOffset);
|
||||
tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
|
||||
tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
|
||||
vgKey, buf);
|
||||
|
||||
SVgroupSaveInfo info = {.offset = pVgCur->offsetInfo.currentOffset, .numOfRows = pVgCur->numOfRows};
|
||||
|
@ -1591,7 +1592,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
|
|||
atomic_store_8(&tmq->status, flag);
|
||||
atomic_store_32(&tmq->epoch, epoch);
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
|
||||
tscInfo("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
|
||||
return set;
|
||||
}
|
||||
|
||||
|
@ -1627,7 +1628,7 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
SMqRspHead* head = pMsg->pData;
|
||||
int32_t epoch = atomic_load_32(&tmq->epoch);
|
||||
if (head->epoch <= epoch) {
|
||||
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
|
||||
tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
|
||||
tmq->consumerId, head->epoch, epoch);
|
||||
|
||||
if (tmq->status == TMQ_CONSUMER_STATUS__RECOVER) {
|
||||
|
@ -1639,7 +1640,7 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
}
|
||||
|
||||
} else {
|
||||
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
|
||||
tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
|
||||
head->epoch, epoch);
|
||||
}
|
||||
|
||||
|
@ -2067,12 +2068,12 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
|||
void* rspObj;
|
||||
int64_t startTime = taosGetTimestampMs();
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime,
|
||||
tscInfo("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime,
|
||||
timeout);
|
||||
|
||||
// in no topic status, delayed task also need to be processed
|
||||
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
|
||||
tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
|
||||
tscInfo("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
|
||||
taosMsleep(500); // sleep for a while
|
||||
return NULL;
|
||||
}
|
||||
|
@ -2084,7 +2085,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, retryCnt);
|
||||
tscInfo("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, retryCnt);
|
||||
taosMsleep(500);
|
||||
}
|
||||
}
|
||||
|
@ -2093,7 +2094,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
|||
tmqHandleAllDelayedTask(tmq);
|
||||
|
||||
if (tmqPollImpl(tmq, timeout) < 0) {
|
||||
tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
|
||||
tscError("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
|
||||
}
|
||||
|
||||
rspObj = tmqHandleAllRsp(tmq, timeout, false);
|
||||
|
@ -2101,7 +2102,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
|||
tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
|
||||
return (TAOS_RES*)rspObj;
|
||||
} else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
|
||||
tscDebug("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId);
|
||||
tscInfo("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -2109,7 +2110,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
|||
int64_t currentTime = taosGetTimestampMs();
|
||||
int64_t elapsedTime = currentTime - startTime;
|
||||
if (elapsedTime > timeout) {
|
||||
tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
|
||||
tscInfo("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
|
||||
tmq->consumerId, tmq->epoch, startTime, currentTime);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -2142,7 +2143,7 @@ static void displayConsumeStatistics(const tmq_t* pTmq) {
|
|||
}
|
||||
|
||||
int32_t tmq_consumer_close(tmq_t* tmq) {
|
||||
tscDebug("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
|
||||
tscInfo("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
|
||||
displayConsumeStatistics(tmq);
|
||||
|
||||
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
|
||||
|
@ -2169,7 +2170,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
|
|||
|
||||
tmq_list_destroy(lst);
|
||||
} else {
|
||||
tscWarn("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId);
|
||||
tscInfo("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId);
|
||||
}
|
||||
|
||||
taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
|
||||
|
@ -2432,7 +2433,7 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
|
|||
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
|
||||
|
||||
SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
|
||||
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
|
||||
tscInfo("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
|
||||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
|
@ -2656,7 +2657,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
|||
char offsetFormatBuf[TSDB_OFFSET_LEN];
|
||||
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset);
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
|
||||
tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
|
||||
tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
|
||||
}
|
||||
|
@ -2693,7 +2694,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
|||
char offsetBuf[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);
|
||||
|
||||
tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf);
|
||||
tscInfo("vgId:%d offset is update to:%s", p->vgId, offsetBuf);
|
||||
|
||||
pOffsetInfo->walVerBegin = p->begin;
|
||||
pOffsetInfo->walVerEnd = p->end;
|
||||
|
@ -2772,7 +2773,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
|
|||
SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId};
|
||||
tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId);
|
||||
tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId);
|
||||
|
||||
SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
|
||||
if (pInfo == NULL) {
|
||||
|
|
|
@ -361,11 +361,7 @@ static const SSysDbTableSchema consumerSchema[] = {
|
|||
{.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||
{.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||
{.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||
{.name = "msg.with.table.name", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false},
|
||||
{.name = "experimental.snapshot.enable", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false},
|
||||
{.name = "enable.auto.commit", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false},
|
||||
{.name = "auto.commit.interval.ms", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
|
||||
{.name = "auto.offset.reset", .bytes = TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||
{.name = "parameters", .bytes = 64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||
};
|
||||
|
||||
static const SSysDbTableSchema offsetSchema[] = {
|
||||
|
|
|
@ -265,6 +265,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
|
||||
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
|
||||
|
||||
#if 0
|
||||
if (pMgmt->pTfs) {
|
||||
if (tfsDirExistAt(pMgmt->pTfs, path, (SDiskID){0})) {
|
||||
terrno = TSDB_CODE_VND_DIR_ALREADY_EXIST;
|
||||
|
@ -278,8 +279,9 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
return -1;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) {
|
||||
if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) {
|
||||
tFreeSCreateVnodeReq(&req);
|
||||
dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr());
|
||||
code = terrno;
|
||||
|
|
|
@ -553,7 +553,6 @@ typedef struct {
|
|||
int64_t rebalanceTime;
|
||||
|
||||
int8_t withTbName;
|
||||
int8_t useSnapshot;
|
||||
int8_t autoCommit;
|
||||
int32_t autoCommitInterval;
|
||||
int32_t resetOffsetCfg;
|
||||
|
|
|
@ -697,7 +697,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId));
|
||||
|
||||
pConsumerNew->withTbName = subscribe.withTbName;
|
||||
pConsumerNew->useSnapshot = subscribe.useSnapshot;
|
||||
pConsumerNew->autoCommit = subscribe.autoCommit;
|
||||
pConsumerNew->autoCommitInterval = subscribe.autoCommitInterval;
|
||||
pConsumerNew->resetOffsetCfg = subscribe.resetOffsetCfg;
|
||||
|
@ -1186,25 +1185,16 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->withTbName, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->useSnapshot, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->autoCommit, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->autoCommitInterval, false);
|
||||
|
||||
char buf[TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
char buf[TSDB_OFFSET_LEN] = {0};
|
||||
STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
|
||||
tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &pVal);
|
||||
varDataSetLen(buf, strlen(varDataVal(buf)));
|
||||
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);
|
||||
|
||||
char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%d,reset:%s", pConsumer->withTbName, pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
|
||||
varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false);
|
||||
|
||||
numOfRows++;
|
||||
}
|
||||
|
|
|
@ -326,7 +326,6 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
|
|||
}
|
||||
|
||||
tlen += taosEncodeFixedI8(buf, pConsumer->withTbName);
|
||||
tlen += taosEncodeFixedI8(buf, pConsumer->useSnapshot);
|
||||
tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
|
||||
tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
|
||||
tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
|
||||
|
@ -386,7 +385,6 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
|
|||
|
||||
if(sver > 1){
|
||||
buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
|
||||
buf = taosDecodeFixedI8(buf, &pConsumer->useSnapshot);
|
||||
buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
|
||||
buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
|
||||
buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
|
||||
|
|
|
@ -227,6 +227,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
if ((code = taosCheckVersionCompatibleFromStr(connReq.sVer, version, 3)) != 0) {
|
||||
mGError("version not compatible. client version: %s, server version: %s", connReq.sVer, version);
|
||||
terrno = code;
|
||||
goto _OVER;
|
||||
}
|
||||
|
|
|
@ -129,6 +129,12 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vnodeVgroupIdLen(int32_t vgId) {
|
||||
char tmp[TSDB_FILENAME_LEN];
|
||||
sprintf(tmp, "%d", vgId);
|
||||
return strlen(tmp);
|
||||
}
|
||||
|
||||
int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs) {
|
||||
int32_t ret = tfsRename(pTfs, srcPath, dstPath);
|
||||
if (ret != 0) return ret;
|
||||
|
@ -154,8 +160,7 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
|
|||
|
||||
int32_t tsdbFileVgId = atoi(tsdbFilePrefixPos + 6);
|
||||
if (tsdbFileVgId == srcVgId) {
|
||||
char *tsdbFileSurfixPos = strstr(tsdbFilePrefixPos, "f");
|
||||
if (tsdbFileSurfixPos == NULL) continue;
|
||||
char *tsdbFileSurfixPos = tsdbFilePrefixPos + 6 + vnodeVgroupIdLen(srcVgId);
|
||||
|
||||
tsdbFilePrefixPos[6] = 0;
|
||||
snprintf(newRname, TSDB_FILENAME_LEN, "%s%d%s", oldRname, dstVgId, tsdbFileSurfixPos);
|
||||
|
|
|
@ -101,7 +101,11 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
|
|||
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
|
||||
SSortSource* pSource = cmpParam->pSources[i];
|
||||
blockDataDestroy(pSource->src.pBlock);
|
||||
if (pSource->pageIdList) {
|
||||
taosArrayDestroy(pSource->pageIdList);
|
||||
}
|
||||
taosMemoryFreeClear(pSource);
|
||||
cmpParam->pSources[i] = NULL;
|
||||
}
|
||||
|
||||
cmpParam->numOfSources = 0;
|
||||
|
@ -123,9 +127,11 @@ void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *f
|
|||
// release pageIdList
|
||||
if ((*pSource)->pageIdList) {
|
||||
taosArrayDestroy((*pSource)->pageIdList);
|
||||
(*pSource)->pageIdList = NULL;
|
||||
}
|
||||
if ((*pSource)->param && !(*pSource)->onlyRef) {
|
||||
taosMemoryFree((*pSource)->param);
|
||||
(*pSource)->param = NULL;
|
||||
}
|
||||
|
||||
if (!(*pSource)->onlyRef && (*pSource)->src.pBlock) {
|
||||
|
|
|
@ -1956,9 +1956,9 @@ static uint32_t funcNodeHash(const char* pKey, uint32_t len) {
|
|||
}
|
||||
|
||||
static int32_t funcNodeEqual(const void* pLeft, const void* pRight, size_t len) {
|
||||
if (0 != strcmp((*(const SExprNode**)pLeft)->aliasName, (*(const SExprNode**)pRight)->aliasName)) {
|
||||
return 1;
|
||||
}
|
||||
// if (0 != strcmp((*(const SExprNode**)pLeft)->aliasName, (*(const SExprNode**)pRight)->aliasName)) {
|
||||
// return 1;
|
||||
// }
|
||||
return nodesEqualNode(*(const SNode**)pLeft, *(const SNode**)pRight) ? 0 : 1;
|
||||
}
|
||||
|
||||
|
|
|
@ -4979,6 +4979,7 @@ static int32_t buildTableForSampleAst(SSampleAstInfo* pInfo, SNode** pOutput) {
|
|||
}
|
||||
snprintf(pTable->table.dbName, sizeof(pTable->table.dbName), "%s", pInfo->pDbName);
|
||||
snprintf(pTable->table.tableName, sizeof(pTable->table.tableName), "%s", pInfo->pTableName);
|
||||
snprintf(pTable->table.tableAlias, sizeof(pTable->table.tableAlias), "%s", pInfo->pTableName);
|
||||
TSWAP(pTable->pMeta, pInfo->pRollupTableMeta);
|
||||
*pOutput = (SNode*)pTable;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -340,6 +340,80 @@ if $data05 != 30.000000000 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
print =============== select with _wstart/order by _wstart from stb from file in designated vgroup
|
||||
sql select _wstart, _wend, min(c1),max(c2),max(c1) from stb interval(5m,10s) sliding(5m) order by _wstart;
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
if $rows != 1 then
|
||||
print rows $rows != 1
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != -13 then
|
||||
print data02 $data02 != -13
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != 20.00000 then
|
||||
print data03 $data03 != 20.00000
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data04 != 20 then
|
||||
print data04 $data04 != 20
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== select without _wstart/with order by _wstart from stb from file in designated vgroup
|
||||
sql select _wend, min(c1),max(c2),max(c1) from stb interval(5m,10s) sliding(5m) order by _wstart;
|
||||
print $data00 $data01 $data02 $data03
|
||||
if $rows != 1 then
|
||||
print rows $rows != 1
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != -13 then
|
||||
print data01 $data01 != -13
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 20.00000 then
|
||||
print data02 $data02 != 20.00000
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != 20 then
|
||||
print data03 $data03 != 20
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== select * from stb from file in common vgroups
|
||||
sql select _wstart, _wend, min(c1),max(c2),max(c1),max(c3) from stb interval(5m,10s) sliding(5m) order by _wstart;
|
||||
print $data00 $data01 $data02 $data03 $data04 $data05
|
||||
if $rows != 1 then
|
||||
print rows $rows != 1
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != -13 then
|
||||
print data02 $data02 != -13
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != 20.00000 then
|
||||
print data03 $data03 != 20.00000
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data04 != 20 then
|
||||
print data04 $data04 != 20
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data05 != 30.000000000 then
|
||||
print data05 $data05 != 30.000000000
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
||||
|
||||
|
|
|
@ -243,6 +243,10 @@ class TDTestCase:
|
|||
tdSql.checkData(0, 5, 0)
|
||||
break
|
||||
|
||||
tdSql.query("show consumers")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 8, "tbname:1,commit:1,interval:2000,reset:earliest")
|
||||
|
||||
time.sleep(2)
|
||||
tdLog.info("start insert data")
|
||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
|
|
|
@ -91,9 +91,14 @@ SWords shellCommands[] = {
|
|||
{"create stream <anyword> into <anyword> as select", 0, 0, NULL}, // 26 append sub sql
|
||||
{"create topic <anyword> as select", 0, 0, NULL}, // 27 append sub sql
|
||||
{"create function <anyword> as <anyword> outputtype <data_types> language <udf_language>", 0, 0, NULL},
|
||||
{"create or replace <anyword> as <anyword> outputtype <data_types> language <udf_language>", 0, 0, NULL},
|
||||
{"create aggregate function <anyword> as <anyword> outputtype <data_types> bufsize <anyword> language <udf_language>", 0, 0, NULL},
|
||||
{"create or replace aggregate function <anyword> as <anyword> outputtype <data_types> bufsize <anyword> language <udf_language>", 0, 0, NULL},
|
||||
{"create user <anyword> pass <anyword> sysinfo 0;", 0, 0, NULL},
|
||||
{"create user <anyword> pass <anyword> sysinfo 1;", 0, 0, NULL},
|
||||
#ifdef TD_ENTERPRISE
|
||||
{"compact database <db_name>", 0, 0, NULL},
|
||||
#endif
|
||||
{"describe <all_table>", 0, 0, NULL},
|
||||
{"delete from <all_table> where ", 0, 0, NULL},
|
||||
{"drop database <db_name>", 0, 0, NULL},
|
||||
|
@ -117,7 +122,11 @@ SWords shellCommands[] = {
|
|||
{"kill connection <anyword> ;", 0, 0, NULL},
|
||||
{"kill query ", 0, 0, NULL},
|
||||
{"kill transaction ", 0, 0, NULL},
|
||||
#ifdef TD_ENTERPRISE
|
||||
{"merge vgroup ", 0, 0, NULL},
|
||||
#endif
|
||||
{"pause stream <stream_name> ;", 0, 0, NULL},
|
||||
{"resume stream <stream_name> ;", 0, 0, NULL},
|
||||
{"reset query cache;", 0, 0, NULL},
|
||||
{"restore dnode <dnode_id> ;", 0, 0, NULL},
|
||||
{"restore vnode on dnode <dnode_id> ;", 0, 0, NULL},
|
||||
|
@ -173,7 +182,9 @@ SWords shellCommands[] = {
|
|||
{"show vgroups;", 0, 0, NULL},
|
||||
{"show consumers;", 0, 0, NULL},
|
||||
{"show grants;", 0, 0, NULL},
|
||||
#ifdef TD_ENTERPRISE
|
||||
{"split vgroup ", 0, 0, NULL},
|
||||
#endif
|
||||
{"insert into <tb_name> values(", 0, 0, NULL},
|
||||
{"insert into <tb_name> using <stb_name> tags(", 0, 0, NULL},
|
||||
{"insert into <tb_name> using <stb_name> <anyword> values(", 0, 0, NULL},
|
||||
|
@ -432,9 +443,10 @@ void showHelp() {
|
|||
kill connection <connection_id>; \n\
|
||||
kill query <query_id>; \n\
|
||||
kill transaction <transaction_id>;\n\
|
||||
----- M ----- \n\
|
||||
merge vgroup ...\n\
|
||||
----- P ----- \n\
|
||||
pause stream <stream_name>;\n\
|
||||
----- R ----- \n\
|
||||
resume stream <stream_name>;\n\
|
||||
reset query cache;\n\
|
||||
restore dnode <dnode_id> ;\n\
|
||||
restore vnode on dnode <dnode_id> ;\n\
|
||||
|
@ -489,14 +501,20 @@ void showHelp() {
|
|||
show vgroups;\n\
|
||||
show consumers;\n\
|
||||
show grants;\n\
|
||||
split vgroup ...\n\
|
||||
----- T ----- \n\
|
||||
trim database <db_name>;\n\
|
||||
----- U ----- \n\
|
||||
use <db_name>;");
|
||||
|
||||
printf("\n\n");
|
||||
#ifdef TD_ENTERPRISE
|
||||
printf(
|
||||
"\n\n\
|
||||
----- special commands on enterpise version ----- \n\
|
||||
compact database <db_name>; \n\
|
||||
split vgroup <vgroup_id>;");
|
||||
#endif
|
||||
|
||||
printf("\n\n");
|
||||
// define in getDuration() function
|
||||
printf(
|
||||
"\
|
||||
|
|
Loading…
Reference in New Issue