Merge pull request #24750 from taosdata/mark/3.0
fix:[TD-28590]add logic for consume excluded
This commit is contained in:
commit
8f63952ad5
|
@ -3638,6 +3638,7 @@ typedef struct {
|
|||
int64_t timeout;
|
||||
STqOffsetVal reqOffset;
|
||||
int8_t enableReplay;
|
||||
int8_t sourceExcluded;
|
||||
} SMqPollReq;
|
||||
|
||||
int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq);
|
||||
|
@ -3767,6 +3768,7 @@ typedef struct {
|
|||
int32_t vgId;
|
||||
STqOffsetVal offset;
|
||||
int64_t rows;
|
||||
int64_t ever;
|
||||
} OffsetRows;
|
||||
|
||||
typedef struct {
|
||||
|
@ -3924,6 +3926,9 @@ int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
|||
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
|
||||
#define SUBMIT_REQ_FROM_FILE 0x4
|
||||
|
||||
#define SOURCE_NULL 0
|
||||
#define SOURCE_TAOSX 1
|
||||
|
||||
typedef struct {
|
||||
int32_t flags;
|
||||
SVCreateTbReq* pCreateTbReq;
|
||||
|
@ -3934,7 +3939,8 @@ typedef struct {
|
|||
SArray* aRowP;
|
||||
SArray* aCol;
|
||||
};
|
||||
int64_t ctimeMs;
|
||||
int64_t ctimeMs;
|
||||
int8_t source;
|
||||
} SSubmitTbData;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -197,6 +197,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
|
||||
void qStreamSetOpen(qTaskInfo_t tinfo);
|
||||
|
||||
void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded);
|
||||
|
||||
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
|
||||
|
||||
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
|
||||
|
|
|
@ -63,6 +63,7 @@ struct tmq_conf_t {
|
|||
int8_t withTbName;
|
||||
int8_t snapEnable;
|
||||
int8_t replayEnable;
|
||||
int8_t sourceExcluded; // do not consume, bit
|
||||
uint16_t port;
|
||||
int32_t autoCommitInterval;
|
||||
char* ip;
|
||||
|
@ -82,6 +83,7 @@ struct tmq_t {
|
|||
int32_t autoCommitInterval;
|
||||
int8_t resetOffsetCfg;
|
||||
int8_t replayEnable;
|
||||
int8_t sourceExcluded; // do not consume, bit
|
||||
uint64_t consumerId;
|
||||
tmq_commit_cb* commitCb;
|
||||
void* commitCbUserParam;
|
||||
|
@ -385,6 +387,10 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
}
|
||||
if (strcasecmp(key, "msg.consume.excluded") == 0) {
|
||||
conf->sourceExcluded = taosStr2int64(value);
|
||||
return TMQ_CONF_OK;
|
||||
}
|
||||
|
||||
if (strcasecmp(key, "td.connect.db") == 0) {
|
||||
return TMQ_CONF_OK;
|
||||
|
@ -783,27 +789,26 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
|||
req.consumerId = tmq->consumerId;
|
||||
req.epoch = tmq->epoch;
|
||||
taosRLockLatch(&tmq->lock);
|
||||
// if(tmq->needReportOffsetRows){
|
||||
req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
|
||||
for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){
|
||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
|
||||
TopicOffsetRows* data = taosArrayReserve(req.topics, 1);
|
||||
strcpy(data->topicName, pTopic->topicName);
|
||||
data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows));
|
||||
for(int j = 0; j < numOfVgroups; j++){
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
|
||||
offRows->vgId = pVg->vgId;
|
||||
offRows->rows = pVg->numOfRows;
|
||||
offRows->offset = pVg->offsetInfo.beginOffset;
|
||||
char buf[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
|
||||
tscInfo("consumer:0x%" PRIx64 ",report offset: vgId:%d, offset:%s, rows:%"PRId64, tmq->consumerId, offRows->vgId, buf, offRows->rows);
|
||||
}
|
||||
req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
|
||||
for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){
|
||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
|
||||
TopicOffsetRows* data = taosArrayReserve(req.topics, 1);
|
||||
strcpy(data->topicName, pTopic->topicName);
|
||||
data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows));
|
||||
for(int j = 0; j < numOfVgroups; j++){
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
|
||||
offRows->vgId = pVg->vgId;
|
||||
offRows->rows = pVg->numOfRows;
|
||||
offRows->offset = pVg->offsetInfo.endOffset;
|
||||
offRows->ever = pVg->offsetInfo.walVerEnd;
|
||||
char buf[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
|
||||
tscInfo("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%"PRId64", rows:%"PRId64,
|
||||
tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows);
|
||||
}
|
||||
// tmq->needReportOffsetRows = false;
|
||||
// }
|
||||
}
|
||||
taosRUnLockLatch(&tmq->lock);
|
||||
|
||||
int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
|
||||
|
@ -1108,6 +1113,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
|||
pTmq->commitCbUserParam = conf->commitCbUserParam;
|
||||
pTmq->resetOffsetCfg = conf->resetOffset;
|
||||
pTmq->replayEnable = conf->replayEnable;
|
||||
pTmq->sourceExcluded = conf->sourceExcluded;
|
||||
if(conf->replayEnable){
|
||||
pTmq->autoCommit = false;
|
||||
}
|
||||
|
@ -1576,6 +1582,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl
|
|||
pReq->useSnapshot = tmq->useSnapshot;
|
||||
pReq->reqId = generateRequestId();
|
||||
pReq->enableReplay = tmq->replayEnable;
|
||||
pReq->sourceExcluded = tmq->sourceExcluded;
|
||||
}
|
||||
|
||||
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
||||
|
|
|
@ -6252,6 +6252,7 @@ int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
|
|||
if (tEncodeI32(&encoder, offRows->vgId) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, offRows->rows) < 0) return -1;
|
||||
if (tEncodeSTqOffsetVal(&encoder, &offRows->offset) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, offRows->ever) < 0) return -1;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6289,6 +6290,7 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
|
|||
if (tDecodeI32(&decoder, &offRows->vgId) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &offRows->rows) < 0) return -1;
|
||||
if (tDecodeSTqOffsetVal(&decoder, &offRows->offset) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &offRows->ever) < 0) return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -6600,6 +6602,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
|
|||
if (tEncodeI64(&encoder, pReq->timeout) < 0) return -1;
|
||||
if (tSerializeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->enableReplay) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->sourceExcluded) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
|
@ -6640,6 +6643,10 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
|
|||
if (tDecodeI8(&decoder, &pReq->enableReplay) < 0) return -1;
|
||||
}
|
||||
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
if (tDecodeI8(&decoder, &pReq->sourceExcluded) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -8663,6 +8670,7 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm
|
|||
}
|
||||
}
|
||||
if (tEncodeI64(pCoder, pSubmitTbData->ctimeMs) < 0) return -1;
|
||||
if (tEncodeI8(pCoder, pSubmitTbData->source) < 0) return -1;
|
||||
|
||||
tEndEncode(pCoder);
|
||||
return 0;
|
||||
|
@ -8750,6 +8758,12 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
|
|||
goto _exit;
|
||||
}
|
||||
}
|
||||
if (!tDecodeIsEnd(pCoder)) {
|
||||
if (tDecodeI8(pCoder, &pSubmitTbData->source) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
||||
tEndDecode(pCoder);
|
||||
|
||||
|
|
|
@ -422,27 +422,12 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
|
|||
return (void *)buf;
|
||||
}
|
||||
|
||||
// SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
|
||||
// SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
|
||||
// if (pConsumerEpNew == NULL) return NULL;
|
||||
// pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
|
||||
// pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL);
|
||||
// return pConsumerEpNew;
|
||||
// }
|
||||
//
|
||||
// void tDeleteSMqConsumerEp(void *data) {
|
||||
// SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
|
||||
// taosArrayDestroy(pConsumerEp->vgs);
|
||||
// }
|
||||
|
||||
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
|
||||
int32_t tEncodeOffRows(void **buf, SArray *offsetRows){
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
|
||||
tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
|
||||
int32_t szVgs = taosArrayGetSize(pConsumerEp->offsetRows);
|
||||
int32_t szVgs = taosArrayGetSize(offsetRows);
|
||||
tlen += taosEncodeFixedI32(buf, szVgs);
|
||||
for (int32_t j = 0; j < szVgs; ++j) {
|
||||
OffsetRows *offRows = taosArrayGet(pConsumerEp->offsetRows, j);
|
||||
OffsetRows *offRows = taosArrayGet(offsetRows, j);
|
||||
tlen += taosEncodeFixedI32(buf, offRows->vgId);
|
||||
tlen += taosEncodeFixedI64(buf, offRows->rows);
|
||||
tlen += taosEncodeFixedI8(buf, offRows->offset.type);
|
||||
|
@ -454,53 +439,54 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
|
|||
} else {
|
||||
// do nothing
|
||||
}
|
||||
tlen += taosEncodeFixedI64(buf, offRows->ever);
|
||||
}
|
||||
// #if 0
|
||||
// int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
||||
// tlen += taosEncodeFixedI32(buf, sz);
|
||||
// for (int32_t i = 0; i < sz; i++) {
|
||||
// SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
|
||||
// tlen += tEncodeSMqVgEp(buf, pVgEp);
|
||||
// }
|
||||
// #endif
|
||||
|
||||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
|
||||
tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
|
||||
|
||||
|
||||
return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows);
|
||||
}
|
||||
|
||||
void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver){
|
||||
int32_t szVgs = 0;
|
||||
buf = taosDecodeFixedI32(buf, &szVgs);
|
||||
if (szVgs > 0) {
|
||||
*offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
|
||||
if (NULL == *offsetRows) return NULL;
|
||||
for (int32_t j = 0; j < szVgs; ++j) {
|
||||
OffsetRows *offRows = taosArrayReserve(*offsetRows, 1);
|
||||
buf = taosDecodeFixedI32(buf, &offRows->vgId);
|
||||
buf = taosDecodeFixedI64(buf, &offRows->rows);
|
||||
buf = taosDecodeFixedI8(buf, &offRows->offset.type);
|
||||
if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
|
||||
buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
|
||||
} else if (offRows->offset.type == TMQ_OFFSET__LOG) {
|
||||
buf = taosDecodeFixedI64(buf, &offRows->offset.version);
|
||||
} else {
|
||||
// do nothing
|
||||
}
|
||||
if(sver > 2){
|
||||
buf = taosDecodeFixedI64(buf, &offRows->ever);
|
||||
}
|
||||
}
|
||||
}
|
||||
return (void *)buf;
|
||||
}
|
||||
|
||||
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
|
||||
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
|
||||
buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
|
||||
if (sver > 1) {
|
||||
int32_t szVgs = 0;
|
||||
buf = taosDecodeFixedI32(buf, &szVgs);
|
||||
if (szVgs > 0) {
|
||||
pConsumerEp->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
|
||||
if (NULL == pConsumerEp->offsetRows) return NULL;
|
||||
for (int32_t j = 0; j < szVgs; ++j) {
|
||||
OffsetRows *offRows = taosArrayReserve(pConsumerEp->offsetRows, 1);
|
||||
buf = taosDecodeFixedI32(buf, &offRows->vgId);
|
||||
buf = taosDecodeFixedI64(buf, &offRows->rows);
|
||||
buf = taosDecodeFixedI8(buf, &offRows->offset.type);
|
||||
if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
|
||||
buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
|
||||
} else if (offRows->offset.type == TMQ_OFFSET__LOG) {
|
||||
buf = taosDecodeFixedI64(buf, &offRows->offset.version);
|
||||
} else {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
}
|
||||
buf = tDecodeOffRows(buf, &pConsumerEp->offsetRows, sver);
|
||||
}
|
||||
// #if 0
|
||||
// int32_t sz;
|
||||
// buf = taosDecodeFixedI32(buf, &sz);
|
||||
// pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *));
|
||||
// for (int32_t i = 0; i < sz; i++) {
|
||||
// SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
|
||||
// buf = tDecodeSMqVgEp(buf, pVgEp);
|
||||
// taosArrayPush(pConsumerEp->vgs, &pVgEp);
|
||||
// }
|
||||
// #endif
|
||||
|
||||
return (void *)buf;
|
||||
}
|
||||
|
@ -596,22 +582,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
|
|||
tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
|
||||
tlen += taosEncodeString(buf, pSub->dbName);
|
||||
|
||||
int32_t szVgs = taosArrayGetSize(pSub->offsetRows);
|
||||
tlen += taosEncodeFixedI32(buf, szVgs);
|
||||
for (int32_t j = 0; j < szVgs; ++j) {
|
||||
OffsetRows *offRows = taosArrayGet(pSub->offsetRows, j);
|
||||
tlen += taosEncodeFixedI32(buf, offRows->vgId);
|
||||
tlen += taosEncodeFixedI64(buf, offRows->rows);
|
||||
tlen += taosEncodeFixedI8(buf, offRows->offset.type);
|
||||
if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
tlen += taosEncodeFixedI64(buf, offRows->offset.uid);
|
||||
tlen += taosEncodeFixedI64(buf, offRows->offset.ts);
|
||||
} else if (offRows->offset.type == TMQ_OFFSET__LOG) {
|
||||
tlen += taosEncodeFixedI64(buf, offRows->offset.version);
|
||||
} else {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
tlen += tEncodeOffRows(buf, pSub->offsetRows);
|
||||
tlen += taosEncodeString(buf, pSub->qmsg);
|
||||
return tlen;
|
||||
}
|
||||
|
@ -639,26 +610,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
|
|||
buf = taosDecodeStringTo(buf, pSub->dbName);
|
||||
|
||||
if (sver > 1) {
|
||||
int32_t szVgs = 0;
|
||||
buf = taosDecodeFixedI32(buf, &szVgs);
|
||||
if (szVgs > 0) {
|
||||
pSub->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
|
||||
if (NULL == pSub->offsetRows) return NULL;
|
||||
for (int32_t j = 0; j < szVgs; ++j) {
|
||||
OffsetRows *offRows = taosArrayReserve(pSub->offsetRows, 1);
|
||||
buf = taosDecodeFixedI32(buf, &offRows->vgId);
|
||||
buf = taosDecodeFixedI64(buf, &offRows->rows);
|
||||
buf = taosDecodeFixedI8(buf, &offRows->offset.type);
|
||||
if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
|
||||
buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
|
||||
} else if (offRows->offset.type == TMQ_OFFSET__LOG) {
|
||||
buf = taosDecodeFixedI64(buf, &offRows->offset.version);
|
||||
} else {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
}
|
||||
buf = tDecodeOffRows(buf, &pSub->offsetRows, sver);
|
||||
buf = taosDecodeString(buf, &pSub->qmsg);
|
||||
} else {
|
||||
pSub->qmsg = taosStrdup("");
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
#include "tcompare.h"
|
||||
#include "tname.h"
|
||||
|
||||
#define MND_SUBSCRIBE_VER_NUMBER 2
|
||||
#define MND_SUBSCRIBE_VER_NUMBER 3
|
||||
#define MND_SUBSCRIBE_RESERVE_SIZE 64
|
||||
|
||||
#define MND_CONSUMER_LOST_HB_CNT 6
|
||||
|
@ -530,51 +530,50 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
}
|
||||
}
|
||||
|
||||
// if(taosHashGetSize(pOutput->pSub->consumerHash) == 0) { // if all consumer is removed
|
||||
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows
|
||||
if (pSub) {
|
||||
taosRLockLatch(&pSub->lock);
|
||||
if (pOutput->pSub->offsetRows == NULL) {
|
||||
pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
|
||||
}
|
||||
pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));
|
||||
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows
|
||||
if (pSub) {
|
||||
taosRLockLatch(&pSub->lock);
|
||||
if (pOutput->pSub->offsetRows == NULL) {
|
||||
pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
|
||||
}
|
||||
pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));
|
||||
|
||||
for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
|
||||
OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
|
||||
bool jump = false;
|
||||
for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++){
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i);
|
||||
if(pVgEp->vgId == d1->vgId){
|
||||
jump = true;
|
||||
mInfo("pSub->offsetRows jump, because consumer id:0x%"PRIx64 " and vgId:%d not change", pConsumerEp->consumerId, pVgEp->vgId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(jump) continue;
|
||||
bool find = false;
|
||||
for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
|
||||
OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
|
||||
if (d1->vgId == d2->vgId) {
|
||||
d2->rows += d1->rows;
|
||||
d2->offset = d1->offset;
|
||||
find = true;
|
||||
mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(!find){
|
||||
taosArrayPush(pOutput->pSub->offsetRows, d1);
|
||||
for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
|
||||
OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
|
||||
bool jump = false;
|
||||
for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++){
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i);
|
||||
if(pVgEp->vgId == d1->vgId){
|
||||
jump = true;
|
||||
mInfo("pSub->offsetRows jump, because consumer id:0x%"PRIx64 " and vgId:%d not change", pConsumerEp->consumerId, pVgEp->vgId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(jump) continue;
|
||||
bool find = false;
|
||||
for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
|
||||
OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
|
||||
if (d1->vgId == d2->vgId) {
|
||||
d2->rows += d1->rows;
|
||||
d2->offset = d1->offset;
|
||||
d2->ever = d1->ever;
|
||||
find = true;
|
||||
mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(!find){
|
||||
taosArrayPush(pOutput->pSub->offsetRows, d1);
|
||||
}
|
||||
}
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
// }
|
||||
}
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
}
|
||||
|
||||
// 8. generate logs
|
||||
|
@ -1405,8 +1404,9 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t cons
|
|||
}
|
||||
if(data){
|
||||
// vg id
|
||||
char buf[TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
char buf[TSDB_OFFSET_LEN*2 + VARSTR_HEADER_SIZE] = {0};
|
||||
tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset);
|
||||
sprintf(varDataVal(buf) + strlen(varDataVal(buf)), "/%"PRId64, data->ever);
|
||||
varDataSetLen(buf, strlen(varDataVal(buf)));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, *numOfRows, (const char *)buf, false);
|
||||
|
|
|
@ -223,7 +223,7 @@ bool tqReaderIsQueriedTable(STqReader *pReader, uint64_t uid);
|
|||
bool tqCurrentBlockConsumed(const STqReader *pReader);
|
||||
|
||||
int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id);
|
||||
bool tqNextBlockInWal(STqReader *pReader, const char *idstr);
|
||||
bool tqNextBlockInWal(STqReader *pReader, const char *idstr, int sourceExcluded);
|
||||
bool tqNextBlockImpl(STqReader *pReader, const char *idstr);
|
||||
SWalReader *tqGetWalReader(STqReader *pReader);
|
||||
SSDataBlock *tqGetResultBlock(STqReader *pReader);
|
||||
|
|
|
@ -118,7 +118,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
|
|||
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId);
|
||||
|
||||
// tqExec
|
||||
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows);
|
||||
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded);
|
||||
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
|
||||
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
|
||||
int32_t type, int32_t vgId);
|
||||
|
|
|
@ -193,7 +193,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
|
|||
continue;
|
||||
}
|
||||
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE,};
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE, .source = SOURCE_NULL};
|
||||
|
||||
int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
|
||||
tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true);
|
||||
|
|
|
@ -368,7 +368,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
|
|||
}
|
||||
|
||||
// todo ignore the error in wal?
|
||||
bool tqNextBlockInWal(STqReader* pReader, const char* id) {
|
||||
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
|
||||
SWalReader* pWalReader = pReader->pWalReader;
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
|
||||
|
@ -391,7 +391,10 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
|
|||
numOfBlocks, pReader->msg.msgLen, pReader->msg.ver);
|
||||
|
||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
||||
|
||||
if ((pSubmitTbData->source & sourceExcluded) != 0){
|
||||
pReader->nextBlk += 1;
|
||||
continue;
|
||||
}
|
||||
if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) {
|
||||
tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
|
||||
SSDataBlock* pRes = NULL;
|
||||
|
|
|
@ -93,6 +93,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
|
|||
return -1;
|
||||
}
|
||||
|
||||
qStreamSetSourceExcluded(task, pRequest->sourceExcluded);
|
||||
while (1) {
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
code = getDataBlock(task, pHandle, vgId, &pDataBlock);
|
||||
|
@ -249,7 +250,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows) {
|
||||
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded) {
|
||||
STqExecHandle* pExec = &pHandle->execHandle;
|
||||
SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
SArray* pSchemas = taosArrayInit(0, sizeof(void*));
|
||||
|
@ -264,6 +265,10 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
|
|||
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
|
||||
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_table;
|
||||
}
|
||||
|
||||
if ((pSubmitTbDataRet->source & sourceExcluded) != 0){
|
||||
goto loop_table;
|
||||
}
|
||||
if (pRsp->withTbName) {
|
||||
int64_t uid = pExec->pTqReader->lastBlkUid;
|
||||
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
|
||||
|
@ -328,6 +333,10 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
|
|||
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
|
||||
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_db;
|
||||
}
|
||||
|
||||
if ((pSubmitTbDataRet->source & sourceExcluded) != 0){
|
||||
goto loop_db;
|
||||
}
|
||||
if (pRsp->withTbName) {
|
||||
int64_t uid = pExec->pTqReader->lastBlkUid;
|
||||
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
|
||||
|
|
|
@ -815,7 +815,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
|||
return;
|
||||
}
|
||||
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL};
|
||||
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
continue;
|
||||
|
@ -859,7 +859,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
|||
pTask->execInfo.sink.numOfBlocks += 1;
|
||||
uint64_t groupId = pDataBlock->info.id.groupId;
|
||||
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL};
|
||||
|
||||
int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
|
||||
if (index == NULL) { // no data yet, append it
|
||||
|
|
|
@ -250,7 +250,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
.ver = pHead->version,
|
||||
};
|
||||
|
||||
code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows);
|
||||
code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest->sourceExcluded);
|
||||
if (code < 0) {
|
||||
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
|
||||
pRequest->subKey);
|
||||
|
|
|
@ -59,6 +59,7 @@ typedef struct STaskStopInfo {
|
|||
typedef struct {
|
||||
STqOffsetVal currentOffset; // for tmq
|
||||
SMqMetaRsp metaRsp; // for tmq fetching meta
|
||||
int8_t sourceExcluded;
|
||||
int64_t snapshotVer;
|
||||
SSchemaWrapper* schema;
|
||||
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
|
||||
|
|
|
@ -1152,6 +1152,11 @@ void qStreamSetOpen(qTaskInfo_t tinfo) {
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
}
|
||||
|
||||
void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
pTaskInfo->streamInfo.sourceExcluded = sourceExcluded;
|
||||
}
|
||||
|
||||
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
|
|
|
@ -2010,7 +2010,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
|||
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
|
||||
|
||||
while (1) {
|
||||
bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id);
|
||||
bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id, pTaskInfo->streamInfo.sourceExcluded);
|
||||
|
||||
SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader);
|
||||
struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader);
|
||||
|
|
|
@ -211,6 +211,7 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
|
|||
bool colMode, bool ignoreColVals) {
|
||||
STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));
|
||||
if (NULL == pTableCxt) {
|
||||
*pOutput = NULL;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -268,12 +269,8 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
|
|||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pOutput = pTableCxt;
|
||||
qDebug("tableDataCxt created, uid:%" PRId64 ", vgId:%d", pTableMeta->uid, pTableMeta->vgId);
|
||||
} else {
|
||||
taosMemoryFree(pTableCxt);
|
||||
}
|
||||
*pOutput = pTableCxt;
|
||||
qDebug("tableDataCxt created, code:%d, uid:%" PRId64 ", vgId:%d", code, pTableMeta->uid, pTableMeta->vgId);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -288,6 +285,7 @@ static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) {
|
|||
pTmp->suid = pSrc->suid;
|
||||
pTmp->uid = pSrc->uid;
|
||||
pTmp->sver = pSrc->sver;
|
||||
pTmp->source = pSrc->source;
|
||||
pTmp->pCreateTbReq = NULL;
|
||||
if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
|
||||
if (pSrc->pCreateTbReq) {
|
||||
|
@ -344,6 +342,10 @@ int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta*
|
|||
void* pData = *pTableCxt; // deal scan coverity
|
||||
code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
insDestroyTableDataCxt(*pTableCxt);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -651,6 +653,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
|||
goto end;
|
||||
}
|
||||
|
||||
pTableCxt->pData->source = SOURCE_TAOSX;
|
||||
if(tmp == NULL){
|
||||
ret = initTableColSubmitData(pTableCxt);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -133,13 +133,17 @@ class TDTestCase:
|
|||
if snapshot_value == "true":
|
||||
if offset_value != "earliest" and offset_value != "":
|
||||
if offset_value == "latest":
|
||||
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info))
|
||||
tdSql.checkEqual(sum(offset_value_list) >= 0, True)
|
||||
offset_value_list = list(map(lambda x: (x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info))
|
||||
offset_value_list1 = list(map(lambda x: int(x.split("/")[0]), offset_value_list))
|
||||
offset_value_list2 = list(map(lambda x: int(x.split("/")[1]), offset_value_list))
|
||||
tdSql.checkEqual(offset_value_list1 == offset_value_list2, True)
|
||||
tdSql.checkEqual(sum(offset_value_list1) >= 0, True)
|
||||
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
|
||||
tdSql.checkEqual(sum(rows_value_list), expected_res)
|
||||
elif offset_value == "none":
|
||||
offset_value_list = list(map(lambda x: x[-2], subscription_info))
|
||||
tdSql.checkEqual(offset_value_list, ['none']*len(subscription_info))
|
||||
offset_value_list1 = list(map(lambda x: (x.split("/")[0]), offset_value_list))
|
||||
tdSql.checkEqual(offset_value_list1, ['none']*len(subscription_info))
|
||||
rows_value_list = list(map(lambda x: x[-1], subscription_info))
|
||||
tdSql.checkEqual(rows_value_list, [0]*len(subscription_info))
|
||||
else:
|
||||
|
@ -151,18 +155,23 @@ class TDTestCase:
|
|||
# tdSql.checkEqual(sum(rows_value_list), expected_res)
|
||||
else:
|
||||
offset_value_list = list(map(lambda x: x[-2], subscription_info))
|
||||
tdSql.checkEqual(offset_value_list, [None]*len(subscription_info))
|
||||
offset_value_list1 = list(map(lambda x: (x.split("/")[0]), offset_value_list))
|
||||
tdSql.checkEqual(offset_value_list1, [None]*len(subscription_info))
|
||||
rows_value_list = list(map(lambda x: x[-1], subscription_info))
|
||||
tdSql.checkEqual(rows_value_list, [None]*len(subscription_info))
|
||||
else:
|
||||
if offset_value != "none":
|
||||
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info))
|
||||
tdSql.checkEqual(sum(offset_value_list) >= 0, True)
|
||||
offset_value_list = list(map(lambda x: (x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info))
|
||||
offset_value_list1 = list(map(lambda x: int(x.split("/")[0]), offset_value_list))
|
||||
offset_value_list2 = list(map(lambda x: int(x.split("/")[1]), offset_value_list))
|
||||
tdSql.checkEqual(offset_value_list1 == offset_value_list2, True)
|
||||
tdSql.checkEqual(sum(offset_value_list1) >= 0, True)
|
||||
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
|
||||
tdSql.checkEqual(sum(rows_value_list), expected_res)
|
||||
else:
|
||||
offset_value_list = list(map(lambda x: x[-2], subscription_info))
|
||||
tdSql.checkEqual(offset_value_list, ['none']*len(subscription_info))
|
||||
offset_value_list1 = list(map(lambda x: (x.split("/")[0]), offset_value_list))
|
||||
tdSql.checkEqual(offset_value_list1, ['none']*len(subscription_info))
|
||||
rows_value_list = list(map(lambda x: x[-1], subscription_info))
|
||||
tdSql.checkEqual(rows_value_list, [0]*len(subscription_info))
|
||||
tdSql.execute(f"drop topic if exists {topic_name}")
|
||||
|
|
|
@ -11,6 +11,7 @@ from util.sql import *
|
|||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.common import *
|
||||
from taos.tmq import *
|
||||
sys.path.append("./7-tmq")
|
||||
from tmqCommon import *
|
||||
|
||||
|
@ -310,6 +311,43 @@ class TDTestCase:
|
|||
|
||||
return
|
||||
|
||||
def consumeExcluded(self):
|
||||
tdSql.execute(f'create topic topic_excluded as database db_taosx')
|
||||
consumer_dict = {
|
||||
"group.id": "g1",
|
||||
"td.connect.user": "root",
|
||||
"td.connect.pass": "taosdata",
|
||||
"auto.offset.reset": "earliest",
|
||||
"msg.consume.excluded": 1
|
||||
}
|
||||
consumer = Consumer(consumer_dict)
|
||||
|
||||
tdLog.debug("test subscribe topic created by other user")
|
||||
exceptOccured = False
|
||||
try:
|
||||
consumer.subscribe(["topic_excluded"])
|
||||
except TmqError:
|
||||
exceptOccured = True
|
||||
|
||||
if exceptOccured:
|
||||
tdLog.exit(f"subscribe error")
|
||||
|
||||
try:
|
||||
while True:
|
||||
res = consumer.poll(1)
|
||||
if not res:
|
||||
break
|
||||
err = res.error()
|
||||
if err is not None:
|
||||
raise err
|
||||
val = res.value()
|
||||
|
||||
for block in val:
|
||||
print(block.fetchall())
|
||||
|
||||
finally:
|
||||
consumer.close()
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
self.checkWal1VgroupOnlyMeta()
|
||||
|
@ -324,6 +362,8 @@ class TDTestCase:
|
|||
self.checkSnapshotMultiVgroups()
|
||||
|
||||
self.checkWalMultiVgroupsWithDropTable()
|
||||
# self.consumeExcluded()
|
||||
|
||||
self.checkSnapshotMultiVgroupsWithDropTable()
|
||||
|
||||
def stop(self):
|
||||
|
|
|
@ -909,6 +909,88 @@ void initLogFile() {
|
|||
taosCloseFile(&pFile2);
|
||||
}
|
||||
|
||||
void testConsumeExcluded(int topic_type){
|
||||
TAOS* pConn = use_db();
|
||||
TAOS_RES *pRes = NULL;
|
||||
|
||||
if(topic_type == 1){
|
||||
char *topic = "create topic topic_excluded with meta as database db_taosx";
|
||||
pRes = taos_query(pConn, topic);
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create topic topic_excluded, reason:%s\n", taos_errstr(pRes));
|
||||
taos_close(pConn);
|
||||
return;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
}else if(topic_type == 2){
|
||||
char *topic = "create topic topic_excluded as select * from stt";
|
||||
pRes = taos_query(pConn, topic);
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create topic topic_excluded, reason:%s\n", taos_errstr(pRes));
|
||||
taos_close(pConn);
|
||||
return;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
}
|
||||
taos_close(pConn);
|
||||
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
tmq_conf_set(conf, "group.id", "tg2");
|
||||
tmq_conf_set(conf, "client.id", "my app 1");
|
||||
tmq_conf_set(conf, "td.connect.user", "root");
|
||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||
tmq_conf_set(conf, "msg.consume.excluded", "1");
|
||||
|
||||
|
||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
assert(tmq);
|
||||
tmq_conf_destroy(conf);
|
||||
|
||||
tmq_list_t* topic_list = tmq_list_new();
|
||||
tmq_list_append(topic_list, "topic_excluded");
|
||||
|
||||
int32_t code = 0;
|
||||
|
||||
if ((code = tmq_subscribe(tmq, topic_list))) {
|
||||
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
|
||||
printf("subscribe err\n");
|
||||
return;
|
||||
}
|
||||
while (running) {
|
||||
TAOS_RES* msg = tmq_consumer_poll(tmq, 1000);
|
||||
if (msg) {
|
||||
tmq_raw_data raw = {0};
|
||||
tmq_get_raw(msg, &raw);
|
||||
if(topic_type == 1){
|
||||
assert(raw.raw_type != 2 && raw.raw_type != 4);
|
||||
}else if(topic_type == 2){
|
||||
assert(0);
|
||||
}
|
||||
// printf("write raw data type: %d\n", raw.raw_type);
|
||||
tmq_free_raw(raw);
|
||||
|
||||
taos_free_result(msg);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
tmq_consumer_close(tmq);
|
||||
tmq_list_destroy(topic_list);
|
||||
|
||||
pConn = use_db();
|
||||
pRes = taos_query(pConn, "drop topic if exists topic_excluded");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in drop topic, reason:%s\n", taos_errstr(pRes));
|
||||
taos_close(pConn);
|
||||
return;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
}
|
||||
int main(int argc, char* argv[]) {
|
||||
for (int32_t i = 1; i < argc; i++) {
|
||||
if (strcmp(argv[i], "-c") == 0) {
|
||||
|
@ -942,5 +1024,8 @@ int main(int argc, char* argv[]) {
|
|||
tmq_list_t* topic_list = build_topic_list();
|
||||
basic_consume_loop(tmq, topic_list);
|
||||
tmq_list_destroy(topic_list);
|
||||
|
||||
testConsumeExcluded(1);
|
||||
testConsumeExcluded(2);
|
||||
taosCloseFile(&g_fp);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue