fix:[TS-4391] rebalance cnt always 1 if msg lost
This commit is contained in:
parent
c83989754f
commit
d158ca74d8
|
@ -220,21 +220,6 @@ FAIL:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
|
|
||||||
SMqRebInfo *pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
|
|
||||||
if (pRebInfo == NULL) {
|
|
||||||
pRebInfo = tNewSMqRebSubscribe(key);
|
|
||||||
if (pRebInfo == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
taosHashPut(pHash, key, strlen(key) + 1, pRebInfo, sizeof(SMqRebInfo));
|
|
||||||
taosMemoryFree(pRebInfo);
|
|
||||||
pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
|
|
||||||
}
|
|
||||||
return pRebInfo;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
|
|
|
@ -209,16 +209,18 @@ static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup,
|
||||||
}
|
}
|
||||||
|
|
||||||
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
|
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
|
||||||
SMqRebInfo *pRebSub = taosHashGet(pHash, key, strlen(key) + 1);
|
SMqRebInfo *pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
|
||||||
if (pRebSub == NULL) {
|
if (pRebInfo == NULL) {
|
||||||
pRebSub = tNewSMqRebSubscribe(key);
|
pRebInfo = tNewSMqRebSubscribe(key);
|
||||||
if (pRebSub == NULL) {
|
if (pRebInfo == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebInfo));
|
taosHashPut(pHash, key, strlen(key) + 1, pRebInfo, sizeof(SMqRebInfo));
|
||||||
|
taosMemoryFree(pRebInfo);
|
||||||
|
pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
|
||||||
}
|
}
|
||||||
return pRebSub;
|
return pRebInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doRemoveLostConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
|
static void doRemoveLostConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
|
||||||
|
|
Loading…
Reference in New Issue