From a22047ecf75e4df7a01b194140432e670bb91032 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 18 Feb 2022 15:29:45 +0800 Subject: [PATCH] add reset offset --- include/common/tmsg.h | 10 ++++-- source/client/src/tmq.c | 2 +- source/common/src/tmsg.c | 34 +++++++++++++++++- source/dnode/mnode/impl/inc/mndDef.h | 15 ++++---- source/dnode/mnode/impl/src/mndSubscribe.c | 40 ++++++++++++++++++++++ 5 files changed, 89 insertions(+), 12 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c6a7d06898..481d20ed17 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1751,6 +1751,11 @@ typedef struct { char cgroup[TSDB_CONSUMER_GROUP_LEN]; } SMqOffset; +typedef struct { + int32_t vgId; + SArray* offsets; // SArray +} SMqVgOffsets; + typedef struct { int32_t num; SMqOffset* offsets; @@ -1761,8 +1766,8 @@ typedef struct { } SMqCMResetOffsetRsp; typedef struct { - int32_t num; - SMqOffset* offsets; + int64_t leftForVer; + SMqVgOffsets offsets; } SMqMVResetOffsetReq; typedef struct { @@ -1773,7 +1778,6 @@ int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset); int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset); int32_t tEncodeSMqCMResetOffsetReq(SCoder* encoder, const SMqCMResetOffsetReq* pReq); int32_t tDecodeSMqCMResetOffsetReq(SCoder* decoder, SMqCMResetOffsetReq* pReq); - int32_t tEncodeSMqMVResetOffsetReq(SCoder* encoder, const SMqMVResetOffsetReq* pReq); int32_t tDecodeSMqMVResetOffsetReq(SCoder* decoder, SMqMVResetOffsetReq* pReq); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 47f278d380..e91dee6e97 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -269,7 +269,7 @@ tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offse tsem_wait(¶m.rspSem); tsem_destroy(¶m.rspSem); - return TMQ_RESP_ERR__SUCCESS; + return param.rspErr; } tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4b6171362e..14937e4db2 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2323,6 +2323,34 @@ int32_t tDecodeSMqOffset(SCoder *decoder, SMqOffset *pOffset) { return 0; } +int32_t tEncodeSMqVgOffsets(SCoder *encoder, const SMqVgOffsets *pOffsets) { + if (tStartEncode(encoder) < 0) return -1; + if (tEncodeI32(encoder, pOffsets->vgId) < 0) return -1; + int32_t sz = taosArrayGetSize(pOffsets->offsets); + if (tEncodeI32(encoder, sz) < 0) return -1; + for (int32_t i = 0; i < sz; i++) { + SMqOffset *offset = taosArrayGet(pOffsets->offsets, i); + if (tEncodeSMqOffset(encoder, offset) < 0) return -1; + } + tEndEncode(encoder); + return encoder->pos; +} + +int32_t tDecodeSMqVgOffsets(SCoder *decoder, SMqVgOffsets *pOffsets) { + int32_t sz; + if (tStartDecode(decoder) < 0) return -1; + if (tDecodeI32(decoder, &pOffsets->vgId) < 0) return -1; + if (tDecodeI32(decoder, &sz) < 0) return -1; + pOffsets->offsets = taosArrayInit(sz, sizeof(SMqOffset)); + for (int32_t i = 0; i < sz; i++) { + SMqOffset offset; + if (tDecodeSMqOffset(decoder, &offset) < 0) return -1; + taosArrayPush(pOffsets->offsets, &offset); + } + tEndDecode(decoder); + return 0; +} + int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *pReq) { if (tStartEncode(encoder) < 0) return -1; if (tEncodeI32(encoder, pReq->num) < 0) return -1; @@ -2334,17 +2362,20 @@ int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *p } int32_t tDecodeSMqCMResetOffsetReq(SCoder *decoder, SMqCMResetOffsetReq *pReq) { + if (tStartDecode(decoder) < 0) return -1; if (tDecodeI32(decoder, &pReq->num) < 0) return -1; pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder); if (pReq->offsets == NULL) return -1; for (int32_t i = 0; i < pReq->num; i++) { tDecodeSMqOffset(decoder, &pReq->offsets[i]); } + tEndDecode(decoder); return 0; } +#if 0 int32_t tEncodeSMqMVResetOffsetReq(SCoder *encoder, const SMqMVResetOffsetReq *pReq) { - if (tEncodeI32(encoder, pReq->num) < 0) return -1; + if (tEncodeI64(encoder, pReq->leftForVer) < 0) return -1; for (int32_t i = 0; i < pReq->num; i++) { tEncodeSMqOffset(encoder, &pReq->offsets[i]); } @@ -2360,3 +2391,4 @@ int32_t tDecodeSMqMVResetOffsetReq(SCoder *decoder, SMqMVResetOffsetReq *pReq) { } return 0; } +#endif diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 73c0446e3f..74c5a6a463 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -256,6 +256,7 @@ typedef struct { int8_t quorum; int8_t update; int8_t cacheLastRow; + int8_t streamMode; } SDbCfg; typedef struct { @@ -423,9 +424,9 @@ typedef struct { char key[TSDB_SUBSCRIBE_KEY_LEN]; int32_t status; int32_t vgNum; - SArray* consumers; // SArray - SArray* lostConsumers; // SArray - SArray* unassignedVg; // SArray + SArray* consumers; // SArray + SArray* lostConsumers; // SArray + SArray* unassignedVg; // SArray } SMqSubscribeObj; static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() { @@ -539,13 +540,13 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) { if (pSub->consumers) { taosArrayDestroyEx(pSub->consumers, (void (*)(void*))tDeleteSMqSubConsumer); - //taosArrayDestroy(pSub->consumers); + // taosArrayDestroy(pSub->consumers); pSub->consumers = NULL; } if (pSub->unassignedVg) { taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp); - //taosArrayDestroy(pSub->unassignedVg); + // taosArrayDestroy(pSub->unassignedVg); pSub->unassignedVg = NULL; } } @@ -570,8 +571,8 @@ typedef struct { int64_t connId; SRWLatch lock; char cgroup[TSDB_CONSUMER_GROUP_LEN]; - SArray* currentTopics; // SArray - SArray* recentRemovedTopics; // SArray + SArray* currentTopics; // SArray + SArray* recentRemovedTopics; // SArray int32_t epoch; // stat int64_t pollCnt; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 7fc194de9c..874c7c2a22 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -53,6 +53,7 @@ static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg); static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg); static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg); static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg); +static int32_t mndProcessResetOffsetReq(SMnodeMsg *pMsg); static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, const SMqConsumerEp *pConsumerEp); @@ -204,6 +205,45 @@ static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMq return 0; } +#if 0 +static int32_t mndProcessResetOffsetReq(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + uint8_t *str = pMsg->rpcMsg.pCont; + SMqCMResetOffsetReq req; + + SCoder decoder; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, str, pMsg->rpcMsg.contLen, TD_DECODER); + tDecodeSMqCMResetOffsetReq(&decoder, &req); + + SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (pHash == NULL) { + return -1; + } + + for (int32_t i = 0; i < req.num; i++) { + SMqOffset *pOffset = &req.offsets[i]; + SMqVgOffsets *pVgOffset = taosHashGet(pHash, &pOffset->vgId, sizeof(int32_t)); + if (pVgOffset == NULL) { + pVgOffset = malloc(sizeof(SMqVgOffsets)); + if (pVgOffset == NULL) { + return -1; + } + pVgOffset->offsets = taosArrayInit(0, sizeof(void *)); + taosArrayPush(pVgOffset->offsets, &pOffset); + } + taosHashPut(pHash, &pOffset->vgId, sizeof(int32_t), &pVgOffset, sizeof(void *)); + } + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); + if (pTrans == NULL) { + mError("mq-reset-offset: failed since %s", terrstr()); + return -1; + } + + return 0; +} +#endif + static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;