fix:[TD-28590]add logic for consume excluded

This commit is contained in:
wangmm0220 2024-02-06 18:14:06 +08:00
parent 871ebfa7ba
commit 478e1a67ae
7 changed files with 234 additions and 155 deletions

View File

@ -3768,6 +3768,7 @@ typedef struct {
int32_t vgId; int32_t vgId;
STqOffsetVal offset; STqOffsetVal offset;
int64_t rows; int64_t rows;
int64_t ever;
} OffsetRows; } OffsetRows;
typedef struct { typedef struct {

View File

@ -789,7 +789,6 @@ void tmqSendHbReq(void* param, void* tmrId) {
req.consumerId = tmq->consumerId; req.consumerId = tmq->consumerId;
req.epoch = tmq->epoch; req.epoch = tmq->epoch;
taosRLockLatch(&tmq->lock); taosRLockLatch(&tmq->lock);
// if(tmq->needReportOffsetRows){
req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){ for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
@ -802,14 +801,14 @@ void tmqSendHbReq(void* param, void* tmrId) {
OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1); OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
offRows->vgId = pVg->vgId; offRows->vgId = pVg->vgId;
offRows->rows = pVg->numOfRows; offRows->rows = pVg->numOfRows;
offRows->offset = pVg->offsetInfo.beginOffset; offRows->offset = pVg->offsetInfo.endOffset;
offRows->ever = pVg->offsetInfo.walVerEnd;
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
tscInfo("consumer:0x%" PRIx64 ",report offset: vgId:%d, offset:%s, rows:%"PRId64, tmq->consumerId, offRows->vgId, buf, offRows->rows); tscInfo("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%"PRId64", rows:%"PRId64,
tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows);
} }
} }
// tmq->needReportOffsetRows = false;
// }
taosRUnLockLatch(&tmq->lock); taosRUnLockLatch(&tmq->lock);
int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req); int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);

View File

@ -6252,6 +6252,7 @@ int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
if (tEncodeI32(&encoder, offRows->vgId) < 0) return -1; if (tEncodeI32(&encoder, offRows->vgId) < 0) return -1;
if (tEncodeI64(&encoder, offRows->rows) < 0) return -1; if (tEncodeI64(&encoder, offRows->rows) < 0) return -1;
if (tEncodeSTqOffsetVal(&encoder, &offRows->offset) < 0) return -1; if (tEncodeSTqOffsetVal(&encoder, &offRows->offset) < 0) return -1;
if (tEncodeI64(&encoder, offRows->ever) < 0) return -1;
} }
} }
@ -6289,6 +6290,7 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
if (tDecodeI32(&decoder, &offRows->vgId) < 0) return -1; if (tDecodeI32(&decoder, &offRows->vgId) < 0) return -1;
if (tDecodeI64(&decoder, &offRows->rows) < 0) return -1; if (tDecodeI64(&decoder, &offRows->rows) < 0) return -1;
if (tDecodeSTqOffsetVal(&decoder, &offRows->offset) < 0) return -1; if (tDecodeSTqOffsetVal(&decoder, &offRows->offset) < 0) return -1;
if (tDecodeI64(&decoder, &offRows->ever) < 0) return -1;
} }
} }
} }

View File

@ -422,27 +422,12 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
return (void *)buf; return (void *)buf;
} }
// SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) { int32_t tEncodeOffRows(void **buf, SArray *offsetRows){
// SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
// if (pConsumerEpNew == NULL) return NULL;
// pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
// pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL);
// return pConsumerEpNew;
// }
//
// void tDeleteSMqConsumerEp(void *data) {
// SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
// taosArrayDestroy(pConsumerEp->vgs);
// }
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); int32_t szVgs = taosArrayGetSize(offsetRows);
tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
int32_t szVgs = taosArrayGetSize(pConsumerEp->offsetRows);
tlen += taosEncodeFixedI32(buf, szVgs); tlen += taosEncodeFixedI32(buf, szVgs);
for (int32_t j = 0; j < szVgs; ++j) { for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows *offRows = taosArrayGet(pConsumerEp->offsetRows, j); OffsetRows *offRows = taosArrayGet(offsetRows, j);
tlen += taosEncodeFixedI32(buf, offRows->vgId); tlen += taosEncodeFixedI32(buf, offRows->vgId);
tlen += taosEncodeFixedI64(buf, offRows->rows); tlen += taosEncodeFixedI64(buf, offRows->rows);
tlen += taosEncodeFixedI8(buf, offRows->offset.type); tlen += taosEncodeFixedI8(buf, offRows->offset.type);
@ -454,29 +439,29 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
} else { } else {
// do nothing // do nothing
} }
tlen += taosEncodeFixedI64(buf, offRows->ever);
} }
// #if 0
// int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
// tlen += taosEncodeFixedI32(buf, sz);
// for (int32_t i = 0; i < sz; i++) {
// SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
// tlen += tEncodeSMqVgEp(buf, pVgEp);
// }
// #endif
return tlen; return tlen;
} }
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) { int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); int32_t tlen = 0;
buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
if (sver > 1) { tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows);
}
void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver){
int32_t szVgs = 0; int32_t szVgs = 0;
buf = taosDecodeFixedI32(buf, &szVgs); buf = taosDecodeFixedI32(buf, &szVgs);
if (szVgs > 0) { if (szVgs > 0) {
pConsumerEp->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows)); *offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
if (NULL == pConsumerEp->offsetRows) return NULL; if (NULL == *offsetRows) return NULL;
for (int32_t j = 0; j < szVgs; ++j) { for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows *offRows = taosArrayReserve(pConsumerEp->offsetRows, 1); OffsetRows *offRows = taosArrayReserve(*offsetRows, 1);
buf = taosDecodeFixedI32(buf, &offRows->vgId); buf = taosDecodeFixedI32(buf, &offRows->vgId);
buf = taosDecodeFixedI64(buf, &offRows->rows); buf = taosDecodeFixedI64(buf, &offRows->rows);
buf = taosDecodeFixedI8(buf, &offRows->offset.type); buf = taosDecodeFixedI8(buf, &offRows->offset.type);
@ -488,19 +473,20 @@ void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t s
} else { } else {
// do nothing // do nothing
} }
if(sver > 2){
buf = taosDecodeFixedI64(buf, &offRows->ever);
} }
} }
} }
// #if 0 return (void *)buf;
// int32_t sz; }
// buf = taosDecodeFixedI32(buf, &sz);
// pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *)); void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
// for (int32_t i = 0; i < sz; i++) { buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
// SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp)); buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
// buf = tDecodeSMqVgEp(buf, pVgEp); if (sver > 1) {
// taosArrayPush(pConsumerEp->vgs, &pVgEp); buf = tDecodeOffRows(buf, &pConsumerEp->offsetRows, sver);
// } }
// #endif
return (void *)buf; return (void *)buf;
} }
@ -596,22 +582,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp); tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
tlen += taosEncodeString(buf, pSub->dbName); tlen += taosEncodeString(buf, pSub->dbName);
int32_t szVgs = taosArrayGetSize(pSub->offsetRows); tlen += tEncodeOffRows(buf, pSub->offsetRows);
tlen += taosEncodeFixedI32(buf, szVgs);
for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows *offRows = taosArrayGet(pSub->offsetRows, j);
tlen += taosEncodeFixedI32(buf, offRows->vgId);
tlen += taosEncodeFixedI64(buf, offRows->rows);
tlen += taosEncodeFixedI8(buf, offRows->offset.type);
if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
tlen += taosEncodeFixedI64(buf, offRows->offset.uid);
tlen += taosEncodeFixedI64(buf, offRows->offset.ts);
} else if (offRows->offset.type == TMQ_OFFSET__LOG) {
tlen += taosEncodeFixedI64(buf, offRows->offset.version);
} else {
// do nothing
}
}
tlen += taosEncodeString(buf, pSub->qmsg); tlen += taosEncodeString(buf, pSub->qmsg);
return tlen; return tlen;
} }
@ -639,26 +610,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
buf = taosDecodeStringTo(buf, pSub->dbName); buf = taosDecodeStringTo(buf, pSub->dbName);
if (sver > 1) { if (sver > 1) {
int32_t szVgs = 0; buf = tDecodeOffRows(buf, &pSub->offsetRows, sver);
buf = taosDecodeFixedI32(buf, &szVgs);
if (szVgs > 0) {
pSub->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
if (NULL == pSub->offsetRows) return NULL;
for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows *offRows = taosArrayReserve(pSub->offsetRows, 1);
buf = taosDecodeFixedI32(buf, &offRows->vgId);
buf = taosDecodeFixedI64(buf, &offRows->rows);
buf = taosDecodeFixedI8(buf, &offRows->offset.type);
if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
} else if (offRows->offset.type == TMQ_OFFSET__LOG) {
buf = taosDecodeFixedI64(buf, &offRows->offset.version);
} else {
// do nothing
}
}
}
buf = taosDecodeString(buf, &pSub->qmsg); buf = taosDecodeString(buf, &pSub->qmsg);
} else { } else {
pSub->qmsg = taosStrdup(""); pSub->qmsg = taosStrdup("");

View File

@ -24,7 +24,7 @@
#include "tcompare.h" #include "tcompare.h"
#include "tname.h" #include "tname.h"
#define MND_SUBSCRIBE_VER_NUMBER 2 #define MND_SUBSCRIBE_VER_NUMBER 3
#define MND_SUBSCRIBE_RESERVE_SIZE 64 #define MND_SUBSCRIBE_RESERVE_SIZE 64
#define MND_CONSUMER_LOST_HB_CNT 6 #define MND_CONSUMER_LOST_HB_CNT 6
@ -530,7 +530,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
} }
} }
// if(taosHashGetSize(pOutput->pSub->consumerHash) == 0) { // if all consumer is removed
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows
if (pSub) { if (pSub) {
taosRLockLatch(&pSub->lock); taosRLockLatch(&pSub->lock);
@ -562,6 +561,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
if (d1->vgId == d2->vgId) { if (d1->vgId == d2->vgId) {
d2->rows += d1->rows; d2->rows += d1->rows;
d2->offset = d1->offset; d2->offset = d1->offset;
d2->ever = d1->ever;
find = true; find = true;
mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows); mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows);
break; break;
@ -574,7 +574,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
} }
taosRUnLockLatch(&pSub->lock); taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
// }
} }
// 8. generate logs // 8. generate logs
@ -1405,8 +1404,9 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t cons
} }
if(data){ if(data){
// vg id // vg id
char buf[TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0}; char buf[TSDB_OFFSET_LEN*2 + VARSTR_HEADER_SIZE] = {0};
tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset); tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset);
sprintf(varDataVal(buf) + strlen(varDataVal(buf)), "/%"PRId64, data->ever);
varDataSetLen(buf, strlen(varDataVal(buf))); varDataSetLen(buf, strlen(varDataVal(buf)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, *numOfRows, (const char *)buf, false); colDataSetVal(pColInfo, *numOfRows, (const char *)buf, false);

View File

@ -11,6 +11,7 @@ from util.sql import *
from util.cases import * from util.cases import *
from util.dnodes import * from util.dnodes import *
from util.common import * from util.common import *
from taos.tmq import *
sys.path.append("./7-tmq") sys.path.append("./7-tmq")
from tmqCommon import * from tmqCommon import *
@ -310,6 +311,43 @@ class TDTestCase:
return return
def consumeExcluded(self):
tdSql.execute(f'create topic topic_excluded as database db_taosx')
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
"msg.consume.excluded": 1
}
consumer = Consumer(consumer_dict)
tdLog.debug("test subscribe topic created by other user")
exceptOccured = False
try:
consumer.subscribe(["topic_excluded"])
except TmqError:
exceptOccured = True
if exceptOccured:
tdLog.exit(f"subscribe error")
try:
while True:
res = consumer.poll(1)
if not res:
break
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
finally:
consumer.close()
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
self.checkWal1VgroupOnlyMeta() self.checkWal1VgroupOnlyMeta()
@ -324,6 +362,8 @@ class TDTestCase:
self.checkSnapshotMultiVgroups() self.checkSnapshotMultiVgroups()
self.checkWalMultiVgroupsWithDropTable() self.checkWalMultiVgroupsWithDropTable()
# self.consumeExcluded()
self.checkSnapshotMultiVgroupsWithDropTable() self.checkSnapshotMultiVgroupsWithDropTable()
def stop(self): def stop(self):

View File

@ -909,6 +909,88 @@ void initLogFile() {
taosCloseFile(&pFile2); taosCloseFile(&pFile2);
} }
void testConsumeExcluded(int topic_type){
TAOS* pConn = use_db();
TAOS_RES *pRes = NULL;
if(topic_type == 1){
char *topic = "create topic topic_excluded with meta as database db_taosx";
pRes = taos_query(pConn, topic);
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_excluded, reason:%s\n", taos_errstr(pRes));
taos_close(pConn);
return;
}
taos_free_result(pRes);
}else if(topic_type == 2){
char *topic = "create topic topic_excluded as select * from stt";
pRes = taos_query(pConn, topic);
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_excluded, reason:%s\n", taos_errstr(pRes));
taos_close(pConn);
return;
}
taos_free_result(pRes);
}
taos_close(pConn);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set(conf, "client.id", "my app 1");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.consume.excluded", "1");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "topic_excluded");
int32_t code = 0;
if ((code = tmq_subscribe(tmq, topic_list))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
printf("subscribe err\n");
return;
}
while (running) {
TAOS_RES* msg = tmq_consumer_poll(tmq, 1000);
if (msg) {
tmq_raw_data raw = {0};
tmq_get_raw(msg, &raw);
if(topic_type == 1){
assert(raw.raw_type != 2 && raw.raw_type != 4);
}else if(topic_type == 2){
assert(0);
}
// printf("write raw data type: %d\n", raw.raw_type);
tmq_free_raw(raw);
taos_free_result(msg);
} else {
break;
}
}
tmq_consumer_close(tmq);
tmq_list_destroy(topic_list);
pConn = use_db();
pRes = taos_query(pConn, "drop topic if exists topic_excluded");
if (taos_errno(pRes) != 0) {
printf("error in drop topic, reason:%s\n", taos_errstr(pRes));
taos_close(pConn);
return;
}
taos_free_result(pRes);
}
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
for (int32_t i = 1; i < argc; i++) { for (int32_t i = 1; i < argc; i++) {
if (strcmp(argv[i], "-c") == 0) { if (strcmp(argv[i], "-c") == 0) {
@ -942,5 +1024,8 @@ int main(int argc, char* argv[]) {
tmq_list_t* topic_list = build_topic_list(); tmq_list_t* topic_list = build_topic_list();
basic_consume_loop(tmq, topic_list); basic_consume_loop(tmq, topic_list);
tmq_list_destroy(topic_list); tmq_list_destroy(topic_list);
testConsumeExcluded(1);
testConsumeExcluded(2);
taosCloseFile(&g_fp); taosCloseFile(&g_fp);
} }