fix double free
This commit is contained in:
parent
108fd4cfc3
commit
19892cfaf7
|
@ -597,7 +597,6 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) {
|
||||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||||
|
|
||||||
END:
|
END:
|
||||||
tfree(buf);
|
|
||||||
if (wait) tsem_wait(&tmq->rspSem);
|
if (wait) tsem_wait(&tmq->rspSem);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -526,7 +526,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqConsumerEp cEp;
|
SMqConsumerEp cEp = {0};
|
||||||
buf = tDecodeSMqConsumerEp(buf, &cEp);
|
buf = tDecodeSMqConsumerEp(buf, &cEp);
|
||||||
taosArrayPush(pSub->assigned, &cEp);
|
taosArrayPush(pSub->assigned, &cEp);
|
||||||
}
|
}
|
||||||
|
@ -539,7 +539,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqConsumerEp cEp;
|
SMqConsumerEp cEp = {0};
|
||||||
buf = tDecodeSMqConsumerEp(buf, &cEp);
|
buf = tDecodeSMqConsumerEp(buf, &cEp);
|
||||||
taosArrayPush(pSub->lostConsumer, &cEp);
|
taosArrayPush(pSub->lostConsumer, &cEp);
|
||||||
}
|
}
|
||||||
|
@ -553,7 +553,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqConsumerEp cEp;
|
SMqConsumerEp cEp = {0};
|
||||||
buf = tDecodeSMqConsumerEp(buf, &cEp);
|
buf = tDecodeSMqConsumerEp(buf, &cEp);
|
||||||
taosArrayPush(pSub->idleConsumer, &cEp);
|
taosArrayPush(pSub->idleConsumer, &cEp);
|
||||||
}
|
}
|
||||||
|
@ -569,7 +569,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqConsumerEp cEp;
|
SMqConsumerEp cEp = {0};
|
||||||
buf = tDecodeSMqConsumerEp(buf, &cEp);
|
buf = tDecodeSMqConsumerEp(buf, &cEp);
|
||||||
taosArrayPush(pSub->unassignedVg, &cEp);
|
taosArrayPush(pSub->unassignedVg, &cEp);
|
||||||
}
|
}
|
||||||
|
@ -578,11 +578,26 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
|
static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
|
||||||
if (pSub->availConsumer) taosArrayDestroy(pSub->availConsumer);
|
if (pSub->availConsumer) {
|
||||||
if (pSub->assigned) taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp);
|
taosArrayDestroy(pSub->availConsumer);
|
||||||
if (pSub->unassignedVg) taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
|
pSub->availConsumer = NULL;
|
||||||
if (pSub->idleConsumer) taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
|
}
|
||||||
if (pSub->lostConsumer) taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
|
if (pSub->assigned) {
|
||||||
|
taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp);
|
||||||
|
pSub->assigned = NULL;
|
||||||
|
}
|
||||||
|
if (pSub->unassignedVg) {
|
||||||
|
taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
|
||||||
|
pSub->unassignedVg = NULL;
|
||||||
|
}
|
||||||
|
if (pSub->idleConsumer) {
|
||||||
|
taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
|
||||||
|
pSub->idleConsumer = NULL;
|
||||||
|
}
|
||||||
|
if (pSub->lostConsumer) {
|
||||||
|
taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
|
||||||
|
pSub->lostConsumer = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SMqCGroup {
|
typedef struct SMqCGroup {
|
||||||
|
|
|
@ -292,7 +292,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqConsumerEp CEp;
|
SMqConsumerEp CEp = {0};
|
||||||
CEp.status = 0;
|
CEp.status = 0;
|
||||||
CEp.consumerId = -1;
|
CEp.consumerId = -1;
|
||||||
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
|
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
|
||||||
|
|
Loading…
Reference in New Issue