fix flag init

This commit is contained in:
Liu Jicong 2022-04-21 17:08:11 +08:00
parent 2dcb014511
commit cb1318a76b
3 changed files with 17 additions and 28 deletions

View File

@ -490,11 +490,6 @@ void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int8_t subType;
int8_t withTbName;
int8_t withSchema;
int8_t withTag;
int8_t withTagSchema;
char* qmsg; char* qmsg;
SEpSet epSet; SEpSet epSet;
} SMqVgEp; } SMqVgEp;

View File

@ -148,13 +148,7 @@ SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
if (pVgEpNew == NULL) return NULL; if (pVgEpNew == NULL) return NULL;
pVgEpNew->vgId = pVgEp->vgId; pVgEpNew->vgId = pVgEp->vgId;
pVgEpNew->subType = pVgEp->subType;
pVgEpNew->withTbName = pVgEp->withTbName;
pVgEpNew->withSchema = pVgEp->withSchema;
pVgEpNew->withTag = pVgEp->withTag;
pVgEpNew->withTagSchema = pVgEp->withTagSchema;
pVgEpNew->qmsg = strdup(pVgEp->qmsg); pVgEpNew->qmsg = strdup(pVgEp->qmsg);
/*memcpy(pVgEpNew->topic, pVgEp->topic, TSDB_TOPIC_FNAME_LEN);*/
pVgEpNew->epSet = pVgEp->epSet; pVgEpNew->epSet = pVgEp->epSet;
return pVgEpNew; return pVgEpNew;
} }
@ -164,26 +158,14 @@ void tDeleteSMqVgEp(SMqVgEp *pVgEp) { taosMemoryFree(pVgEp->qmsg); }
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) { int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pVgEp->vgId); tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
tlen += taosEncodeFixedI8(buf, pVgEp->subType);
tlen += taosEncodeFixedI8(buf, pVgEp->withTbName);
tlen += taosEncodeFixedI8(buf, pVgEp->withSchema);
tlen += taosEncodeFixedI8(buf, pVgEp->withTag);
tlen += taosEncodeFixedI8(buf, pVgEp->withTagSchema);
tlen += taosEncodeString(buf, pVgEp->qmsg); tlen += taosEncodeString(buf, pVgEp->qmsg);
/*tlen += taosEncodeString(buf, pVgEp->topic);*/
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
return tlen; return tlen;
} }
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) { void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) {
buf = taosDecodeFixedI32(buf, &pVgEp->vgId); buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
buf = taosDecodeFixedI8(buf, &pVgEp->subType);
buf = taosDecodeFixedI8(buf, &pVgEp->withTbName);
buf = taosDecodeFixedI8(buf, &pVgEp->withSchema);
buf = taosDecodeFixedI8(buf, &pVgEp->withTag);
buf = taosDecodeFixedI8(buf, &pVgEp->withTagSchema);
buf = taosDecodeString(buf, &pVgEp->qmsg); buf = taosDecodeString(buf, &pVgEp->qmsg);
/*buf = taosDecodeStringTo(buf, pVgEp->topic);*/
buf = taosDecodeSEpSet(buf, &pVgEp->epSet); buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
return (void *)buf; return (void *)buf;
} }

View File

@ -85,6 +85,12 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pSub->subType = pTopic->subType;
pSub->withTbName = pTopic->withTbName;
pSub->withSchema = pTopic->withSchema;
pSub->withTag = pTopic->withTag;
pSub->withTagSchema = pTopic->withTagSchema;
ASSERT(taosHashGetSize(pSub->consumerHash) == 1); ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) { if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) {
@ -98,13 +104,19 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
return pSub; return pSub;
} }
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const char *subKey, const SMqRebOutputVg *pRebVg) { static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscribeObj *pSub,
const SMqRebOutputVg *pRebVg) {
SMqRebVgReq req = {0}; SMqRebVgReq req = {0};
req.oldConsumerId = pRebVg->oldConsumerId; req.oldConsumerId = pRebVg->oldConsumerId;
req.newConsumerId = pRebVg->newConsumerId; req.newConsumerId = pRebVg->newConsumerId;
req.vgId = pRebVg->pVgEp->vgId; req.vgId = pRebVg->pVgEp->vgId;
req.qmsg = pRebVg->pVgEp->qmsg; req.qmsg = pRebVg->pVgEp->qmsg;
strncpy(req.subKey, subKey, TSDB_SUBSCRIBE_KEY_LEN); req.subType = pSub->subType;
req.withTbName = pSub->withTbName;
req.withSchema = pSub->withSchema;
req.withTag = pSub->withTag;
req.withTagSchema = pSub->withTagSchema;
strncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req); int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req);
void *buf = taosMemoryMalloc(tlen); void *buf = taosMemoryMalloc(tlen);
@ -125,13 +137,13 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const char *subK
return 0; return 0;
} }
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const char *subKey, static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub,
const SMqRebOutputVg *pRebVg) { const SMqRebOutputVg *pRebVg) {
ASSERT(pRebVg->oldConsumerId != pRebVg->newConsumerId); ASSERT(pRebVg->oldConsumerId != pRebVg->newConsumerId);
void *buf; void *buf;
int32_t tlen; int32_t tlen;
if (mndBuildSubChangeReq(&buf, &tlen, subKey, pRebVg) < 0) { if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg) < 0) {
return -1; return -1;
} }
@ -395,7 +407,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO
int32_t vgNum = taosArrayGetSize(rebVgs); int32_t vgNum = taosArrayGetSize(rebVgs);
for (int32_t i = 0; i < vgNum; i++) { for (int32_t i = 0; i < vgNum; i++) {
SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i); SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub->key, pRebVg) < 0) { if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) {
goto REB_FAIL; goto REB_FAIL;
} }
} }