Merge branch '3.0' into feat/TD-24499
This commit is contained in:
commit
f84eef0ac4
|
@ -2034,6 +2034,12 @@ typedef struct {
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
char clientId[256];
|
char clientId[256];
|
||||||
SArray* topicNames; // SArray<char**>
|
SArray* topicNames; // SArray<char**>
|
||||||
|
|
||||||
|
int8_t withTbName;
|
||||||
|
int8_t useSnapshot;
|
||||||
|
int8_t autoCommit;
|
||||||
|
int32_t autoCommitInterval;
|
||||||
|
int8_t resetOffsetCfg;
|
||||||
} SCMSubscribeReq;
|
} SCMSubscribeReq;
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
|
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
|
||||||
|
@ -2048,6 +2054,13 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
|
||||||
for (int32_t i = 0; i < topicNum; i++) {
|
for (int32_t i = 0; i < topicNum; i++) {
|
||||||
tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i));
|
tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tlen += taosEncodeFixedI8(buf, pReq->withTbName);
|
||||||
|
tlen += taosEncodeFixedI8(buf, pReq->useSnapshot);
|
||||||
|
tlen += taosEncodeFixedI8(buf, pReq->autoCommit);
|
||||||
|
tlen += taosEncodeFixedI32(buf, pReq->autoCommitInterval);
|
||||||
|
tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg);
|
||||||
|
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2065,6 +2078,12 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
|
||||||
buf = taosDecodeString(buf, &name);
|
buf = taosDecodeString(buf, &name);
|
||||||
taosArrayPush(pReq->topicNames, &name);
|
taosArrayPush(pReq->topicNames, &name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
buf = taosDecodeFixedI8(buf, &pReq->withTbName);
|
||||||
|
buf = taosDecodeFixedI8(buf, &pReq->useSnapshot);
|
||||||
|
buf = taosDecodeFixedI8(buf, &pReq->autoCommit);
|
||||||
|
buf = taosDecodeFixedI32(buf, &pReq->autoCommitInterval);
|
||||||
|
buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg);
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2456,15 +2475,6 @@ typedef struct {
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
} SMqAskEpReq;
|
} SMqAskEpReq;
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t consumerId;
|
|
||||||
int32_t epoch;
|
|
||||||
} SMqHbReq;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int8_t reserved;
|
|
||||||
} SMqHbRsp;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t key;
|
int32_t key;
|
||||||
int32_t valueLen;
|
int32_t valueLen;
|
||||||
|
@ -2893,7 +2903,7 @@ int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pRe
|
||||||
// tqOffset
|
// tqOffset
|
||||||
enum {
|
enum {
|
||||||
TMQ_OFFSET__RESET_NONE = -3,
|
TMQ_OFFSET__RESET_NONE = -3,
|
||||||
TMQ_OFFSET__RESET_EARLIEAST = -2,
|
TMQ_OFFSET__RESET_EARLIEST = -2,
|
||||||
TMQ_OFFSET__RESET_LATEST = -1,
|
TMQ_OFFSET__RESET_LATEST = -1,
|
||||||
TMQ_OFFSET__LOG = 1,
|
TMQ_OFFSET__LOG = 1,
|
||||||
TMQ_OFFSET__SNAPSHOT_DATA = 2,
|
TMQ_OFFSET__SNAPSHOT_DATA = 2,
|
||||||
|
@ -3356,6 +3366,28 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) {
|
||||||
taosArrayDestroyEx(pRsp->topics, (FDelete)tDeleteMqSubTopicEp);
|
taosArrayDestroyEx(pRsp->topics, (FDelete)tDeleteMqSubTopicEp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t vgId;
|
||||||
|
STqOffsetVal offset;
|
||||||
|
int64_t rows;
|
||||||
|
}OffsetRows;
|
||||||
|
|
||||||
|
typedef struct{
|
||||||
|
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||||
|
SArray* offsetRows;
|
||||||
|
}TopicOffsetRows;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t consumerId;
|
||||||
|
int32_t epoch;
|
||||||
|
SArray* topics;
|
||||||
|
} SMqHbReq;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int8_t reserved;
|
||||||
|
} SMqHbRsp;
|
||||||
|
|
||||||
|
|
||||||
#define TD_AUTO_CREATE_TABLE 0x1
|
#define TD_AUTO_CREATE_TABLE 0x1
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t suid;
|
int64_t suid;
|
||||||
|
@ -3480,10 +3512,8 @@ int32_t tSerializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
|
||||||
int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
|
int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
|
||||||
int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
|
int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
|
||||||
int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
|
int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
|
||||||
int32_t tSerializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
|
int32_t tDeatroySMqHbReq(SMqHbReq* pReq);
|
||||||
int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
|
|
||||||
int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
|
|
||||||
int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
|
|
||||||
|
|
||||||
#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1
|
#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1
|
||||||
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
|
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
|
||||||
|
|
|
@ -195,6 +195,7 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
|
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
|
||||||
#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string
|
#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string
|
||||||
#define TSDB_CGROUP_LEN 193 // it is a null-terminated string
|
#define TSDB_CGROUP_LEN 193 // it is a null-terminated string
|
||||||
|
#define TSDB_OFFSET_LEN 64 // it is a null-terminated string
|
||||||
#define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string
|
#define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string
|
||||||
#define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string
|
#define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string
|
||||||
#define TSDB_DB_NAME_LEN 65
|
#define TSDB_DB_NAME_LEN 65
|
||||||
|
|
|
@ -82,7 +82,7 @@ struct tmq_t {
|
||||||
int8_t useSnapshot;
|
int8_t useSnapshot;
|
||||||
int8_t autoCommit;
|
int8_t autoCommit;
|
||||||
int32_t autoCommitInterval;
|
int32_t autoCommitInterval;
|
||||||
int32_t resetOffsetCfg;
|
int8_t resetOffsetCfg;
|
||||||
uint64_t consumerId;
|
uint64_t consumerId;
|
||||||
bool hbBgEnable;
|
bool hbBgEnable;
|
||||||
tmq_commit_cb* commitCb;
|
tmq_commit_cb* commitCb;
|
||||||
|
@ -99,6 +99,7 @@ struct tmq_t {
|
||||||
// poll info
|
// poll info
|
||||||
int64_t pollCnt;
|
int64_t pollCnt;
|
||||||
int64_t totalRows;
|
int64_t totalRows;
|
||||||
|
bool needReportOffsetRows;
|
||||||
|
|
||||||
// timer
|
// timer
|
||||||
tmr_h hbLiveTimer;
|
tmr_h hbLiveTimer;
|
||||||
|
@ -264,7 +265,7 @@ tmq_conf_t* tmq_conf_new() {
|
||||||
conf->withTbName = false;
|
conf->withTbName = false;
|
||||||
conf->autoCommit = true;
|
conf->autoCommit = true;
|
||||||
conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
|
conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
|
||||||
conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST;
|
conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
|
||||||
conf->hbBgEnable = true;
|
conf->hbBgEnable = true;
|
||||||
|
|
||||||
return conf;
|
return conf;
|
||||||
|
@ -318,7 +319,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
||||||
conf->resetOffset = TMQ_OFFSET__RESET_NONE;
|
conf->resetOffset = TMQ_OFFSET__RESET_NONE;
|
||||||
return TMQ_CONF_OK;
|
return TMQ_CONF_OK;
|
||||||
} else if (strcasecmp(value, "earliest") == 0) {
|
} else if (strcasecmp(value, "earliest") == 0) {
|
||||||
conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST;
|
conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
|
||||||
return TMQ_CONF_OK;
|
return TMQ_CONF_OK;
|
||||||
} else if (strcasecmp(value, "latest") == 0) {
|
} else if (strcasecmp(value, "latest") == 0) {
|
||||||
conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
|
conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
|
||||||
|
@ -567,10 +568,10 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
|
||||||
atomic_add_fetch_32(&pParamSet->totalRspNum, 1);
|
atomic_add_fetch_32(&pParamSet->totalRspNum, 1);
|
||||||
|
|
||||||
SEp* pEp = GET_ACTIVE_EP(&pVg->epSet);
|
SEp* pEp = GET_ACTIVE_EP(&pVg->epSet);
|
||||||
char offsetBuf[80] = {0};
|
char offsetBuf[TSDB_OFFSET_LEN] = {0};
|
||||||
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffset->offset.val);
|
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffset->offset.val);
|
||||||
|
|
||||||
char commitBuf[80] = {0};
|
char commitBuf[TSDB_OFFSET_LEN] = {0};
|
||||||
tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
|
tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
|
||||||
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64,
|
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64,
|
||||||
tmq->consumerId, pOffset->offset.subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1,
|
tmq->consumerId, pOffset->offset.subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1,
|
||||||
|
@ -796,6 +797,25 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
||||||
SMqHbReq req = {0};
|
SMqHbReq req = {0};
|
||||||
req.consumerId = tmq->consumerId;
|
req.consumerId = tmq->consumerId;
|
||||||
req.epoch = tmq->epoch;
|
req.epoch = tmq->epoch;
|
||||||
|
if(tmq->needReportOffsetRows){
|
||||||
|
req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
|
||||||
|
for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){
|
||||||
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||||
|
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
|
||||||
|
TopicOffsetRows* data = taosArrayReserve(req.topics, 1);
|
||||||
|
strcpy(data->topicName, pTopic->topicName);
|
||||||
|
data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows));
|
||||||
|
for(int j = 0; j < numOfVgroups; j++){
|
||||||
|
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||||
|
OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
|
||||||
|
offRows->vgId = pVg->vgId;
|
||||||
|
offRows->rows = pVg->numOfRows;
|
||||||
|
offRows->offset = pVg->offsetInfo.committedOffset;
|
||||||
|
tscDebug("report offset: %d", offRows->offset.type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tmq->needReportOffsetRows = false;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
|
int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
|
||||||
if (tlen < 0) {
|
if (tlen < 0) {
|
||||||
|
@ -835,6 +855,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
||||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||||
|
|
||||||
OVER:
|
OVER:
|
||||||
|
tDeatroySMqHbReq(&req);
|
||||||
taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
|
taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
|
||||||
taosReleaseRef(tmqMgmt.rsetId, refId);
|
taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||||
}
|
}
|
||||||
|
@ -969,6 +990,14 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmq_unsubscribe(tmq_t* tmq) {
|
int32_t tmq_unsubscribe(tmq_t* tmq) {
|
||||||
|
if (tmq->autoCommit) {
|
||||||
|
int32_t rsp = tmq_commit_sync(tmq, NULL);
|
||||||
|
if (rsp != 0) {
|
||||||
|
return rsp;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosSsleep(2); // sleep 2s for hb to send offset and rows to server
|
||||||
|
|
||||||
int32_t rsp;
|
int32_t rsp;
|
||||||
int32_t retryCnt = 0;
|
int32_t retryCnt = 0;
|
||||||
tmq_list_t* lst = tmq_list_new();
|
tmq_list_t* lst = tmq_list_new();
|
||||||
|
@ -1063,6 +1092,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
pTmq->status = TMQ_CONSUMER_STATUS__INIT;
|
pTmq->status = TMQ_CONSUMER_STATUS__INIT;
|
||||||
pTmq->pollCnt = 0;
|
pTmq->pollCnt = 0;
|
||||||
pTmq->epoch = 0;
|
pTmq->epoch = 0;
|
||||||
|
pTmq->needReportOffsetRows = true;
|
||||||
|
|
||||||
// set conf
|
// set conf
|
||||||
strcpy(pTmq->clientId, conf->clientId);
|
strcpy(pTmq->clientId, conf->clientId);
|
||||||
|
@ -1107,7 +1137,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
|
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
|
||||||
}
|
}
|
||||||
|
|
||||||
char buf[80] = {0};
|
char buf[TSDB_OFFSET_LEN] = {0};
|
||||||
STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
|
STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
|
||||||
tFormatOffset(buf, tListLen(buf), &offset);
|
tFormatOffset(buf, tListLen(buf), &offset);
|
||||||
tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
|
tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
|
||||||
|
@ -1123,7 +1153,7 @@ _failed:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
const int32_t MAX_RETRY_COUNT = 120 * 60; // let's wait for 2 mins at most
|
const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most
|
||||||
const SArray* container = &topic_list->container;
|
const SArray* container = &topic_list->container;
|
||||||
int32_t sz = taosArrayGetSize(container);
|
int32_t sz = taosArrayGetSize(container);
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
|
@ -1143,6 +1173,12 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
req.withTbName = tmq->withTbName;
|
||||||
|
req.useSnapshot = tmq->useSnapshot;
|
||||||
|
req.autoCommit = tmq->autoCommit;
|
||||||
|
req.autoCommitInterval = tmq->autoCommitInterval;
|
||||||
|
req.resetOffsetCfg = tmq->resetOffsetCfg;
|
||||||
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
char* topic = taosArrayGetP(container, i);
|
char* topic = taosArrayGetP(container, i);
|
||||||
|
|
||||||
|
@ -1375,8 +1411,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
|
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||||
|
|
||||||
char buf[80];
|
char buf[TSDB_OFFSET_LEN];
|
||||||
tFormatOffset(buf, 80, &pRspWrapper->dataRsp.rspOffset);
|
tFormatOffset(buf, TSDB_OFFSET_LEN, &pRspWrapper->dataRsp.rspOffset);
|
||||||
tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
|
tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
|
||||||
tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId);
|
tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId);
|
||||||
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
||||||
|
@ -1523,8 +1559,8 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
|
||||||
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
|
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
|
||||||
makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId);
|
makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId);
|
||||||
|
|
||||||
char buf[80];
|
char buf[TSDB_OFFSET_LEN];
|
||||||
tFormatOffset(buf, 80, &pVgCur->offsetInfo.currentOffset);
|
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.currentOffset);
|
||||||
tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
|
tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
|
||||||
vgKey, buf);
|
vgKey, buf);
|
||||||
|
|
||||||
|
@ -1673,7 +1709,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg,
|
||||||
return pRspObj;
|
return pRspObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
|
||||||
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
|
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
|
||||||
pRspObj->resType = RES_TYPE__TMQ_METADATA;
|
pRspObj->resType = RES_TYPE__TMQ_METADATA;
|
||||||
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
@ -1688,6 +1724,13 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
||||||
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
|
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// extract the rows in this data packet
|
||||||
|
for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
|
||||||
|
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i);
|
||||||
|
int64_t rows = htobe64(pRetrieve->numOfRows);
|
||||||
|
pVg->numOfRows += rows;
|
||||||
|
(*numOfRows) += rows;
|
||||||
|
}
|
||||||
return pRspObj;
|
return pRspObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1745,7 +1788,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
|
||||||
sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
|
sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
char offsetFormatBuf[80];
|
char offsetFormatBuf[TSDB_OFFSET_LEN];
|
||||||
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.currentOffset);
|
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.currentOffset);
|
||||||
|
|
||||||
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
|
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
|
||||||
|
@ -1882,8 +1925,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
pVg->offsetInfo.walVerEnd = pDataRsp->head.walever;
|
pVg->offsetInfo.walVerEnd = pDataRsp->head.walever;
|
||||||
pVg->receivedInfoFromVnode = true;
|
pVg->receivedInfoFromVnode = true;
|
||||||
|
|
||||||
char buf[80];
|
char buf[TSDB_OFFSET_LEN];
|
||||||
tFormatOffset(buf, 80, &pDataRsp->rspOffset);
|
tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
|
||||||
if (pDataRsp->blockNum == 0) {
|
if (pDataRsp->blockNum == 0) {
|
||||||
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
|
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
|
||||||
" total:%" PRId64 " reqId:0x%" PRIx64,
|
" total:%" PRId64 " reqId:0x%" PRIx64,
|
||||||
|
@ -1985,13 +2028,13 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
if (pollRspWrapper->taosxRsp.createTableNum == 0) {
|
if (pollRspWrapper->taosxRsp.createTableNum == 0) {
|
||||||
pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
|
pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
|
||||||
} else {
|
} else {
|
||||||
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
|
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq->totalRows += numOfRows;
|
tmq->totalRows += numOfRows;
|
||||||
|
|
||||||
char buf[80];
|
char buf[TSDB_OFFSET_LEN];
|
||||||
tFormatOffset(buf, 80, &pVg->offsetInfo.currentOffset);
|
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset);
|
||||||
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
|
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
|
||||||
", vg total:%" PRId64 " total:%" PRId64 " reqId:0x%" PRIx64,
|
", vg total:%" PRId64 " total:%" PRId64 " reqId:0x%" PRIx64,
|
||||||
tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
|
tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
|
||||||
|
@ -2110,6 +2153,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
|
||||||
return rsp;
|
return rsp;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
taosSsleep(2); // sleep 2s for hb to send offset and rows to server
|
||||||
|
|
||||||
int32_t retryCnt = 0;
|
int32_t retryCnt = 0;
|
||||||
tmq_list_t* lst = tmq_list_new();
|
tmq_list_t* lst = tmq_list_new();
|
||||||
|
@ -2411,6 +2455,7 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
|
||||||
// if no more waiting rsp
|
// if no more waiting rsp
|
||||||
pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam);
|
pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam);
|
||||||
taosMemoryFree(pParamSet);
|
taosMemoryFree(pParamSet);
|
||||||
|
tmq->needReportOffsetRows = true;
|
||||||
|
|
||||||
taosReleaseRef(tmqMgmt.rsetId, refId);
|
taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -2608,7 +2653,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;
|
sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
char offsetFormatBuf[80];
|
char offsetFormatBuf[TSDB_OFFSET_LEN];
|
||||||
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset);
|
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset);
|
||||||
|
|
||||||
tscDebug("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
|
tscDebug("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
|
||||||
|
@ -2645,7 +2690,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
|
|
||||||
pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
|
pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
|
||||||
|
|
||||||
char offsetBuf[80] = {0};
|
char offsetBuf[TSDB_OFFSET_LEN] = {0};
|
||||||
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);
|
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);
|
||||||
|
|
||||||
tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf);
|
tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf);
|
||||||
|
|
|
@ -291,6 +291,8 @@ static const SSysDbTableSchema subscriptionSchema[] = {
|
||||||
{.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
{.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
|
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
|
||||||
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
|
{.name = "offset", .bytes = TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
|
{.name = "rows", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema vnodesSchema[] = {
|
static const SSysDbTableSchema vnodesSchema[] = {
|
||||||
|
@ -359,6 +361,11 @@ static const SSysDbTableSchema consumerSchema[] = {
|
||||||
{.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
{.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||||
{.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
{.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||||
{.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
{.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||||
|
{.name = "msg.with.table.name", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false},
|
||||||
|
{.name = "experimental.snapshot.enable", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false},
|
||||||
|
{.name = "enable.auto.commit", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false},
|
||||||
|
{.name = "auto.commit.interval.ms", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
|
||||||
|
{.name = "auto.offset.reset", .bytes = TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema offsetSchema[] = {
|
static const SSysDbTableSchema offsetSchema[] = {
|
||||||
|
|
|
@ -5340,6 +5340,15 @@ int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tDeatroySMqHbReq(SMqHbReq* pReq){
|
||||||
|
for(int i = 0; i < taosArrayGetSize(pReq->topics); i++){
|
||||||
|
TopicOffsetRows* vgs = taosArrayGet(pReq->topics, i);
|
||||||
|
if(vgs) taosArrayDestroy(vgs->offsetRows);
|
||||||
|
}
|
||||||
|
taosArrayDestroy(pReq->topics);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
|
int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, buf, bufLen);
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
@ -5348,6 +5357,21 @@ int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
|
||||||
if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->epoch) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->epoch) < 0) return -1;
|
||||||
|
|
||||||
|
int32_t sz = taosArrayGetSize(pReq->topics);
|
||||||
|
if (tEncodeI32(&encoder, sz) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < sz; ++i) {
|
||||||
|
TopicOffsetRows* vgs = (TopicOffsetRows*)taosArrayGet(pReq->topics, i);
|
||||||
|
if (tEncodeCStr(&encoder, vgs->topicName) < 0) return -1;
|
||||||
|
int32_t szVgs = taosArrayGetSize(vgs->offsetRows);
|
||||||
|
if (tEncodeI32(&encoder, szVgs) < 0) return -1;
|
||||||
|
for (int32_t j = 0; j < szVgs; ++j) {
|
||||||
|
OffsetRows *offRows = taosArrayGet(vgs->offsetRows, j);
|
||||||
|
if (tEncodeI32(&encoder, offRows->vgId) < 0) return -1;
|
||||||
|
if (tEncodeI64(&encoder, offRows->rows) < 0) return -1;
|
||||||
|
if (tEncodeSTqOffsetVal(&encoder, &offRows->offset) < 0) return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
|
@ -5364,7 +5388,28 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
|
||||||
|
|
||||||
if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1;
|
||||||
|
int32_t sz = 0;
|
||||||
|
if (tDecodeI32(&decoder, &sz) < 0) return -1;
|
||||||
|
if(sz > 0){
|
||||||
|
pReq->topics = taosArrayInit(sz, sizeof(TopicOffsetRows));
|
||||||
|
if (NULL == pReq->topics) return -1;
|
||||||
|
for (int32_t i = 0; i < sz; ++i) {
|
||||||
|
TopicOffsetRows* data = taosArrayReserve(pReq->topics, 1);
|
||||||
|
tDecodeCStrTo(&decoder, data->topicName);
|
||||||
|
int32_t szVgs = 0;
|
||||||
|
if (tDecodeI32(&decoder, &szVgs) < 0) return -1;
|
||||||
|
if(szVgs > 0){
|
||||||
|
data->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
|
||||||
|
if (NULL == data->offsetRows) return -1;
|
||||||
|
for (int32_t j= 0; j < szVgs; ++j) {
|
||||||
|
OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
|
||||||
|
if (tDecodeI32(&decoder, &offRows->vgId) < 0) return -1;
|
||||||
|
if (tDecodeI64(&decoder, &offRows->rows) < 0) return -1;
|
||||||
|
if (tDecodeSTqOffsetVal(&decoder, &offRows->offset) < 0) return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
@ -7093,15 +7138,15 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) {
|
||||||
|
|
||||||
int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
|
int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
|
||||||
if (pVal->type == TMQ_OFFSET__RESET_NONE) {
|
if (pVal->type == TMQ_OFFSET__RESET_NONE) {
|
||||||
snprintf(buf, maxLen, "offset(reset to none)");
|
snprintf(buf, maxLen, "none");
|
||||||
} else if (pVal->type == TMQ_OFFSET__RESET_EARLIEAST) {
|
} else if (pVal->type == TMQ_OFFSET__RESET_EARLIEST) {
|
||||||
snprintf(buf, maxLen, "offset(reset to earlieast)");
|
snprintf(buf, maxLen, "earliest");
|
||||||
} else if (pVal->type == TMQ_OFFSET__RESET_LATEST) {
|
} else if (pVal->type == TMQ_OFFSET__RESET_LATEST) {
|
||||||
snprintf(buf, maxLen, "offset(reset to latest)");
|
snprintf(buf, maxLen, "latest");
|
||||||
} else if (pVal->type == TMQ_OFFSET__LOG) {
|
} else if (pVal->type == TMQ_OFFSET__LOG) {
|
||||||
snprintf(buf, maxLen, "offset(log) ver:%" PRId64, pVal->version);
|
snprintf(buf, maxLen, "log:%" PRId64, pVal->version);
|
||||||
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
|
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||||
snprintf(buf, maxLen, "offset(snapshot) uid:%" PRId64 " ts:%" PRId64, pVal->uid, pVal->ts);
|
snprintf(buf, maxLen, "snapshot:%" PRId64 "|%" PRId64, pVal->uid, pVal->ts);
|
||||||
} else {
|
} else {
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
@ -7119,7 +7164,7 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
|
||||||
return pLeft->uid == pRight->uid;
|
return pLeft->uid == pRight->uid;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
/*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIEAST ||*/
|
/*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIEST ||*/
|
||||||
/*pLeft->type == TMQ_OFFSET__RESET_LATEST);*/
|
/*pLeft->type == TMQ_OFFSET__RESET_LATEST);*/
|
||||||
/*return true;*/
|
/*return true;*/
|
||||||
}
|
}
|
||||||
|
|
|
@ -163,7 +163,7 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_DROP_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_DROP_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_SUBSCRIBE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_SUBSCRIBE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_ASK_EP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_ASK_EP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_HB, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_HB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_DROP_CGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_DROP_CGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_DROP_CGROUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_DROP_CGROUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -551,12 +551,18 @@ typedef struct {
|
||||||
int64_t upTime;
|
int64_t upTime;
|
||||||
int64_t subscribeTime;
|
int64_t subscribeTime;
|
||||||
int64_t rebalanceTime;
|
int64_t rebalanceTime;
|
||||||
|
|
||||||
|
int8_t withTbName;
|
||||||
|
int8_t useSnapshot;
|
||||||
|
int8_t autoCommit;
|
||||||
|
int32_t autoCommitInterval;
|
||||||
|
int32_t resetOffsetCfg;
|
||||||
} SMqConsumerObj;
|
} SMqConsumerObj;
|
||||||
|
|
||||||
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
|
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
|
||||||
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer);
|
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer);
|
||||||
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
|
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
|
||||||
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer);
|
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
@ -572,12 +578,13 @@ void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp);
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t consumerId; // -1 for unassigned
|
int64_t consumerId; // -1 for unassigned
|
||||||
SArray* vgs; // SArray<SMqVgEp*>
|
SArray* vgs; // SArray<SMqVgEp*>
|
||||||
|
SArray* offsetRows; // SArray<OffsetRows*>
|
||||||
} SMqConsumerEp;
|
} SMqConsumerEp;
|
||||||
|
|
||||||
SMqConsumerEp* tCloneSMqConsumerEp(const SMqConsumerEp* pEp);
|
//SMqConsumerEp* tCloneSMqConsumerEp(const SMqConsumerEp* pEp);
|
||||||
void tDeleteSMqConsumerEp(void* pEp);
|
//void tDeleteSMqConsumerEp(void* pEp);
|
||||||
int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp);
|
int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp);
|
||||||
void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp);
|
void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp, int8_t sver);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
|
@ -589,6 +596,7 @@ typedef struct {
|
||||||
int64_t stbUid;
|
int64_t stbUid;
|
||||||
SHashObj* consumerHash; // consumerId -> SMqConsumerEp
|
SHashObj* consumerHash; // consumerId -> SMqConsumerEp
|
||||||
SArray* unassignedVgs; // SArray<SMqVgEp*>
|
SArray* unassignedVgs; // SArray<SMqVgEp*>
|
||||||
|
SArray* offsetRows;
|
||||||
char dbName[TSDB_DB_FNAME_LEN];
|
char dbName[TSDB_DB_FNAME_LEN];
|
||||||
} SMqSubscribeObj;
|
} SMqSubscribeObj;
|
||||||
|
|
||||||
|
@ -596,7 +604,7 @@ SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]);
|
||||||
SMqSubscribeObj* tCloneSubscribeObj(const SMqSubscribeObj* pSub);
|
SMqSubscribeObj* tCloneSubscribeObj(const SMqSubscribeObj* pSub);
|
||||||
void tDeleteSubscribeObj(SMqSubscribeObj* pSub);
|
void tDeleteSubscribeObj(SMqSubscribeObj* pSub);
|
||||||
int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub);
|
int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub);
|
||||||
void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub);
|
void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub, int8_t sver);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
|
|
||||||
#define MND_CONSUMER_VER_NUMBER 1
|
#define MND_CONSUMER_VER_NUMBER 2
|
||||||
#define MND_CONSUMER_RESERVE_SIZE 64
|
#define MND_CONSUMER_RESERVE_SIZE 64
|
||||||
|
|
||||||
#define MND_CONSUMER_LOST_HB_CNT 6
|
#define MND_CONSUMER_LOST_HB_CNT 6
|
||||||
|
@ -391,12 +391,13 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
||||||
|
int32_t code = 0;
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
SMqHbReq req = {0};
|
SMqHbReq req = {0};
|
||||||
|
|
||||||
if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
|
if ((code = tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req)) < 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t consumerId = req.consumerId;
|
int64_t consumerId = req.consumerId;
|
||||||
|
@ -404,7 +405,8 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
||||||
if (pConsumer == NULL) {
|
if (pConsumer == NULL) {
|
||||||
mError("consumer:0x%" PRIx64 " not exist", consumerId);
|
mError("consumer:0x%" PRIx64 " not exist", consumerId);
|
||||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||||
return -1;
|
code = -1;
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_store_32(&pConsumer->hbStatus, 0);
|
atomic_store_32(&pConsumer->hbStatus, 0);
|
||||||
|
@ -424,9 +426,28 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for(int i = 0; i < taosArrayGetSize(req.topics); i++){
|
||||||
|
TopicOffsetRows* data = taosArrayGet(req.topics, i);
|
||||||
|
mDebug("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
|
||||||
|
|
||||||
|
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
|
||||||
|
taosWLockLatch(&pSub->lock);
|
||||||
|
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
|
||||||
|
if(pConsumerEp){
|
||||||
|
taosArrayDestroy(pConsumerEp->offsetRows);
|
||||||
|
pConsumerEp->offsetRows = data->offsetRows;
|
||||||
|
data->offsetRows = NULL;
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&pSub->lock);
|
||||||
|
|
||||||
|
mndReleaseSubscribe(pMnode, pSub);
|
||||||
|
}
|
||||||
|
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
|
||||||
return 0;
|
end:
|
||||||
|
tDeatroySMqHbReq(&req);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||||
|
@ -675,6 +696,12 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
||||||
tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId));
|
tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId));
|
||||||
|
|
||||||
|
pConsumerNew->withTbName = subscribe.withTbName;
|
||||||
|
pConsumerNew->useSnapshot = subscribe.useSnapshot;
|
||||||
|
pConsumerNew->autoCommit = subscribe.autoCommit;
|
||||||
|
pConsumerNew->autoCommitInterval = subscribe.autoCommitInterval;
|
||||||
|
pConsumerNew->resetOffsetCfg = subscribe.resetOffsetCfg;
|
||||||
|
|
||||||
// set the update type
|
// set the update type
|
||||||
pConsumerNew->updateType = CONSUMER_UPDATE__REBALANCE;
|
pConsumerNew->updateType = CONSUMER_UPDATE__REBALANCE;
|
||||||
taosArrayDestroy(pConsumerNew->assignedTopics);
|
taosArrayDestroy(pConsumerNew->assignedTopics);
|
||||||
|
@ -822,7 +849,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
|
||||||
goto CM_DECODE_OVER;
|
goto CM_DECODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sver != MND_CONSUMER_VER_NUMBER) {
|
if (sver < 1 || sver > MND_CONSUMER_VER_NUMBER) {
|
||||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||||
goto CM_DECODE_OVER;
|
goto CM_DECODE_OVER;
|
||||||
}
|
}
|
||||||
|
@ -849,7 +876,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
|
SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
|
SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
|
||||||
|
|
||||||
if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) {
|
if (tDecodeSMqConsumerObj(buf, pConsumer, sver) == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY; // TODO set correct error code
|
terrno = TSDB_CODE_OUT_OF_MEMORY; // TODO set correct error code
|
||||||
goto CM_DECODE_OVER;
|
goto CM_DECODE_OVER;
|
||||||
}
|
}
|
||||||
|
@ -1159,6 +1186,26 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->withTbName, false);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->useSnapshot, false);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->autoCommit, false);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->autoCommitInterval, false);
|
||||||
|
|
||||||
|
char buf[TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
|
||||||
|
tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &pVal);
|
||||||
|
varDataSetLen(buf, strlen(varDataVal(buf)));
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -321,10 +321,15 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
|
||||||
tlen += taosEncodeFixedI32(buf, 0);
|
tlen += taosEncodeFixedI32(buf, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tlen += taosEncodeFixedI8(buf, pConsumer->withTbName);
|
||||||
|
tlen += taosEncodeFixedI8(buf, pConsumer->useSnapshot);
|
||||||
|
tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
|
||||||
|
tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
|
||||||
|
tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) {
|
void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t sver) {
|
||||||
int32_t sz;
|
int32_t sz;
|
||||||
buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
|
buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
|
||||||
buf = taosDecodeStringTo(buf, pConsumer->clientId);
|
buf = taosDecodeStringTo(buf, pConsumer->clientId);
|
||||||
|
@ -375,50 +380,96 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) {
|
||||||
taosArrayPush(pConsumer->assignedTopics, &topic);
|
taosArrayPush(pConsumer->assignedTopics, &topic);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(sver > 1){
|
||||||
|
buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
|
||||||
|
buf = taosDecodeFixedI8(buf, &pConsumer->useSnapshot);
|
||||||
|
buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
|
||||||
|
buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
|
||||||
|
buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
|
||||||
|
}
|
||||||
|
|
||||||
return (void *)buf;
|
return (void *)buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
|
//SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
|
||||||
SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
|
// SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
|
||||||
if (pConsumerEpNew == NULL) return NULL;
|
// if (pConsumerEpNew == NULL) return NULL;
|
||||||
pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
|
// pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
|
||||||
pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
|
// pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
|
||||||
return pConsumerEpNew;
|
// return pConsumerEpNew;
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
void tDeleteSMqConsumerEp(void *data) {
|
//void tDeleteSMqConsumerEp(void *data) {
|
||||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
|
// SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
|
||||||
taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
|
// taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
|
||||||
}
|
//}
|
||||||
|
|
||||||
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
|
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
|
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
|
||||||
tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
|
tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
|
||||||
#if 0
|
int32_t szVgs = taosArrayGetSize(pConsumerEp->offsetRows);
|
||||||
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
tlen += taosEncodeFixedI32(buf, szVgs);
|
||||||
tlen += taosEncodeFixedI32(buf, sz);
|
for (int32_t j= 0; j < szVgs; ++j) {
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
OffsetRows *offRows = taosArrayGet(pConsumerEp->offsetRows, j);
|
||||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
|
tlen += taosEncodeFixedI32(buf, offRows->vgId);
|
||||||
tlen += tEncodeSMqVgEp(buf, pVgEp);
|
tlen += taosEncodeFixedI64(buf, offRows->rows);
|
||||||
|
tlen += taosEncodeFixedI8(buf, offRows->offset.type);
|
||||||
|
if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||||
|
tlen += taosEncodeFixedI64(buf, offRows->offset.uid);
|
||||||
|
tlen += taosEncodeFixedI64(buf, offRows->offset.ts);
|
||||||
|
} else if (offRows->offset.type == TMQ_OFFSET__LOG) {
|
||||||
|
tlen += taosEncodeFixedI64(buf, offRows->offset.version);
|
||||||
|
} else {
|
||||||
|
// do nothing
|
||||||
}
|
}
|
||||||
#endif
|
}
|
||||||
|
//#if 0
|
||||||
|
// int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
||||||
|
// tlen += taosEncodeFixedI32(buf, sz);
|
||||||
|
// for (int32_t i = 0; i < sz; i++) {
|
||||||
|
// SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
|
||||||
|
// tlen += tEncodeSMqVgEp(buf, pVgEp);
|
||||||
|
// }
|
||||||
|
//#endif
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp) {
|
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
|
||||||
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
|
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
|
||||||
buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
|
buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
|
||||||
#if 0
|
if (sver > 1){
|
||||||
int32_t sz;
|
int32_t szVgs = 0;
|
||||||
buf = taosDecodeFixedI32(buf, &sz);
|
buf = taosDecodeFixedI32(buf, &szVgs);
|
||||||
pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *));
|
if(szVgs > 0){
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
pConsumerEp->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
|
||||||
SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
|
if (NULL == pConsumerEp->offsetRows) return NULL;
|
||||||
buf = tDecodeSMqVgEp(buf, pVgEp);
|
for (int32_t j= 0; j < szVgs; ++j) {
|
||||||
taosArrayPush(pConsumerEp->vgs, &pVgEp);
|
OffsetRows* offRows = taosArrayReserve(pConsumerEp->offsetRows, 1);
|
||||||
|
buf = taosDecodeFixedI32(buf, &offRows->vgId);
|
||||||
|
buf = taosDecodeFixedI64(buf, &offRows->rows);
|
||||||
|
buf = taosDecodeFixedI8(buf, &offRows->offset.type);
|
||||||
|
if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||||
|
buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
|
||||||
|
buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
|
||||||
|
} else if (offRows->offset.type == TMQ_OFFSET__LOG) {
|
||||||
|
buf = taosDecodeFixedI64(buf, &offRows->offset.version);
|
||||||
|
} else {
|
||||||
|
// do nothing
|
||||||
}
|
}
|
||||||
#endif
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#if 0
|
||||||
|
// int32_t sz;
|
||||||
|
// buf = taosDecodeFixedI32(buf, &sz);
|
||||||
|
// pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *));
|
||||||
|
// for (int32_t i = 0; i < sz; i++) {
|
||||||
|
// SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
|
||||||
|
// buf = tDecodeSMqVgEp(buf, pVgEp);
|
||||||
|
// taosArrayPush(pConsumerEp->vgs, &pVgEp);
|
||||||
|
// }
|
||||||
|
//#endif
|
||||||
|
|
||||||
return (void *)buf;
|
return (void *)buf;
|
||||||
}
|
}
|
||||||
|
@ -468,6 +519,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
|
||||||
taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
|
taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
|
||||||
}
|
}
|
||||||
pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
|
pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
|
||||||
|
pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
|
||||||
memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
|
memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
|
||||||
return pSubNew;
|
return pSubNew;
|
||||||
}
|
}
|
||||||
|
@ -479,9 +531,11 @@ void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||||
taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
|
taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
|
||||||
|
taosArrayDestroy(pConsumerEp->offsetRows);
|
||||||
}
|
}
|
||||||
taosHashCleanup(pSub->consumerHash);
|
taosHashCleanup(pSub->consumerHash);
|
||||||
taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
|
taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
|
||||||
|
taosArrayDestroy(pSub->offsetRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
|
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
|
||||||
|
@ -508,10 +562,27 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
|
||||||
if (cnt != sz) return -1;
|
if (cnt != sz) return -1;
|
||||||
tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
|
tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
|
||||||
tlen += taosEncodeString(buf, pSub->dbName);
|
tlen += taosEncodeString(buf, pSub->dbName);
|
||||||
|
|
||||||
|
int32_t szVgs = taosArrayGetSize(pSub->offsetRows);
|
||||||
|
tlen += taosEncodeFixedI32(buf, szVgs);
|
||||||
|
for (int32_t j= 0; j < szVgs; ++j) {
|
||||||
|
OffsetRows *offRows = taosArrayGet(pSub->offsetRows, j);
|
||||||
|
tlen += taosEncodeFixedI32(buf, offRows->vgId);
|
||||||
|
tlen += taosEncodeFixedI64(buf, offRows->rows);
|
||||||
|
tlen += taosEncodeFixedI8(buf, offRows->offset.type);
|
||||||
|
if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||||
|
tlen += taosEncodeFixedI64(buf, offRows->offset.uid);
|
||||||
|
tlen += taosEncodeFixedI64(buf, offRows->offset.ts);
|
||||||
|
} else if (offRows->offset.type == TMQ_OFFSET__LOG) {
|
||||||
|
tlen += taosEncodeFixedI64(buf, offRows->offset.version);
|
||||||
|
} else {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
}
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
|
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
|
||||||
//
|
//
|
||||||
buf = taosDecodeStringTo(buf, pSub->key);
|
buf = taosDecodeStringTo(buf, pSub->key);
|
||||||
buf = taosDecodeFixedI64(buf, &pSub->dbUid);
|
buf = taosDecodeFixedI64(buf, &pSub->dbUid);
|
||||||
|
@ -526,74 +597,97 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
|
||||||
pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqConsumerEp consumerEp = {0};
|
SMqConsumerEp consumerEp = {0};
|
||||||
buf = tDecodeSMqConsumerEp(buf, &consumerEp);
|
buf = tDecodeSMqConsumerEp(buf, &consumerEp, sver);
|
||||||
taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp));
|
taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp));
|
||||||
}
|
}
|
||||||
|
|
||||||
buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
|
buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
|
||||||
buf = taosDecodeStringTo(buf, pSub->dbName);
|
buf = taosDecodeStringTo(buf, pSub->dbName);
|
||||||
|
|
||||||
|
if (sver > 1){
|
||||||
|
int32_t szVgs = 0;
|
||||||
|
buf = taosDecodeFixedI32(buf, &szVgs);
|
||||||
|
if(szVgs > 0){
|
||||||
|
pSub->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
|
||||||
|
if (NULL == pSub->offsetRows) return NULL;
|
||||||
|
for (int32_t j= 0; j < szVgs; ++j) {
|
||||||
|
OffsetRows* offRows = taosArrayReserve(pSub->offsetRows, 1);
|
||||||
|
buf = taosDecodeFixedI32(buf, &offRows->vgId);
|
||||||
|
buf = taosDecodeFixedI64(buf, &offRows->rows);
|
||||||
|
buf = taosDecodeFixedI8(buf, &offRows->offset.type);
|
||||||
|
if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||||
|
buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
|
||||||
|
buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
|
||||||
|
} else if (offRows->offset.type == TMQ_OFFSET__LOG) {
|
||||||
|
buf = taosDecodeFixedI64(buf, &offRows->offset.version);
|
||||||
|
} else {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return (void *)buf;
|
return (void *)buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
|
//SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
|
||||||
SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
|
// SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
|
||||||
if (pEntryNew == NULL) return NULL;
|
// if (pEntryNew == NULL) return NULL;
|
||||||
pEntryNew->epoch = pEntry->epoch;
|
// pEntryNew->epoch = pEntry->epoch;
|
||||||
pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
|
// pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
|
||||||
return pEntryNew;
|
// return pEntryNew;
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
|
//void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
|
||||||
|
// taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
|
||||||
|
//}
|
||||||
|
|
||||||
void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
|
//int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
|
||||||
taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
|
// int32_t tlen = 0;
|
||||||
}
|
// tlen += taosEncodeFixedI32(buf, pEntry->epoch);
|
||||||
|
// tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
|
||||||
|
// return tlen;
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
|
||||||
|
// buf = taosDecodeFixedI32(buf, &pEntry->epoch);
|
||||||
|
// buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
|
||||||
|
// return (void *)buf;
|
||||||
|
//}
|
||||||
|
|
||||||
int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
|
//SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
|
||||||
int32_t tlen = 0;
|
// SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
|
||||||
tlen += taosEncodeFixedI32(buf, pEntry->epoch);
|
// if (pLogNew == NULL) return pLogNew;
|
||||||
tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
|
// memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||||
return tlen;
|
// pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
|
||||||
}
|
// return pLogNew;
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
|
||||||
|
// taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
|
||||||
|
//}
|
||||||
|
|
||||||
void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
|
//int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
|
||||||
buf = taosDecodeFixedI32(buf, &pEntry->epoch);
|
// int32_t tlen = 0;
|
||||||
buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
|
// tlen += taosEncodeString(buf, pLog->key);
|
||||||
return (void *)buf;
|
// tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
|
||||||
}
|
// return tlen;
|
||||||
|
//}
|
||||||
SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
|
//
|
||||||
SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
|
//void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
|
||||||
if (pLogNew == NULL) return pLogNew;
|
// buf = taosDecodeStringTo(buf, pLog->key);
|
||||||
memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
|
// buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
|
||||||
pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
|
// return (void *)buf;
|
||||||
return pLogNew;
|
//}
|
||||||
}
|
//
|
||||||
|
//int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
|
||||||
void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
|
// int32_t tlen = 0;
|
||||||
taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
|
// tlen += taosEncodeString(buf, pOffset->key);
|
||||||
}
|
// tlen += taosEncodeFixedI64(buf, pOffset->offset);
|
||||||
|
// return tlen;
|
||||||
int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
|
//}
|
||||||
int32_t tlen = 0;
|
//
|
||||||
tlen += taosEncodeString(buf, pLog->key);
|
//void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
|
||||||
tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
|
// buf = taosDecodeStringTo(buf, pOffset->key);
|
||||||
return tlen;
|
// buf = taosDecodeFixedI64(buf, &pOffset->offset);
|
||||||
}
|
// return buf;
|
||||||
|
//}
|
||||||
void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
|
|
||||||
buf = taosDecodeStringTo(buf, pLog->key);
|
|
||||||
buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
|
|
||||||
return (void *)buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
|
|
||||||
int32_t tlen = 0;
|
|
||||||
tlen += taosEncodeString(buf, pOffset->key);
|
|
||||||
tlen += taosEncodeFixedI64(buf, pOffset->offset);
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
|
|
||||||
buf = taosDecodeStringTo(buf, pOffset->key);
|
|
||||||
buf = taosDecodeFixedI64(buf, &pOffset->offset);
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
|
@ -24,7 +24,7 @@
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
|
|
||||||
#define MND_SUBSCRIBE_VER_NUMBER 1
|
#define MND_SUBSCRIBE_VER_NUMBER 2
|
||||||
#define MND_SUBSCRIBE_RESERVE_SIZE 64
|
#define MND_SUBSCRIBE_RESERVE_SIZE 64
|
||||||
|
|
||||||
#define MND_SUBSCRIBE_REBALANCE_CNT 3
|
#define MND_SUBSCRIBE_REBALANCE_CNT 3
|
||||||
|
@ -255,7 +255,7 @@ static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pI
|
||||||
for (int32_t i = 0; i < numOfNewConsumers; i++) {
|
for (int32_t i = 0; i < numOfNewConsumers; i++) {
|
||||||
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
|
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
|
||||||
|
|
||||||
SMqConsumerEp newConsumerEp;
|
SMqConsumerEp newConsumerEp = {0};
|
||||||
newConsumerEp.consumerId = consumerId;
|
newConsumerEp.consumerId = consumerId;
|
||||||
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
|
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
|
||||||
|
|
||||||
|
@ -449,8 +449,44 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
|
|
||||||
SMqRebOutputVg* pRebOutput = (SMqRebOutputVg *)pRemovedIter;
|
SMqRebOutputVg* pRebOutput = (SMqRebOutputVg *)pRemovedIter;
|
||||||
taosArrayPush(pOutput->rebVgs, pRebOutput);
|
taosArrayPush(pOutput->rebVgs, pRebOutput);
|
||||||
if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed, put all vg into unassigned
|
if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed
|
||||||
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
|
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); // put all vg into unassigned
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(taosHashGetSize(pOutput->pSub->consumerHash) == 0) { // if all consumer is removed
|
||||||
|
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows
|
||||||
|
if (pSub) {
|
||||||
|
taosRLockLatch(&pSub->lock);
|
||||||
|
bool init = false;
|
||||||
|
if (pOutput->pSub->offsetRows == NULL) {
|
||||||
|
pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
|
||||||
|
init = true;
|
||||||
|
}
|
||||||
|
pIter = NULL;
|
||||||
|
while (1) {
|
||||||
|
pIter = taosHashIterate(pSub->consumerHash, pIter);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||||
|
if (init) {
|
||||||
|
taosArrayAddAll(pOutput->pSub->offsetRows, pConsumerEp->offsetRows);
|
||||||
|
// mDebug("pSub->offsetRows is init");
|
||||||
|
} else {
|
||||||
|
for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
|
||||||
|
OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
|
||||||
|
for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
|
||||||
|
OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
|
||||||
|
if (d1->vgId == d2->vgId) {
|
||||||
|
d2->rows += d1->rows;
|
||||||
|
d2->offset = d1->offset;
|
||||||
|
// mDebug("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosRUnLockLatch(&pSub->lock);
|
||||||
|
mndReleaseSubscribe(pMnode, pSub);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -809,7 +845,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
|
||||||
int8_t sver = 0;
|
int8_t sver = 0;
|
||||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
|
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
|
||||||
|
|
||||||
if (sver != MND_SUBSCRIBE_VER_NUMBER) {
|
if (sver > MND_SUBSCRIBE_VER_NUMBER || sver < 1) {
|
||||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||||
goto SUB_DECODE_OVER;
|
goto SUB_DECODE_OVER;
|
||||||
}
|
}
|
||||||
|
@ -828,7 +864,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
|
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
|
SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
|
||||||
|
|
||||||
if (tDecodeSubscribeObj(buf, pSub) == NULL) {
|
if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) {
|
||||||
goto SUB_DECODE_OVER;
|
goto SUB_DECODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -890,6 +926,10 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc
|
||||||
pOldSub->unassignedVgs = pNewSub->unassignedVgs;
|
pOldSub->unassignedVgs = pNewSub->unassignedVgs;
|
||||||
pNewSub->unassignedVgs = tmp1;
|
pNewSub->unassignedVgs = tmp1;
|
||||||
|
|
||||||
|
SArray *tmp2 = pOldSub->offsetRows;
|
||||||
|
pOldSub->offsetRows = pNewSub->offsetRows;
|
||||||
|
pNewSub->offsetRows = tmp2;
|
||||||
|
|
||||||
taosWUnLockLatch(&pOldSub->lock);
|
taosWUnLockLatch(&pOldSub->lock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1028,6 +1068,61 @@ END:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t consumerId, const char* topic, const char* cgroup, SArray* vgs, SArray *offsetRows){
|
||||||
|
int32_t sz = taosArrayGetSize(vgs);
|
||||||
|
for (int32_t j = 0; j < sz; j++) {
|
||||||
|
SMqVgEp *pVgEp = taosArrayGetP(vgs, j);
|
||||||
|
|
||||||
|
SColumnInfoData *pColInfo;
|
||||||
|
int32_t cols = 0;
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, *numOfRows, (const char *)topic, false);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, *numOfRows, (const char *)cgroup, false);
|
||||||
|
|
||||||
|
// vg id
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, *numOfRows, (const char *)&pVgEp->vgId, false);
|
||||||
|
|
||||||
|
// consumer id
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, *numOfRows, (const char *)&consumerId, consumerId == -1);
|
||||||
|
|
||||||
|
mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic),
|
||||||
|
consumerId, varDataVal(cgroup), pVgEp->vgId);
|
||||||
|
|
||||||
|
// offset
|
||||||
|
OffsetRows *data = NULL;
|
||||||
|
for(int i = 0; i < taosArrayGetSize(offsetRows); i++){
|
||||||
|
OffsetRows *tmp = taosArrayGet(offsetRows, i);
|
||||||
|
if(tmp->vgId != pVgEp->vgId){
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
data = tmp;
|
||||||
|
}
|
||||||
|
if(data){
|
||||||
|
// vg id
|
||||||
|
char buf[TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset);
|
||||||
|
varDataSetLen(buf, strlen(varDataVal(buf)));
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, *numOfRows, (const char *)buf, false);
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, *numOfRows, (const char *)&data->rows, false);
|
||||||
|
}else{
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetNULL(pColInfo, *numOfRows);
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetNULL(pColInfo, *numOfRows);
|
||||||
|
mError("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
|
||||||
|
}
|
||||||
|
(*numOfRows)++;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
|
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
@ -1048,6 +1143,13 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
|
||||||
blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum);
|
blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// topic and cgroup
|
||||||
|
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
|
||||||
|
varDataSetLen(topic, strlen(varDataVal(topic)));
|
||||||
|
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
|
||||||
|
|
||||||
SMqConsumerEp *pConsumerEp = NULL;
|
SMqConsumerEp *pConsumerEp = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -1055,97 +1157,11 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
pConsumerEp = (SMqConsumerEp *)pIter;
|
pConsumerEp = (SMqConsumerEp *)pIter;
|
||||||
|
|
||||||
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, topic, cgroup, pConsumerEp->vgs, pConsumerEp->offsetRows);
|
||||||
for (int32_t j = 0; j < sz; j++) {
|
|
||||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
|
|
||||||
|
|
||||||
SColumnInfoData *pColInfo;
|
|
||||||
int32_t cols = 0;
|
|
||||||
|
|
||||||
// topic and cgroup
|
|
||||||
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
|
||||||
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
|
||||||
mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
|
|
||||||
varDataSetLen(topic, strlen(varDataVal(topic)));
|
|
||||||
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
|
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);
|
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);
|
|
||||||
|
|
||||||
// vg id
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);
|
|
||||||
|
|
||||||
// consumer id
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false);
|
|
||||||
|
|
||||||
mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic),
|
|
||||||
pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId);
|
|
||||||
|
|
||||||
// offset
|
|
||||||
#if 0
|
|
||||||
// subscribe time
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);
|
|
||||||
|
|
||||||
// rebalance time
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
numOfRows++;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// do not show for cleared subscription
|
// do not show for cleared subscription
|
||||||
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
|
buildResult(pBlock, &numOfRows, -1, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
|
|
||||||
|
|
||||||
SColumnInfoData *pColInfo;
|
|
||||||
int32_t cols = 0;
|
|
||||||
|
|
||||||
// topic and cgroup
|
|
||||||
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
|
||||||
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
|
||||||
mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
|
|
||||||
varDataSetLen(topic, strlen(varDataVal(topic)));
|
|
||||||
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
|
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);
|
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);
|
|
||||||
|
|
||||||
// vg id
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);
|
|
||||||
|
|
||||||
// consumer id
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, NULL, true);
|
|
||||||
|
|
||||||
mDebug("mnd show subscriptions(unassigned): topic %s, cgroup %s vgid %d", varDataVal(topic), varDataVal(cgroup),
|
|
||||||
pVgEp->vgId);
|
|
||||||
|
|
||||||
// offset
|
|
||||||
#if 0
|
|
||||||
// subscribe time
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);
|
|
||||||
|
|
||||||
// rebalance time
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
numOfRows++;
|
|
||||||
}
|
|
||||||
|
|
||||||
pBlock->info.rows = numOfRows;
|
pBlock->info.rows = numOfRows;
|
||||||
|
|
||||||
|
|
|
@ -243,8 +243,8 @@ int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
|
||||||
tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver,
|
tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver,
|
||||||
ever);
|
ever);
|
||||||
|
|
||||||
char buf1[80] = {0};
|
char buf1[TSDB_OFFSET_LEN] = {0};
|
||||||
char buf2[80] = {0};
|
char buf2[TSDB_OFFSET_LEN] = {0};
|
||||||
tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset);
|
tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset);
|
||||||
tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset);
|
tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset);
|
||||||
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId,
|
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId,
|
||||||
|
@ -259,10 +259,10 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq*
|
||||||
|
|
||||||
tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);
|
tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);
|
||||||
|
|
||||||
char buf1[80] = {0};
|
char buf1[TSDB_OFFSET_LEN] = {0};
|
||||||
char buf2[80] = {0};
|
char buf2[TSDB_OFFSET_LEN] = {0};
|
||||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
tFormatOffset(buf1, TSDB_OFFSET_LEN, &pRsp->reqOffset);
|
||||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
tFormatOffset(buf2, TSDB_OFFSET_LEN, &pRsp->rspOffset);
|
||||||
|
|
||||||
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId,
|
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId,
|
||||||
pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
|
pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
|
||||||
|
@ -481,8 +481,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
pHandle->epoch = reqEpoch;
|
pHandle->epoch = reqEpoch;
|
||||||
}
|
}
|
||||||
|
|
||||||
char buf[80];
|
char buf[TSDB_OFFSET_LEN];
|
||||||
tFormatOffset(buf, 80, &reqOffset);
|
tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
|
||||||
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
|
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
|
||||||
|
|
||||||
|
@ -559,7 +559,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
} else {
|
} else {
|
||||||
dataRsp.rspOffset.version = currentVer; // return current consume offset value
|
dataRsp.rspOffset.version = currentVer; // return current consume offset value
|
||||||
}
|
}
|
||||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
|
} else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
|
||||||
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
|
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
|
||||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
||||||
dataRsp.rspOffset.version = ever;
|
dataRsp.rspOffset.version = ever;
|
||||||
|
|
|
@ -99,15 +99,15 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
||||||
if (pOffset != NULL) {
|
if (pOffset != NULL) {
|
||||||
*pOffsetVal = pOffset->val;
|
*pOffsetVal = pOffset->val;
|
||||||
|
|
||||||
char formatBuf[80];
|
char formatBuf[TSDB_OFFSET_LEN];
|
||||||
tFormatOffset(formatBuf, 80, pOffsetVal);
|
tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64
|
tqDebug("tmq poll: consumer:0x%" PRIx64
|
||||||
", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%" PRIx64,
|
", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%" PRIx64,
|
||||||
consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
|
consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
|
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
|
||||||
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
|
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
|
||||||
if (pRequest->useSnapshot) {
|
if (pRequest->useSnapshot) {
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
|
||||||
consumerId, pHandle->subKey, vgId);
|
consumerId, pHandle->subKey, vgId);
|
||||||
|
@ -186,8 +186,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
|
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
|
||||||
|
|
||||||
end : {
|
end : {
|
||||||
char buf[80] = {0};
|
char buf[TSDB_OFFSET_LEN] = {0};
|
||||||
tFormatOffset(buf, 80, &dataRsp.rspOffset);
|
tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64
|
||||||
" code:%d",
|
" code:%d",
|
||||||
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
|
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
|
||||||
|
|
|
@ -21,9 +21,7 @@ class TDTestCase:
|
||||||
tdSql.execute("create table db.stb (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned, c10 float, c11 double, c12 varchar(100), c13 nchar(100)) tags(t int)")
|
tdSql.execute("create table db.stb (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned, c10 float, c11 double, c12 varchar(100), c13 nchar(100)) tags(t int)")
|
||||||
tdSql.execute("insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)")
|
tdSql.execute("insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)")
|
||||||
|
|
||||||
tdSql.query("select count(*) from information_schema.ins_columns")
|
tdSql.execute("select count(*) from information_schema.ins_columns")
|
||||||
# enterprise version: 289, community version: 281
|
|
||||||
tdSql.checkData(0, 0, 289)
|
|
||||||
|
|
||||||
tdSql.query("select * from information_schema.ins_columns where table_name = 'ntb'")
|
tdSql.query("select * from information_schema.ins_columns where table_name = 'ntb'")
|
||||||
tdSql.checkRows(14)
|
tdSql.checkRows(14)
|
||||||
|
|
|
@ -0,0 +1,313 @@
|
||||||
|
|
||||||
|
import taos
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import socket
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.dnodes import *
|
||||||
|
sys.path.append("./7-tmq")
|
||||||
|
from tmqCommon import *
|
||||||
|
|
||||||
|
class actionType(Enum):
|
||||||
|
CREATE_DATABASE = 0
|
||||||
|
CREATE_STABLE = 1
|
||||||
|
CREATE_CTABLE = 2
|
||||||
|
INSERT_DATA = 3
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
hostname = socket.gethostname()
|
||||||
|
#rpcDebugFlagVal = '143'
|
||||||
|
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
||||||
|
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
|
||||||
|
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
||||||
|
#updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
|
||||||
|
#print ("===================: ", updatecfgDict)
|
||||||
|
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
tdSql.init(conn.cursor())
|
||||||
|
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||||
|
|
||||||
|
def getBuildPath(self):
|
||||||
|
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
|
||||||
|
if ("community" in selfPath):
|
||||||
|
projPath = selfPath[:selfPath.find("community")]
|
||||||
|
else:
|
||||||
|
projPath = selfPath[:selfPath.find("tests")]
|
||||||
|
|
||||||
|
for root, dirs, files in os.walk(projPath):
|
||||||
|
if ("taosd" in files or "taosd.exe" in files):
|
||||||
|
rootRealPath = os.path.dirname(os.path.realpath(root))
|
||||||
|
if ("packaging" not in rootRealPath):
|
||||||
|
buildPath = root[:len(root) - len("/build/bin")]
|
||||||
|
break
|
||||||
|
return buildPath
|
||||||
|
|
||||||
|
def newcur(self,cfg,host,port):
|
||||||
|
user = "root"
|
||||||
|
password = "taosdata"
|
||||||
|
con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
|
||||||
|
cur=con.cursor()
|
||||||
|
print(cur)
|
||||||
|
return cur
|
||||||
|
|
||||||
|
def initConsumerTable(self,cdbName='cdb'):
|
||||||
|
tdLog.info("create consume database, and consume info table, and consume result table")
|
||||||
|
tdSql.query("create database if not exists %s vgroups 1 wal_retention_period 3600"%(cdbName))
|
||||||
|
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||||
|
tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
|
||||||
|
|
||||||
|
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||||
|
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||||
|
|
||||||
|
def initConsumerInfoTable(self,cdbName='cdb'):
|
||||||
|
tdLog.info("drop consumeinfo table")
|
||||||
|
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||||
|
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||||
|
|
||||||
|
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||||
|
sql = "insert into %s.consumeinfo values "%cdbName
|
||||||
|
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
||||||
|
tdLog.info("consume info sql: %s"%sql)
|
||||||
|
tdSql.query(sql)
|
||||||
|
|
||||||
|
def selectConsumeResult(self,expectRows,cdbName='cdb'):
|
||||||
|
resultList=[]
|
||||||
|
while 1:
|
||||||
|
tdSql.query("select * from %s.consumeresult"%cdbName)
|
||||||
|
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
|
||||||
|
if tdSql.getRows() == expectRows:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
time.sleep(5)
|
||||||
|
|
||||||
|
for i in range(expectRows):
|
||||||
|
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
|
||||||
|
resultList.append(tdSql.getData(i , 3))
|
||||||
|
|
||||||
|
return resultList
|
||||||
|
|
||||||
|
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
|
||||||
|
if valgrind == 1:
|
||||||
|
logFile = cfgPath + '/../log/valgrind-tmq.log'
|
||||||
|
shellCmd = 'nohup valgrind --log-file=' + logFile
|
||||||
|
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
|
||||||
|
|
||||||
|
if (platform.system().lower() == 'windows'):
|
||||||
|
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
|
||||||
|
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||||
|
shellCmd += "> nul 2>&1 &"
|
||||||
|
else:
|
||||||
|
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||||
|
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||||
|
shellCmd += "> /dev/null 2>&1 &"
|
||||||
|
tdLog.info(shellCmd)
|
||||||
|
os.system(shellCmd)
|
||||||
|
|
||||||
|
def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1):
|
||||||
|
if dropFlag == 1:
|
||||||
|
tsql.execute("drop database if exists %s"%(dbName))
|
||||||
|
|
||||||
|
tsql.execute("create database if not exists %s vgroups %d replica %d wal_retention_period 3600"%(dbName, vgroups, replica))
|
||||||
|
tdLog.debug("complete to create database %s"%(dbName))
|
||||||
|
return
|
||||||
|
|
||||||
|
def create_stable(self,tsql, dbName,stbName):
|
||||||
|
tsql.execute("create table if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(dbName, stbName))
|
||||||
|
tdLog.debug("complete to create %s.%s" %(dbName, stbName))
|
||||||
|
return
|
||||||
|
|
||||||
|
def create_ctables(self,tsql, dbName,stbName,ctbNum):
|
||||||
|
tsql.execute("use %s" %dbName)
|
||||||
|
pre_create = "create table"
|
||||||
|
sql = pre_create
|
||||||
|
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
|
||||||
|
for i in range(ctbNum):
|
||||||
|
sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1)
|
||||||
|
if (i > 0) and (i%100 == 0):
|
||||||
|
tsql.execute(sql)
|
||||||
|
sql = pre_create
|
||||||
|
if sql != pre_create:
|
||||||
|
tsql.execute(sql)
|
||||||
|
|
||||||
|
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
|
||||||
|
return
|
||||||
|
|
||||||
|
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=0):
|
||||||
|
tdLog.debug("start to insert data ............")
|
||||||
|
tsql.execute("use %s" %dbName)
|
||||||
|
pre_insert = "insert into "
|
||||||
|
sql = pre_insert
|
||||||
|
|
||||||
|
if startTs == 0:
|
||||||
|
t = time.time()
|
||||||
|
startTs = int(round(t * 1000))
|
||||||
|
|
||||||
|
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||||
|
rowsOfSql = 0
|
||||||
|
for i in range(ctbNum):
|
||||||
|
sql += " %s_%d values "%(stbName,i)
|
||||||
|
for j in range(rowsPerTbl):
|
||||||
|
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
|
||||||
|
rowsOfSql += 1
|
||||||
|
if (j > 0) and ((rowsOfSql == batchNum) or (j == rowsPerTbl - 1)):
|
||||||
|
tsql.execute(sql)
|
||||||
|
rowsOfSql = 0
|
||||||
|
if j < rowsPerTbl - 1:
|
||||||
|
sql = "insert into %s_%d values " %(stbName,i)
|
||||||
|
else:
|
||||||
|
sql = "insert into "
|
||||||
|
#end sql
|
||||||
|
if sql != pre_insert:
|
||||||
|
#print("insert sql:%s"%sql)
|
||||||
|
tsql.execute(sql)
|
||||||
|
tdLog.debug("insert data ............ [OK]")
|
||||||
|
return
|
||||||
|
|
||||||
|
def prepareEnv(self, **parameterDict):
|
||||||
|
# create new connector for my thread
|
||||||
|
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
|
||||||
|
|
||||||
|
if parameterDict["actionType"] == actionType.CREATE_DATABASE:
|
||||||
|
self.create_database(tsql, parameterDict["dbName"])
|
||||||
|
elif parameterDict["actionType"] == actionType.CREATE_STABLE:
|
||||||
|
self.create_stable(tsql, parameterDict["dbName"], parameterDict["stbName"])
|
||||||
|
elif parameterDict["actionType"] == actionType.CREATE_CTABLE:
|
||||||
|
self.create_ctables(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||||
|
elif parameterDict["actionType"] == actionType.INSERT_DATA:
|
||||||
|
self.insert_data(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"], \
|
||||||
|
parameterDict["rowsPerTbl"],parameterDict["batchNum"])
|
||||||
|
else:
|
||||||
|
tdLog.exit("not support's action: ", parameterDict["actionType"])
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
def tmqCase1(self, cfgPath, buildPath):
|
||||||
|
tdLog.printNoPrefix("======== test case 1: ")
|
||||||
|
|
||||||
|
self.initConsumerTable()
|
||||||
|
|
||||||
|
# create and start thread
|
||||||
|
parameterDict = {'cfg': '', \
|
||||||
|
'actionType': 0, \
|
||||||
|
'dbName': 'db1', \
|
||||||
|
'dropFlag': 1, \
|
||||||
|
'vgroups': 4, \
|
||||||
|
'replica': 1, \
|
||||||
|
'stbName': 'stb1', \
|
||||||
|
'ctbNum': 10, \
|
||||||
|
'rowsPerTbl': 10000, \
|
||||||
|
'batchNum': 100, \
|
||||||
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
|
|
||||||
|
self.create_database(tdSql, parameterDict["dbName"])
|
||||||
|
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
|
||||||
|
|
||||||
|
tdLog.info("create topics from stb1")
|
||||||
|
topicFromStb1 = 'topic_stb1'
|
||||||
|
|
||||||
|
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||||
|
consumerId = 0
|
||||||
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||||
|
topicList = topicFromStb1
|
||||||
|
ifcheckdata = 0
|
||||||
|
ifManualCommit = 0
|
||||||
|
keyList = 'group.id:cgrp1,\
|
||||||
|
enable.auto.commit:true,\
|
||||||
|
auto.commit.interval.ms:2000,\
|
||||||
|
auto.offset.reset:earliest'
|
||||||
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
tdLog.info("start consume processor")
|
||||||
|
pollDelay = 20
|
||||||
|
showMsg = 1
|
||||||
|
showRow = 1
|
||||||
|
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||||
|
|
||||||
|
tdLog.info("start show subscriptions 1")
|
||||||
|
while(1):
|
||||||
|
tdSql.query("show subscriptions")
|
||||||
|
if (tdSql.getRows() == 0):
|
||||||
|
tdLog.info("sleep")
|
||||||
|
time.sleep(1)
|
||||||
|
elif (tdSql.queryResult[0][4] != None):
|
||||||
|
# tdSql.checkData(0, 4, "earliest")
|
||||||
|
tdSql.checkData(0, 5, 0)
|
||||||
|
break
|
||||||
|
|
||||||
|
time.sleep(2)
|
||||||
|
tdLog.info("start insert data")
|
||||||
|
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||||
|
self.insert_data(tdSql,\
|
||||||
|
parameterDict["dbName"],\
|
||||||
|
parameterDict["stbName"],\
|
||||||
|
parameterDict["ctbNum"],\
|
||||||
|
parameterDict["rowsPerTbl"],\
|
||||||
|
parameterDict["batchNum"])
|
||||||
|
|
||||||
|
time.sleep(2)
|
||||||
|
tdLog.info("start show subscriptions 2")
|
||||||
|
tdSql.query("show subscriptions")
|
||||||
|
tdSql.checkRows(4)
|
||||||
|
print(tdSql.queryResult)
|
||||||
|
# tdSql.checkData(0, 4, 'offset(log) ver:103')
|
||||||
|
tdSql.checkData(0, 5, 10000)
|
||||||
|
# tdSql.checkData(1, 4, 'offset(log) ver:103')
|
||||||
|
tdSql.checkData(1, 5, 10000)
|
||||||
|
# tdSql.checkData(2, 4, 'offset(log) ver:303')
|
||||||
|
tdSql.checkData(2, 5, 50000)
|
||||||
|
# tdSql.checkData(3, 4, 'offset(log) ver:239')
|
||||||
|
tdSql.checkData(3, 5, 30000)
|
||||||
|
|
||||||
|
tdLog.info("insert process end, and start to check consume result")
|
||||||
|
expectRows = 1
|
||||||
|
resultList = self.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
|
time.sleep(2)
|
||||||
|
tdLog.info("start show subscriptions 3")
|
||||||
|
tdSql.query("show subscriptions")
|
||||||
|
tdSql.checkRows(4)
|
||||||
|
print(tdSql.queryResult)
|
||||||
|
tdSql.checkData(0, 3, None)
|
||||||
|
# tdSql.checkData(0, 4, 'offset(log) ver:103')
|
||||||
|
tdSql.checkData(0, 5, 10000)
|
||||||
|
# tdSql.checkData(1, 4, 'offset(log) ver:103')
|
||||||
|
tdSql.checkData(1, 5, 10000)
|
||||||
|
# tdSql.checkData(2, 4, 'offset(log) ver:303')
|
||||||
|
tdSql.checkData(2, 5, 50000)
|
||||||
|
# tdSql.checkData(3, 4, 'offset(log) ver:239')
|
||||||
|
tdSql.checkData(3, 5, 30000)
|
||||||
|
|
||||||
|
tdSql.query("drop topic %s"%topicFromStb1)
|
||||||
|
|
||||||
|
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
tdSql.prepare()
|
||||||
|
buildPath = self.getBuildPath()
|
||||||
|
if (buildPath == ""):
|
||||||
|
tdLog.exit("taosd not found!")
|
||||||
|
else:
|
||||||
|
tdLog.info("taosd found in %s" % buildPath)
|
||||||
|
cfgPath = buildPath + "/../sim/psim/cfg"
|
||||||
|
tdLog.info("cfgPath: %s" % cfgPath)
|
||||||
|
|
||||||
|
self.tmqCase1(cfgPath, buildPath)
|
||||||
|
# self.tmqCase2(cfgPath, buildPath)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
event = threading.Event()
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
Loading…
Reference in New Issue