enh(tmq): add check for consumer Id;

This commit is contained in:
Haojun Liao 2023-04-23 11:51:43 +08:00
parent 3dbe208712
commit 9cf89c6b6f
5 changed files with 103 additions and 51 deletions

View File

@ -2805,6 +2805,7 @@ typedef struct {
} SMqOffset; } SMqOffset;
typedef struct { typedef struct {
int64_t consumerId;
int32_t num; int32_t num;
SMqOffset* offsets; SMqOffset* offsets;
} SMqCMCommitOffsetReq; } SMqCMCommitOffsetReq;
@ -2872,6 +2873,14 @@ typedef struct {
int32_t tEncodeSTqOffset(SEncoder* pEncoder, const STqOffset* pOffset); int32_t tEncodeSTqOffset(SEncoder* pEncoder, const STqOffset* pOffset);
int32_t tDecodeSTqOffset(SDecoder* pDecoder, STqOffset* pOffset); int32_t tDecodeSTqOffset(SDecoder* pDecoder, STqOffset* pOffset);
typedef struct SMqVgOffset {
int64_t consumerId;
STqOffset offset;
} SMqVgOffset;
int32_t tEncodeMqVgOffset(SEncoder* pEncoder, const SMqVgOffset* pOffset);
int32_t tDecodeMqVgOffset(SDecoder* pDecoder, SMqVgOffset* pOffset);
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
char stb[TSDB_TABLE_FNAME_LEN]; char stb[TSDB_TABLE_FNAME_LEN];

View File

@ -228,7 +228,7 @@ typedef struct {
typedef struct { typedef struct {
SMqCommitCbParamSet* params; SMqCommitCbParamSet* params;
STqOffset* pOffset; SMqVgOffset* pOffset;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
int32_t vgId; int32_t vgId;
tmq_t* pTmq; tmq_t* pTmq;
@ -492,21 +492,22 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet, static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
int32_t index, int32_t totalVgroups, int32_t type) { int32_t index, int32_t totalVgroups, int32_t type) {
STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset)); SMqVgOffset* pOffset = taosMemoryCalloc(1, sizeof(SMqVgOffset));
if (pOffset == NULL) { if (pOffset == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pOffset->val = pVg->offsetInfo.currentOffset; pOffset->consumerId = tmq->consumerId;
pOffset->offset.val = pVg->offsetInfo.currentOffset;
int32_t groupLen = strlen(tmq->groupId); int32_t groupLen = strlen(tmq->groupId);
memcpy(pOffset->subKey, tmq->groupId, groupLen); memcpy(pOffset->offset.subKey, tmq->groupId, groupLen);
pOffset->subKey[groupLen] = TMQ_SEPARATOR; pOffset->offset.subKey[groupLen] = TMQ_SEPARATOR;
strcpy(pOffset->subKey + groupLen + 1, pTopicName); strcpy(pOffset->offset.subKey + groupLen + 1, pTopicName);
int32_t len = 0; int32_t len = 0;
int32_t code = 0; int32_t code = 0;
tEncodeSize(tEncodeSTqOffset, pOffset, len, code); tEncodeSize(tEncodeMqVgOffset, pOffset, len, code);
if (code < 0) { if (code < 0) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
@ -523,7 +524,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, abuf, len); tEncoderInit(&encoder, abuf, len);
tEncodeSTqOffset(&encoder, pOffset); tEncodeMqVgOffset(&encoder, pOffset);
tEncoderClear(&encoder); tEncoderClear(&encoder);
// build param // build param
@ -564,12 +565,12 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
SEp* pEp = GET_ACTIVE_EP(&pVg->epSet); SEp* pEp = GET_ACTIVE_EP(&pVg->epSet);
char offsetBuf[80] = {0}; char offsetBuf[80] = {0};
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffset->val); tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffset->offset.val);
char commitBuf[80] = {0}; char commitBuf[80] = {0};
tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64, tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64,
tmq->consumerId, pOffset->subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1, tmq->consumerId, pOffset->offset.subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1,
totalVgroups, pMsgSendInfo->requestId); totalVgroups, pMsgSendInfo->requestId);
int64_t transporterId = 0; int64_t transporterId = 0;
@ -2506,6 +2507,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
terrno = code; terrno = code;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(*assignment); taosMemoryFree(*assignment);
*assignment = NULL;
*numOfAssignment = 0; *numOfAssignment = 0;
} else { } else {
int32_t num = taosArrayGetSize(pCommon->pList); int32_t num = taosArrayGetSize(pCommon->pList);

View File

@ -1082,7 +1082,7 @@ TEST(clientCase, sub_tb_test) {
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "false"); tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName45"); tmq_conf_set(conf, "group.id", "cgrpName1024");
tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "auto.offset.reset", "earliest");
@ -1113,8 +1113,14 @@ TEST(clientCase, sub_tb_test) {
tmq_topic_assignment* pAssign = NULL; tmq_topic_assignment* pAssign = NULL;
int32_t numOfAssign = 0; int32_t numOfAssign = 0;
// TAOS_RES* p = tmq_consumer_poll(tmq, timeout);
int32_t code = tmq_get_topic_assignment(tmq, "topic_t1", &pAssign, &numOfAssign); int32_t code = tmq_get_topic_assignment(tmq, "topic_t1", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
while (1) { while (1) {
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);

View File

@ -6909,6 +6909,18 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) {
return 0; return 0;
} }
int32_t tEncodeMqVgOffset(SEncoder* pEncoder, const SMqVgOffset* pOffset) {
if (tEncodeI64(pEncoder, pOffset->consumerId) < 0) return -1;
if (tEncodeSTqOffset(pEncoder, &pOffset->offset) < 0) return -1;
return 0;
}
int32_t tDecodeMqVgOffset(SDecoder* pDecoder, SMqVgOffset* pOffset) {
if (tDecodeI64(pDecoder, &pOffset->consumerId) < 0) return -1;
if (tDecodeSTqOffset(pDecoder, &pOffset->offset) < 0) return -1;
return 0;
}
int32_t tEncodeSTqCheckInfo(SEncoder *pEncoder, const STqCheckInfo *pInfo) { int32_t tEncodeSTqCheckInfo(SEncoder *pEncoder, const STqCheckInfo *pInfo) {
if (tEncodeCStr(pEncoder, pInfo->topic) < 0) return -1; if (tEncodeCStr(pEncoder, pInfo->topic) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->ntbUid) < 0) return -1; if (tEncodeI64(pEncoder, pInfo->ntbUid) < 0) return -1;

View File

@ -191,46 +191,48 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq*
} }
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
STqOffset offset = {0}; SMqVgOffset vgOffset = {0};
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen); tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeSTqOffset(&decoder, &offset) < 0) { if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
return -1; return -1;
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) { STqOffset* pOffset = &vgOffset.offset;
if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64, tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
offset.subKey, vgId, offset.val.uid, offset.val.ts); pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
} else if (offset.val.type == TMQ_OFFSET__LOG) { } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey, vgId, tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
offset.val.version); pOffset->val.version);
if (offset.val.version + 1 == sversion) { if (pOffset->val.version + 1 == sversion) {
offset.val.version += 1; pOffset->val.version += 1;
} }
} else { } else {
tqError("invalid commit offset type:%d", offset.val.type); tqError("invalid commit offset type:%d", pOffset->val.type);
return -1; return -1;
} }
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey); STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
if (pSavedOffset != NULL && tqOffsetLessOrEqual(&offset, pSavedOffset)) { if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) {
tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64, tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
vgId, offset.subKey, offset.val.version, pSavedOffset->val.version); vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
return 0; // no need to update the offset value return 0; // no need to update the offset value
} }
// save the new offset value // save the new offset value
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
return -1; return -1;
} }
if (offset.val.type == TMQ_OFFSET__LOG) { if (pOffset->val.type == TMQ_OFFSET__LOG) {
STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
if (pHandle && (walRefVer(pHandle->pRef, offset.val.version) < 0)) { if (pHandle && (walRefVer(pHandle->pRef, pOffset->val.version) < 0)) {
return -1; return -1;
} }
} }
@ -239,50 +241,71 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
} }
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
STqOffset offset = {0}; SMqVgOffset vgOffset = {0};
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen); tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeSTqOffset(&decoder, &offset) < 0) { if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
return -1; return -1;
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
if (offset.val.type != TMQ_OFFSET__LOG) { STqOffset* pOffset = &vgOffset.offset;
tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, offset.subKey, offset.val.type); if (pOffset->val.type != TMQ_OFFSET__LOG) {
tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
return -1; return -1;
} }
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey); STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
if (pSavedOffset != NULL && pSavedOffset->val.type != TMQ_OFFSET__LOG) { if (pHandle == NULL) {
tqError("invalid saved offset type, vgId:%d sub:%s", vgId, offset.subKey); tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId,
return 0; // no need to update the offset value pOffset->subKey);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
} }
if (pSavedOffset->val.version == offset.val.version) { // 2. check consumer-vg assignment status
tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, offset.subKey, offset.val.version, taosRLockLatch(&pTq->lock);
pSavedOffset->val.version); if (pHandle->consumerId != vgOffset.consumerId) {
return 0; tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
taosRUnLockLatch(&pTq->lock);
return -1;
} }
taosRUnLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey)); //3. check the offset info
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
if (pSavedOffset != NULL) {
if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
return 0; // no need to update the offset value
}
if (pSavedOffset->val.version == pOffset->val.version) {
tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
pOffset->val.version, pSavedOffset->val.version);
return 0;
}
}
int64_t sver = 0, ever = 0; int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
if (offset.val.version < sver) { if (pOffset->val.version < sver) {
offset.val.version = sver; pOffset->val.version = sver;
} else if (offset.val.version > ever) { } else if (pOffset->val.version > ever) {
offset.val.version = ever; pOffset->val.version = ever;
} }
// save the new offset value // save the new offset value
tqDebug("vgId:%d sub:%s seek to %" PRId64 " prev offset:%" PRId64, vgId, offset.subKey, offset.val.version, tqDebug("vgId:%d sub:%s seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
pSavedOffset->val.version); pSavedOffset->val.version);
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, offset.subKey, offset.val.version); tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
return -1; return -1;
} }