fix decode error

This commit is contained in:
Liu Jicong 2022-01-25 22:00:55 +08:00
parent 3036d4eac0
commit 9d57522225
1 changed files with 59 additions and 8 deletions

View File

@ -372,6 +372,8 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon
tlen += taosEncodeFixedI32(buf, pConsumerEp->status); tlen += taosEncodeFixedI32(buf, pConsumerEp->status);
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet); tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet);
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
tlen += taosEncodeFixedI64(buf, pConsumerEp->lastConsumerHbTs);
tlen += taosEncodeFixedI64(buf, pConsumerEp->lastVgHbTs);
//tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec); //tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
tlen += taosEncodeString(buf, pConsumerEp->qmsg); tlen += taosEncodeString(buf, pConsumerEp->qmsg);
return tlen; return tlen;
@ -382,6 +384,8 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu
buf = taosDecodeFixedI32(buf, &pConsumerEp->status); buf = taosDecodeFixedI32(buf, &pConsumerEp->status);
buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet); buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet);
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
buf = taosDecodeFixedI64(buf, &pConsumerEp->lastConsumerHbTs);
buf = taosDecodeFixedI64(buf, &pConsumerEp->lastVgHbTs);
//buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec); //buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
buf = taosDecodeString(buf, &pConsumerEp->qmsg); buf = taosDecodeString(buf, &pConsumerEp->qmsg);
return buf; return buf;
@ -419,18 +423,27 @@ static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
free(pSub); free(pSub);
return NULL; return NULL;
} }
pSub->idleConsumer = taosArrayInit(0, sizeof(SMqConsumerEp)); pSub->lostConsumer = taosArrayInit(0, sizeof(SMqConsumerEp));
if (pSub->assigned == NULL) { if (pSub->idleConsumer == NULL) {
taosArrayDestroy(pSub->availConsumer); taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->idleConsumer); taosArrayDestroy(pSub->assigned);
free(pSub);
return NULL;
}
pSub->idleConsumer = taosArrayInit(0, sizeof(SMqConsumerEp));
if (pSub->idleConsumer == NULL) {
taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->assigned);
taosArrayDestroy(pSub->lostConsumer);
free(pSub); free(pSub);
return NULL; return NULL;
} }
pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp)); pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp));
if (pSub->assigned == NULL) { if (pSub->unassignedVg == NULL) {
taosArrayDestroy(pSub->availConsumer); taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->assigned);
taosArrayDestroy(pSub->lostConsumer);
taosArrayDestroy(pSub->idleConsumer); taosArrayDestroy(pSub->idleConsumer);
taosArrayDestroy(pSub->unassignedVg);
free(pSub); free(pSub);
return NULL; return NULL;
} }
@ -457,6 +470,13 @@ static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeOb
tlen += tEncodeSMqConsumerEp(buf, pCEp); tlen += tEncodeSMqConsumerEp(buf, pCEp);
} }
sz = taosArrayGetSize(pSub->lostConsumer);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp* pCEp = taosArrayGet(pSub->lostConsumer, i);
tlen += tEncodeSMqConsumerEp(buf, pCEp);
}
sz = taosArrayGetSize(pSub->idleConsumer); sz = taosArrayGetSize(pSub->idleConsumer);
tlen += taosEncodeFixedI32(buf, sz); tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
@ -481,20 +501,47 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
int32_t sz; int32_t sz;
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pSub->assigned = taosArrayInit(sz, sizeof(int64_t)); pSub->availConsumer = taosArrayInit(sz, sizeof(int64_t));
if (pSub->assigned == NULL) { if (pSub->availConsumer == NULL) {
return NULL; return NULL;
} }
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int64_t consumerId; int64_t consumerId;
buf = taosDecodeFixedI64(buf, &consumerId); buf = taosDecodeFixedI64(buf, &consumerId);
taosArrayPush(pSub->assigned, &consumerId); taosArrayPush(pSub->availConsumer, &consumerId);
}
buf = taosDecodeFixedI32(buf, &sz);
pSub->assigned = taosArrayInit(sz, sizeof(SMqConsumerEp));
if (pSub->assigned == NULL) {
taosArrayDestroy(pSub->availConsumer);
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp;
buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->assigned, &cEp);
}
buf = taosDecodeFixedI32(buf, &sz);
pSub->lostConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp));
if (pSub->lostConsumer == NULL) {
taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->assigned);
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp;
buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->lostConsumer, &cEp);
} }
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pSub->idleConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp)); pSub->idleConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp));
if (pSub->idleConsumer == NULL) { if (pSub->idleConsumer == NULL) {
taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->assigned); taosArrayDestroy(pSub->assigned);
taosArrayDestroy(pSub->lostConsumer);
return NULL; return NULL;
} }
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
@ -503,10 +550,13 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
taosArrayPush(pSub->idleConsumer, &cEp); taosArrayPush(pSub->idleConsumer, &cEp);
} }
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp)); pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp));
if (pSub->unassignedVg == NULL) { if (pSub->unassignedVg == NULL) {
taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->assigned); taosArrayDestroy(pSub->assigned);
taosArrayDestroy(pSub->lostConsumer);
taosArrayDestroy(pSub->idleConsumer); taosArrayDestroy(pSub->idleConsumer);
return NULL; return NULL;
} }
@ -576,6 +626,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerTopic(void** buf, SMqConsumerTopic
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeString(buf, pConsumerTopic->name); tlen += taosEncodeString(buf, pConsumerTopic->name);
tlen += taosEncodeFixedI32(buf, pConsumerTopic->epoch); tlen += taosEncodeFixedI32(buf, pConsumerTopic->epoch);
ASSERT(pConsumerTopic->pVgInfo);
int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo); int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
tlen += taosEncodeFixedI32(buf, sz); tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {