fix:enable split if there are topics & fix memory leak
This commit is contained in:
parent
d3f54e678f
commit
f9c5aa01af
|
@ -332,7 +332,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
||||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
|
mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
|
||||||
mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
|
mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
|
||||||
mInfo("vnode splitted, rebalance will be triggered");
|
mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
|
||||||
}
|
}
|
||||||
mndReleaseVgroup(pMnode, pVgroup);
|
mndReleaseVgroup(pMnode, pVgroup);
|
||||||
}
|
}
|
||||||
|
|
|
@ -417,7 +417,7 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput){
|
||||||
taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs);
|
taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs);
|
||||||
mInfo("processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
|
mInfo("processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
|
||||||
}
|
}
|
||||||
taosArrayDestroy(newVgs);
|
taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
|
||||||
return totalVgNum;
|
return totalVgNum;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2672,14 +2672,14 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
|
||||||
SDbObj dbObj = {0};
|
SDbObj dbObj = {0};
|
||||||
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
|
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
|
||||||
|
|
||||||
int32_t numOfTopics = 0;
|
// int32_t numOfTopics = 0;
|
||||||
if (mndGetNumOfTopics(pMnode, pDb->name, &numOfTopics) != 0) {
|
// if (mndGetNumOfTopics(pMnode, pDb->name, &numOfTopics) != 0) {
|
||||||
goto _OVER;
|
// goto _OVER;
|
||||||
}
|
// }
|
||||||
if (numOfTopics > 0) {
|
// if (numOfTopics > 0) {
|
||||||
terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED;
|
// terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED;
|
||||||
goto _OVER;
|
// goto _OVER;
|
||||||
}
|
// }
|
||||||
|
|
||||||
int32_t numOfStreams = 0;
|
int32_t numOfStreams = 0;
|
||||||
if (mndGetNumOfStreams(pMnode, pDb->name, &numOfStreams) != 0) {
|
if (mndGetNumOfStreams(pMnode, pDb->name, &numOfStreams) != 0) {
|
||||||
|
|
|
@ -712,7 +712,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (pHandle->consumerId == req.newConsumerId) { // do nothing
|
if (pHandle->consumerId == req.newConsumerId) { // do nothing
|
||||||
tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
|
tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains", req.vgId, req.newConsumerId);
|
||||||
} else {
|
} else {
|
||||||
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
||||||
req.newConsumerId);
|
req.newConsumerId);
|
||||||
|
|
Loading…
Reference in New Issue