add reset offset

This commit is contained in:
Liu Jicong 2022-02-18 15:29:45 +08:00
parent 7bbca5e4b5
commit a22047ecf7
5 changed files with 89 additions and 12 deletions

View File

@ -1751,6 +1751,11 @@ typedef struct {
char cgroup[TSDB_CONSUMER_GROUP_LEN];
} SMqOffset;
typedef struct {
int32_t vgId;
SArray* offsets; // SArray<SMqOffset>
} SMqVgOffsets;
typedef struct {
int32_t num;
SMqOffset* offsets;
@ -1761,8 +1766,8 @@ typedef struct {
} SMqCMResetOffsetRsp;
typedef struct {
int32_t num;
SMqOffset* offsets;
int64_t leftForVer;
SMqVgOffsets offsets;
} SMqMVResetOffsetReq;
typedef struct {
@ -1773,7 +1778,6 @@ int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset);
int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset);
int32_t tEncodeSMqCMResetOffsetReq(SCoder* encoder, const SMqCMResetOffsetReq* pReq);
int32_t tDecodeSMqCMResetOffsetReq(SCoder* decoder, SMqCMResetOffsetReq* pReq);
int32_t tEncodeSMqMVResetOffsetReq(SCoder* encoder, const SMqMVResetOffsetReq* pReq);
int32_t tDecodeSMqMVResetOffsetReq(SCoder* decoder, SMqMVResetOffsetReq* pReq);

View File

@ -269,7 +269,7 @@ tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offse
tsem_wait(&param.rspSem);
tsem_destroy(&param.rspSem);
return TMQ_RESP_ERR__SUCCESS;
return param.rspErr;
}
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {

View File

@ -2323,6 +2323,34 @@ int32_t tDecodeSMqOffset(SCoder *decoder, SMqOffset *pOffset) {
return 0;
}
int32_t tEncodeSMqVgOffsets(SCoder *encoder, const SMqVgOffsets *pOffsets) {
if (tStartEncode(encoder) < 0) return -1;
if (tEncodeI32(encoder, pOffsets->vgId) < 0) return -1;
int32_t sz = taosArrayGetSize(pOffsets->offsets);
if (tEncodeI32(encoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) {
SMqOffset *offset = taosArrayGet(pOffsets->offsets, i);
if (tEncodeSMqOffset(encoder, offset) < 0) return -1;
}
tEndEncode(encoder);
return encoder->pos;
}
int32_t tDecodeSMqVgOffsets(SCoder *decoder, SMqVgOffsets *pOffsets) {
int32_t sz;
if (tStartDecode(decoder) < 0) return -1;
if (tDecodeI32(decoder, &pOffsets->vgId) < 0) return -1;
if (tDecodeI32(decoder, &sz) < 0) return -1;
pOffsets->offsets = taosArrayInit(sz, sizeof(SMqOffset));
for (int32_t i = 0; i < sz; i++) {
SMqOffset offset;
if (tDecodeSMqOffset(decoder, &offset) < 0) return -1;
taosArrayPush(pOffsets->offsets, &offset);
}
tEndDecode(decoder);
return 0;
}
int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *pReq) {
if (tStartEncode(encoder) < 0) return -1;
if (tEncodeI32(encoder, pReq->num) < 0) return -1;
@ -2334,17 +2362,20 @@ int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *p
}
int32_t tDecodeSMqCMResetOffsetReq(SCoder *decoder, SMqCMResetOffsetReq *pReq) {
if (tStartDecode(decoder) < 0) return -1;
if (tDecodeI32(decoder, &pReq->num) < 0) return -1;
pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder);
if (pReq->offsets == NULL) return -1;
for (int32_t i = 0; i < pReq->num; i++) {
tDecodeSMqOffset(decoder, &pReq->offsets[i]);
}
tEndDecode(decoder);
return 0;
}
#if 0
int32_t tEncodeSMqMVResetOffsetReq(SCoder *encoder, const SMqMVResetOffsetReq *pReq) {
if (tEncodeI32(encoder, pReq->num) < 0) return -1;
if (tEncodeI64(encoder, pReq->leftForVer) < 0) return -1;
for (int32_t i = 0; i < pReq->num; i++) {
tEncodeSMqOffset(encoder, &pReq->offsets[i]);
}
@ -2360,3 +2391,4 @@ int32_t tDecodeSMqMVResetOffsetReq(SCoder *decoder, SMqMVResetOffsetReq *pReq) {
}
return 0;
}
#endif

View File

@ -256,6 +256,7 @@ typedef struct {
int8_t quorum;
int8_t update;
int8_t cacheLastRow;
int8_t streamMode;
} SDbCfg;
typedef struct {
@ -423,9 +424,9 @@ typedef struct {
char key[TSDB_SUBSCRIBE_KEY_LEN];
int32_t status;
int32_t vgNum;
SArray* consumers; // SArray<SMqSubConsumer>
SArray* lostConsumers; // SArray<SMqSubConsumer>
SArray* unassignedVg; // SArray<SMqConsumerEp>
SArray* consumers; // SArray<SMqSubConsumer>
SArray* lostConsumers; // SArray<SMqSubConsumer>
SArray* unassignedVg; // SArray<SMqConsumerEp>
} SMqSubscribeObj;
static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
@ -539,13 +540,13 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
if (pSub->consumers) {
taosArrayDestroyEx(pSub->consumers, (void (*)(void*))tDeleteSMqSubConsumer);
//taosArrayDestroy(pSub->consumers);
// taosArrayDestroy(pSub->consumers);
pSub->consumers = NULL;
}
if (pSub->unassignedVg) {
taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
//taosArrayDestroy(pSub->unassignedVg);
// taosArrayDestroy(pSub->unassignedVg);
pSub->unassignedVg = NULL;
}
}
@ -570,8 +571,8 @@ typedef struct {
int64_t connId;
SRWLatch lock;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray* currentTopics; // SArray<char*>
SArray* recentRemovedTopics; // SArray<char*>
SArray* currentTopics; // SArray<char*>
SArray* recentRemovedTopics; // SArray<char*>
int32_t epoch;
// stat
int64_t pollCnt;

View File

@ -53,6 +53,7 @@ static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg);
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg);
static int32_t mndProcessResetOffsetReq(SMnodeMsg *pMsg);
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
const SMqConsumerEp *pConsumerEp);
@ -204,6 +205,45 @@ static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMq
return 0;
}
#if 0
static int32_t mndProcessResetOffsetReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
uint8_t *str = pMsg->rpcMsg.pCont;
SMqCMResetOffsetReq req;
SCoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, str, pMsg->rpcMsg.contLen, TD_DECODER);
tDecodeSMqCMResetOffsetReq(&decoder, &req);
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (pHash == NULL) {
return -1;
}
for (int32_t i = 0; i < req.num; i++) {
SMqOffset *pOffset = &req.offsets[i];
SMqVgOffsets *pVgOffset = taosHashGet(pHash, &pOffset->vgId, sizeof(int32_t));
if (pVgOffset == NULL) {
pVgOffset = malloc(sizeof(SMqVgOffsets));
if (pVgOffset == NULL) {
return -1;
}
pVgOffset->offsets = taosArrayInit(0, sizeof(void *));
taosArrayPush(pVgOffset->offsets, &pOffset);
}
taosHashPut(pHash, &pOffset->vgId, sizeof(int32_t), &pVgOffset, sizeof(void *));
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
if (pTrans == NULL) {
mError("mq-reset-offset: failed since %s", terrstr());
return -1;
}
return 0;
}
#endif
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;