Merge branch 'main' into enh/TD-22404-M
This commit is contained in:
commit
b93c85fbf5
|
@ -10,7 +10,7 @@
|
|||
<description>Demo project for TDengine</description>
|
||||
|
||||
<properties>
|
||||
<spring.version>5.3.26</spring.version>
|
||||
<spring.version>5.3.27</spring.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -2071,7 +2071,6 @@ static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq
|
|||
|
||||
typedef struct {
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
SArray* lostConsumers; // SArray<int64_t>
|
||||
SArray* removedConsumers; // SArray<int64_t>
|
||||
SArray* newConsumers; // SArray<int64_t>
|
||||
} SMqRebInfo;
|
||||
|
@ -2082,10 +2081,6 @@ static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) {
|
|||
return NULL;
|
||||
}
|
||||
tstrncpy(pRebInfo->key, key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||
pRebInfo->lostConsumers = taosArrayInit(0, sizeof(int64_t));
|
||||
if (pRebInfo->lostConsumers == NULL) {
|
||||
goto _err;
|
||||
}
|
||||
pRebInfo->removedConsumers = taosArrayInit(0, sizeof(int64_t));
|
||||
if (pRebInfo->removedConsumers == NULL) {
|
||||
goto _err;
|
||||
|
@ -2096,7 +2091,6 @@ static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) {
|
|||
}
|
||||
return pRebInfo;
|
||||
_err:
|
||||
taosArrayDestroy(pRebInfo->lostConsumers);
|
||||
taosArrayDestroy(pRebInfo->removedConsumers);
|
||||
taosArrayDestroy(pRebInfo->newConsumers);
|
||||
taosMemoryFreeClear(pRebInfo);
|
||||
|
|
|
@ -137,12 +137,12 @@ typedef enum {
|
|||
} EDndReason;
|
||||
|
||||
typedef enum {
|
||||
CONSUMER_UPDATE__TOUCH = 1,
|
||||
CONSUMER_UPDATE__TOUCH = 1, // rebalance req do not need change consume topic
|
||||
CONSUMER_UPDATE__ADD,
|
||||
CONSUMER_UPDATE__REMOVE,
|
||||
CONSUMER_UPDATE__LOST,
|
||||
CONSUMER_UPDATE__RECOVER,
|
||||
CONSUMER_UPDATE__MODIFY,
|
||||
CONSUMER_UPDATE__MODIFY, // subscribe req need change consume topic
|
||||
} ECsmUpdateType;
|
||||
|
||||
typedef struct {
|
||||
|
@ -624,7 +624,7 @@ typedef struct {
|
|||
SArray* rebVgs; // SArray<SMqRebOutputVg>
|
||||
SArray* newConsumers; // SArray<int64_t>
|
||||
SArray* removedConsumers; // SArray<int64_t>
|
||||
SArray* touchedConsumers; // SArray<int64_t>
|
||||
SArray* modifyConsumers; // SArray<int64_t>
|
||||
SMqSubscribeObj* pSub;
|
||||
SMqSubActionLogEntry* pLogEntry;
|
||||
} SMqRebOutputObj;
|
||||
|
|
|
@ -247,7 +247,6 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
|
|||
|
||||
static void freeRebalanceItem(void *param) {
|
||||
SMqRebInfo *pInfo = param;
|
||||
taosArrayDestroy(pInfo->lostConsumers);
|
||||
taosArrayDestroy(pInfo->newConsumers);
|
||||
taosArrayDestroy(pInfo->removedConsumers);
|
||||
}
|
||||
|
@ -335,7 +334,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
|||
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
|
||||
}
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
} else if (status == MQ_CONSUMER_STATUS__MODIFY) {
|
||||
} else {
|
||||
taosRLockLatch(&pConsumer->lock);
|
||||
|
||||
int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
|
||||
|
@ -356,8 +355,6 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
|||
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
|
||||
}
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
} else {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
|
@ -917,34 +914,22 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
taosWLockLatch(&pOldConsumer->lock);
|
||||
|
||||
if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) {
|
||||
/*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
|
||||
/*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/
|
||||
SArray *tmp = pOldConsumer->rebNewTopics;
|
||||
pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics;
|
||||
pNewConsumer->rebNewTopics = tmp;
|
||||
|
||||
// this new consumer has identical topics with one existed consumers.
|
||||
if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) {
|
||||
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
|
||||
} else {
|
||||
SArray *tmp = pOldConsumer->rebNewTopics;
|
||||
pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics;
|
||||
pNewConsumer->rebNewTopics = tmp;
|
||||
tmp = pOldConsumer->rebRemovedTopics;
|
||||
pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics;
|
||||
pNewConsumer->rebRemovedTopics = tmp;
|
||||
|
||||
tmp = pOldConsumer->rebRemovedTopics;
|
||||
pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics;
|
||||
pNewConsumer->rebRemovedTopics = tmp;
|
||||
tmp = pOldConsumer->assignedTopics;
|
||||
pOldConsumer->assignedTopics = pNewConsumer->assignedTopics;
|
||||
pNewConsumer->assignedTopics = tmp;
|
||||
|
||||
tmp = pOldConsumer->assignedTopics;
|
||||
pOldConsumer->assignedTopics = pNewConsumer->assignedTopics;
|
||||
pNewConsumer->assignedTopics = tmp;
|
||||
|
||||
pOldConsumer->subscribeTime = pNewConsumer->upTime;
|
||||
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
|
||||
}
|
||||
pOldConsumer->subscribeTime = pNewConsumer->upTime;
|
||||
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
|
||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
|
||||
/*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
|
||||
/*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/
|
||||
|
||||
int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
|
||||
/*pOldConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));*/
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i));
|
||||
taosArrayPush(pOldConsumer->rebRemovedTopics, &topic);
|
||||
|
@ -958,9 +943,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
|
||||
pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
|
||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) {
|
||||
/*A(taosArrayGetSize(pOldConsumer->currentTopics) == 0);*/
|
||||
/*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
|
||||
|
||||
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
|
||||
|
@ -976,7 +958,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
|
||||
|
||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
|
||||
ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);
|
||||
char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
|
||||
|
||||
// not exist in current topic
|
||||
|
@ -1015,15 +996,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
(int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
|
||||
|
||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
|
||||
/*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);*/
|
||||
/*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);*/
|
||||
char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
|
||||
#if 0
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
|
||||
char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
|
||||
A(strcmp(topic, removedTopic) != 0);
|
||||
}
|
||||
#endif
|
||||
|
||||
// remove from removed topic
|
||||
removeFromRemoveTopicList(pOldConsumer, removedTopic);
|
||||
|
|
|
@ -269,7 +269,7 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
|
|||
};
|
||||
|
||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
|
||||
mInfo("sub:%s mq re-balance remove vgId:%d from unassigned", pSubKey, pVgEp->vgId);
|
||||
mInfo("sub:%s mq re-balance addUnassignedVgroups vgId:%d from unassigned", pSubKey, pVgEp->vgId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -289,9 +289,9 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas
|
|||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
|
||||
|
||||
// all old consumers still existing are touched
|
||||
// TODO optimize: touch only consumer whose vgs changed
|
||||
taosArrayPush(pOutput->touchedConsumers, &pConsumerEp->consumerId);
|
||||
// all old consumers still existing need to be modified
|
||||
// TODO optimize: modify only consumer whose vgs changed
|
||||
taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId);
|
||||
if (consumerVgNum > minVgCnt) {
|
||||
if (imbCnt < imbConsumerNum) {
|
||||
if (consumerVgNum == minVgCnt + 1) {
|
||||
|
@ -339,13 +339,13 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
mInfo("sub:%s mq re-balance %d vgroups, existed consumers:%d, added:%d, removed:%d", pSubKey, totalVgNum,
|
||||
pInput->oldConsumerNum, numOfAdded, numOfRemoved);
|
||||
|
||||
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
|
||||
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store vg that need to be assigned
|
||||
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
|
||||
// 2. check and get actual removed consumers, put their vg into hash
|
||||
// 2. check and get actual removed consumers, put their vg into pHash
|
||||
doRemoveExistedConsumers(pOutput, pHash, pInput);
|
||||
|
||||
// 3. if previously no consumer, there are vgs not assigned
|
||||
// 3. if previously no consumer, there are vgs not assigned, put these vg into pHash
|
||||
addUnassignedVgroups(pOutput, pHash);
|
||||
|
||||
// 4. calc vg number of each consumer
|
||||
|
@ -364,19 +364,17 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
mInfo("sub:%s no consumer subscribe this topic", pSubKey);
|
||||
}
|
||||
|
||||
// 5. first scan: remove vgroups from te consumers, who have more vgroups than the threashold value that is
|
||||
// minVgCnt, and then put them into the recycled hash list
|
||||
// 5. remove vgroups from consumers who have more vgroups than the threshold value(minVgCnt or minVgCnt + 1), and then another vg into pHash
|
||||
transferVgroupsForConsumers(pOutput, pHash, minVgCnt, imbConsumerNum);
|
||||
|
||||
// 6. add new consumer into sub
|
||||
doAddNewConsumers(pOutput, pInput);
|
||||
|
||||
// 7. second scan: find consumer do not have enough vgroups, extract from temporary hash and assign to them
|
||||
// All related vg should be put into rebVgs
|
||||
SMqRebOutputVg *pRebVg = NULL;
|
||||
void *pRemovedIter = NULL;
|
||||
void *pIter = NULL;
|
||||
|
||||
// 7. extract bgroups from pHash and assign to consumers that do not have enough vgroups
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) {
|
||||
|
@ -390,68 +388,52 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
// iter hash and find one vg
|
||||
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
||||
if (pRemovedIter == NULL) {
|
||||
mError("sub:%s removed iter is null", pSubKey);
|
||||
mError("sub:%s removed iter is null, never can reach hear", pSubKey);
|
||||
break;
|
||||
}
|
||||
|
||||
pRebVg = (SMqRebOutputVg *)pRemovedIter;
|
||||
// push
|
||||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
||||
taosArrayPush(pOutput->rebVgs, pRebVg);
|
||||
mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (not enough)", pRebVg->pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||
mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
|
||||
}
|
||||
}
|
||||
|
||||
// 7. handle unassigned vg
|
||||
if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) {
|
||||
// if has consumer, assign all left vg
|
||||
while (1) {
|
||||
SMqConsumerEp *pConsumerEp = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
|
||||
if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
|
||||
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
||||
if (pRemovedIter == NULL) {
|
||||
if (pIter != NULL) {
|
||||
taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
|
||||
pIter = NULL;
|
||||
}
|
||||
mInfo("sub:%s removed iter is null", pSubKey);
|
||||
break;
|
||||
}
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||
pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
|
||||
if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
pRebVg = (SMqRebOutputVg *)pRemovedIter;
|
||||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
||||
if (pRebVg->newConsumerId == pRebVg->oldConsumerId) {
|
||||
mInfo("mq rebalance: skip vg %d for same consumer:0x%" PRIx64 " (second scan)", pRebVg->pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
continue;
|
||||
}
|
||||
taosArrayPush(pOutput->rebVgs, pRebVg);
|
||||
mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||
mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
|
||||
}
|
||||
} else {
|
||||
// if all consumer is removed, put all vg into unassigned
|
||||
pIter = NULL;
|
||||
SMqRebOutputVg *pRebOutput = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pHash, pIter);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pRebOutput = (SMqRebOutputVg *)pIter;
|
||||
// All assigned vg should be put into pOutput->rebVgs
|
||||
if(pRemovedIter != NULL){
|
||||
mError("sub:%s error pRemovedIter should be NULL", pSubKey);
|
||||
}
|
||||
while (1) {
|
||||
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
||||
if (pRemovedIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
SMqRebOutputVg* pRebOutput = (SMqRebOutputVg *)pRemovedIter;
|
||||
taosArrayPush(pOutput->rebVgs, pRebOutput);
|
||||
if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed, put all vg into unassigned
|
||||
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
|
||||
taosArrayPush(pOutput->rebVgs, pRebOutput);
|
||||
mInfo("sub:%s mq re-balance unassign vgId:%d (second scan)", pSubKey, pRebOutput->pVgEp->vgId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -462,19 +444,18 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pSubKey,
|
||||
pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
|
||||
}
|
||||
{
|
||||
pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
||||
mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
|
||||
mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
}
|
||||
|
||||
pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
||||
mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
|
||||
mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -515,9 +496,9 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
|||
|
||||
// 3. commit log: consumer to update status and epoch
|
||||
// 3.1 set touched consumer
|
||||
int32_t consumerNum = taosArrayGetSize(pOutput->touchedConsumers);
|
||||
int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers);
|
||||
for (int32_t i = 0; i < consumerNum; i++) {
|
||||
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->touchedConsumers, i);
|
||||
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->modifyConsumers, i);
|
||||
SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
|
||||
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
|
||||
pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH;
|
||||
|
@ -597,15 +578,15 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
|||
SMnode *pMnode = pMsg->info.node;
|
||||
SMqDoRebalanceMsg *pReq = pMsg->pCont;
|
||||
void *pIter = NULL;
|
||||
bool rebalanceOnce = false; // to ensure only once.
|
||||
// bool rebalanceOnce = false; // to ensure only once.
|
||||
|
||||
mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash));
|
||||
|
||||
// here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction.
|
||||
while (1) {
|
||||
if (rebalanceOnce) {
|
||||
break;
|
||||
}
|
||||
// if (rebalanceOnce) {
|
||||
// break;
|
||||
// }
|
||||
|
||||
pIter = taosHashIterate(pReq->rebSubHash, pIter);
|
||||
if (pIter == NULL) {
|
||||
|
@ -617,7 +598,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
|||
SMqRebOutputObj rebOutput = {0};
|
||||
rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t));
|
||||
rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t));
|
||||
rebOutput.touchedConsumers = taosArrayInit(0, sizeof(int64_t));
|
||||
rebOutput.modifyConsumers = taosArrayInit(0, sizeof(int64_t));
|
||||
rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
|
||||
|
||||
SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter;
|
||||
|
@ -653,13 +634,13 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
|||
mndReleaseTopic(pMnode, pTopic);
|
||||
|
||||
rebInput.oldConsumerNum = 0;
|
||||
mInfo("topic:%s has no consumers sub yet", topic);
|
||||
mInfo("sub topic:%s has no consumers sub yet", pRebInfo->key);
|
||||
} else {
|
||||
taosRLockLatch(&pSub->lock);
|
||||
rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash);
|
||||
rebOutput.pSub = tCloneSubscribeObj(pSub);
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
mInfo("topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum);
|
||||
mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum);
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
}
|
||||
|
||||
|
@ -675,13 +656,14 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
taosArrayDestroy(rebOutput.newConsumers);
|
||||
taosArrayDestroy(rebOutput.touchedConsumers);
|
||||
taosArrayDestroy(rebOutput.modifyConsumers);
|
||||
taosArrayDestroy(rebOutput.removedConsumers);
|
||||
taosArrayDestroy(rebOutput.rebVgs);
|
||||
tDeleteSubscribeObj(rebOutput.pSub);
|
||||
taosMemoryFree(rebOutput.pSub);
|
||||
|
||||
rebalanceOnce = true;
|
||||
// taosSsleep(100);
|
||||
// rebalanceOnce = true;
|
||||
}
|
||||
|
||||
// reset flag
|
||||
|
|
|
@ -196,13 +196,13 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
|
|||
SSdbRow *pOldRow = *ppOldRow;
|
||||
pOldRow->status = pRaw->status;
|
||||
sdbPrintOper(pSdb, pOldRow, "update");
|
||||
sdbUnLock(pSdb, type);
|
||||
|
||||
int32_t code = 0;
|
||||
SdbUpdateFp updateFp = pSdb->updateFps[type];
|
||||
if (updateFp != NULL) {
|
||||
code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj);
|
||||
}
|
||||
sdbUnLock(pSdb, type);
|
||||
|
||||
// sdbUnLock(pSdb, type);
|
||||
sdbFreeRow(pSdb, pNewRow, false);
|
||||
|
|
|
@ -1154,6 +1154,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
|
|||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return 0;
|
||||
} else {
|
||||
tDeleteStreamDispatchReq(&req);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -1196,6 +1197,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
tDeleteStreamRetrieveReq(&req);
|
||||
return 0;
|
||||
} else {
|
||||
tDeleteStreamRetrieveReq(&req);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -176,10 +176,12 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|||
// add to the ready tasks hash map, not the restored tasks hash map
|
||||
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
|
||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
|
||||
tFreeStreamTask(pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (streamMetaSaveTask(pMeta, pTask) < 0) {
|
||||
tFreeStreamTask(pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -48,8 +48,6 @@ extern "C" {
|
|||
void syncOneReplicaAdvance(SSyncNode* pSyncNode);
|
||||
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode);
|
||||
|
||||
bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index);
|
||||
bool syncAgree(SSyncNode* pSyncNode, SyncIndex index);
|
||||
bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index);
|
||||
|
||||
int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex);
|
||||
|
|
|
@ -55,7 +55,6 @@ int32_t syncNodeReplicateReset(SSyncNode* pSyncNode, SRaftId* pDestId);
|
|||
int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode);
|
||||
|
||||
int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
|
||||
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -89,17 +89,6 @@
|
|||
// /\ UNCHANGED <<candidateVars, leaderVars>>
|
||||
//
|
||||
|
||||
SSyncRaftEntry* syncBuildRaftEntryFromAppendEntries(const SyncAppendEntries* pMsg) {
|
||||
SSyncRaftEntry* pEntry = taosMemoryMalloc(pMsg->dataLen);
|
||||
if (pEntry == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
(void)memcpy(pEntry, pMsg->data, pMsg->dataLen);
|
||||
ASSERT(pEntry->bytes == pMsg->dataLen);
|
||||
return pEntry;
|
||||
}
|
||||
|
||||
int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||
SyncAppendEntries* pMsg = pRpcMsg->pCont;
|
||||
SRpcMsg rpcRsp = {0};
|
||||
|
@ -146,7 +135,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
goto _IGNORE;
|
||||
}
|
||||
|
||||
pEntry = syncBuildRaftEntryFromAppendEntries(pMsg);
|
||||
pEntry = syncEntryBuildFromAppendEntries(pMsg);
|
||||
if (pEntry == NULL) {
|
||||
sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr());
|
||||
goto _IGNORE;
|
||||
|
|
|
@ -44,22 +44,6 @@
|
|||
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
|
||||
//
|
||||
|
||||
bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) {
|
||||
// I am leader, I agree
|
||||
if (syncUtilSameId(pRaftId, &(pSyncNode->myRaftId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// follower agree
|
||||
SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, pRaftId);
|
||||
if (matchIndex >= index) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// not agree
|
||||
return false;
|
||||
}
|
||||
|
||||
static inline int64_t syncNodeAbs64(int64_t a, int64_t b) {
|
||||
ASSERT(a >= 0);
|
||||
ASSERT(b >= 0);
|
||||
|
@ -85,19 +69,6 @@ bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index) {
|
|||
return count >= pNode->quorum;
|
||||
}
|
||||
|
||||
bool syncAgree(SSyncNode* pNode, SyncIndex index) {
|
||||
int agreeCount = 0;
|
||||
for (int i = 0; i < pNode->replicaNum; ++i) {
|
||||
if (syncAgreeIndex(pNode, &(pNode->replicasId[i]), index)) {
|
||||
++agreeCount;
|
||||
}
|
||||
if (agreeCount >= pNode->quorum) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) {
|
||||
SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
|
||||
commitIndex = TMAX(commitIndex, ths->commitIndex);
|
||||
|
|
|
@ -64,10 +64,13 @@ SSyncRaftEntry* syncEntryBuildFromRpcMsg(const SRpcMsg* pMsg, SyncTerm term, Syn
|
|||
}
|
||||
|
||||
SSyncRaftEntry* syncEntryBuildFromAppendEntries(const SyncAppendEntries* pMsg) {
|
||||
SSyncRaftEntry* pEntry = syncEntryBuild((int32_t)(pMsg->dataLen));
|
||||
if (pEntry == NULL) return NULL;
|
||||
|
||||
SSyncRaftEntry* pEntry = taosMemoryMalloc(pMsg->dataLen);
|
||||
if (pEntry == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
memcpy(pEntry, pMsg->data, pMsg->dataLen);
|
||||
ASSERT(pEntry->bytes == pMsg->dataLen);
|
||||
return pEntry;
|
||||
}
|
||||
|
||||
|
|
|
@ -46,8 +46,6 @@
|
|||
// mdest |-> j])
|
||||
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
||||
|
||||
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
|
||||
|
||||
int32_t syncNodeReplicateReset(SSyncNode* pNode, SRaftId* pDestId) {
|
||||
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
||||
taosThreadMutexLock(&pBuf->mutex);
|
||||
|
@ -86,20 +84,6 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) {
|
||||
int32_t ret = 0;
|
||||
SyncAppendEntries* pMsg = pRpcMsg->pCont;
|
||||
|
||||
if (syncNodeNeedSendAppendEntries(pSyncNode, destRaftId, pMsg)) {
|
||||
ret = syncNodeSendAppendEntries(pSyncNode, destRaftId, pRpcMsg);
|
||||
} else {
|
||||
sNTrace(pSyncNode, "do not repcate to dnode:%d for index:%" PRId64, DID(destRaftId), pMsg->prevLogIndex + 1);
|
||||
rpcFreeCont(pRpcMsg->pCont);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) {
|
||||
return syncNodeSendMsgById(destId, pSyncNode, pMsg);
|
||||
}
|
||||
|
|
|
@ -226,7 +226,7 @@ class TDTestCase:
|
|||
event.wait()
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
pollDelay = 100
|
||||
pollDelay = 20
|
||||
showMsg = 1
|
||||
showRow = 1
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
|
Loading…
Reference in New Issue