diff --git a/examples/JDBC/taosdemo/pom.xml b/examples/JDBC/taosdemo/pom.xml
index 4731d8e237..0d47663bba 100644
--- a/examples/JDBC/taosdemo/pom.xml
+++ b/examples/JDBC/taosdemo/pom.xml
@@ -10,7 +10,7 @@
Demo project for TDengine
- 5.3.26
+ 5.3.27
diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index bb2450e8f7..02c097b8d0 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -2071,7 +2071,6 @@ static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq
typedef struct {
char key[TSDB_SUBSCRIBE_KEY_LEN];
- SArray* lostConsumers; // SArray
SArray* removedConsumers; // SArray
SArray* newConsumers; // SArray
} SMqRebInfo;
@@ -2082,10 +2081,6 @@ static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) {
return NULL;
}
tstrncpy(pRebInfo->key, key, TSDB_SUBSCRIBE_KEY_LEN);
- pRebInfo->lostConsumers = taosArrayInit(0, sizeof(int64_t));
- if (pRebInfo->lostConsumers == NULL) {
- goto _err;
- }
pRebInfo->removedConsumers = taosArrayInit(0, sizeof(int64_t));
if (pRebInfo->removedConsumers == NULL) {
goto _err;
@@ -2096,7 +2091,6 @@ static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) {
}
return pRebInfo;
_err:
- taosArrayDestroy(pRebInfo->lostConsumers);
taosArrayDestroy(pRebInfo->removedConsumers);
taosArrayDestroy(pRebInfo->newConsumers);
taosMemoryFreeClear(pRebInfo);
diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h
index 3a4f06f6fa..fcd314d2ae 100644
--- a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -137,12 +137,12 @@ typedef enum {
} EDndReason;
typedef enum {
- CONSUMER_UPDATE__TOUCH = 1,
+ CONSUMER_UPDATE__TOUCH = 1, // rebalance req do not need change consume topic
CONSUMER_UPDATE__ADD,
CONSUMER_UPDATE__REMOVE,
CONSUMER_UPDATE__LOST,
CONSUMER_UPDATE__RECOVER,
- CONSUMER_UPDATE__MODIFY,
+ CONSUMER_UPDATE__MODIFY, // subscribe req need change consume topic
} ECsmUpdateType;
typedef struct {
@@ -624,7 +624,7 @@ typedef struct {
SArray* rebVgs; // SArray
SArray* newConsumers; // SArray
SArray* removedConsumers; // SArray
- SArray* touchedConsumers; // SArray
+ SArray* modifyConsumers; // SArray
SMqSubscribeObj* pSub;
SMqSubActionLogEntry* pLogEntry;
} SMqRebOutputObj;
diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c
index 1b146506a2..65a2fa72a2 100644
--- a/source/dnode/mnode/impl/src/mndConsumer.c
+++ b/source/dnode/mnode/impl/src/mndConsumer.c
@@ -247,7 +247,6 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
static void freeRebalanceItem(void *param) {
SMqRebInfo *pInfo = param;
- taosArrayDestroy(pInfo->lostConsumers);
taosArrayDestroy(pInfo->newConsumers);
taosArrayDestroy(pInfo->removedConsumers);
}
@@ -335,7 +334,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
}
taosRUnLockLatch(&pConsumer->lock);
- } else if (status == MQ_CONSUMER_STATUS__MODIFY) {
+ } else {
taosRLockLatch(&pConsumer->lock);
int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
@@ -356,8 +355,6 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
}
taosRUnLockLatch(&pConsumer->lock);
- } else {
- // do nothing
}
mndReleaseConsumer(pMnode, pConsumer);
@@ -917,34 +914,22 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
taosWLockLatch(&pOldConsumer->lock);
if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) {
- /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
- /*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/
+ SArray *tmp = pOldConsumer->rebNewTopics;
+ pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics;
+ pNewConsumer->rebNewTopics = tmp;
- // this new consumer has identical topics with one existed consumers.
- if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) {
- pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
- } else {
- SArray *tmp = pOldConsumer->rebNewTopics;
- pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics;
- pNewConsumer->rebNewTopics = tmp;
+ tmp = pOldConsumer->rebRemovedTopics;
+ pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics;
+ pNewConsumer->rebRemovedTopics = tmp;
- tmp = pOldConsumer->rebRemovedTopics;
- pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics;
- pNewConsumer->rebRemovedTopics = tmp;
+ tmp = pOldConsumer->assignedTopics;
+ pOldConsumer->assignedTopics = pNewConsumer->assignedTopics;
+ pNewConsumer->assignedTopics = tmp;
- tmp = pOldConsumer->assignedTopics;
- pOldConsumer->assignedTopics = pNewConsumer->assignedTopics;
- pNewConsumer->assignedTopics = tmp;
-
- pOldConsumer->subscribeTime = pNewConsumer->upTime;
- pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
- }
+ pOldConsumer->subscribeTime = pNewConsumer->upTime;
+ pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
- /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
- /*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/
-
int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
- /*pOldConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));*/
for (int32_t i = 0; i < sz; i++) {
char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i));
taosArrayPush(pOldConsumer->rebRemovedTopics, &topic);
@@ -958,9 +943,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) {
- /*A(taosArrayGetSize(pOldConsumer->currentTopics) == 0);*/
- /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
-
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
@@ -976,7 +958,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
- ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);
char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
// not exist in current topic
@@ -1015,15 +996,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
(int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
- /*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);*/
- /*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);*/
char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
-#if 0
- for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
- char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
- A(strcmp(topic, removedTopic) != 0);
- }
-#endif
// remove from removed topic
removeFromRemoveTopicList(pOldConsumer, removedTopic);
diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c
index 49921c9a1a..015c497de1 100644
--- a/source/dnode/mnode/impl/src/mndSubscribe.c
+++ b/source/dnode/mnode/impl/src/mndSubscribe.c
@@ -269,7 +269,7 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
- mInfo("sub:%s mq re-balance remove vgId:%d from unassigned", pSubKey, pVgEp->vgId);
+ mInfo("sub:%s mq re-balance addUnassignedVgroups vgId:%d from unassigned", pSubKey, pVgEp->vgId);
}
}
@@ -289,9 +289,9 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
- // all old consumers still existing are touched
- // TODO optimize: touch only consumer whose vgs changed
- taosArrayPush(pOutput->touchedConsumers, &pConsumerEp->consumerId);
+ // all old consumers still existing need to be modified
+ // TODO optimize: modify only consumer whose vgs changed
+ taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId);
if (consumerVgNum > minVgCnt) {
if (imbCnt < imbConsumerNum) {
if (consumerVgNum == minVgCnt + 1) {
@@ -339,13 +339,13 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
mInfo("sub:%s mq re-balance %d vgroups, existed consumers:%d, added:%d, removed:%d", pSubKey, totalVgNum,
pInput->oldConsumerNum, numOfAdded, numOfRemoved);
- // 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
+ // 1. build temporary hash(vgId -> SMqRebOutputVg) to store vg that need to be assigned
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
- // 2. check and get actual removed consumers, put their vg into hash
+ // 2. check and get actual removed consumers, put their vg into pHash
doRemoveExistedConsumers(pOutput, pHash, pInput);
- // 3. if previously no consumer, there are vgs not assigned
+ // 3. if previously no consumer, there are vgs not assigned, put these vg into pHash
addUnassignedVgroups(pOutput, pHash);
// 4. calc vg number of each consumer
@@ -364,19 +364,17 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
mInfo("sub:%s no consumer subscribe this topic", pSubKey);
}
- // 5. first scan: remove vgroups from te consumers, who have more vgroups than the threashold value that is
- // minVgCnt, and then put them into the recycled hash list
+ // 5. remove vgroups from consumers who have more vgroups than the threshold value(minVgCnt or minVgCnt + 1), and then another vg into pHash
transferVgroupsForConsumers(pOutput, pHash, minVgCnt, imbConsumerNum);
// 6. add new consumer into sub
doAddNewConsumers(pOutput, pInput);
- // 7. second scan: find consumer do not have enough vgroups, extract from temporary hash and assign to them
- // All related vg should be put into rebVgs
SMqRebOutputVg *pRebVg = NULL;
void *pRemovedIter = NULL;
void *pIter = NULL;
+ // 7. extract bgroups from pHash and assign to consumers that do not have enough vgroups
while (1) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
if (pIter == NULL) {
@@ -390,68 +388,52 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
// iter hash and find one vg
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
if (pRemovedIter == NULL) {
- mError("sub:%s removed iter is null", pSubKey);
+ mError("sub:%s removed iter is null, never can reach hear", pSubKey);
break;
}
pRebVg = (SMqRebOutputVg *)pRemovedIter;
- // push
- taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pConsumerEp->consumerId;
- taosArrayPush(pOutput->rebVgs, pRebVg);
- mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (not enough)", pRebVg->pVgEp->vgId,
- pConsumerEp->consumerId);
+ taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
+ mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
}
}
- // 7. handle unassigned vg
- if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) {
- // if has consumer, assign all left vg
- while (1) {
- SMqConsumerEp *pConsumerEp = NULL;
+ while (1) {
+ pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
+ if (pIter == NULL) {
+ break;
+ }
+ SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
+
+ if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
if (pRemovedIter == NULL) {
- if (pIter != NULL) {
- taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
- pIter = NULL;
- }
+ mInfo("sub:%s removed iter is null", pSubKey);
break;
}
- while (1) {
- pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
- pConsumerEp = (SMqConsumerEp *)pIter;
- if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
- break;
- }
- }
pRebVg = (SMqRebOutputVg *)pRemovedIter;
- taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pConsumerEp->consumerId;
- if (pRebVg->newConsumerId == pRebVg->oldConsumerId) {
- mInfo("mq rebalance: skip vg %d for same consumer:0x%" PRIx64 " (second scan)", pRebVg->pVgEp->vgId,
- pConsumerEp->consumerId);
- continue;
- }
- taosArrayPush(pOutput->rebVgs, pRebVg);
- mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId,
- pConsumerEp->consumerId);
+ taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
+ mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
}
- } else {
- // if all consumer is removed, put all vg into unassigned
- pIter = NULL;
- SMqRebOutputVg *pRebOutput = NULL;
- while (1) {
- pIter = taosHashIterate(pHash, pIter);
- if (pIter == NULL) {
- break;
- }
+ }
- pRebOutput = (SMqRebOutputVg *)pIter;
+ // All assigned vg should be put into pOutput->rebVgs
+ if(pRemovedIter != NULL){
+ mError("sub:%s error pRemovedIter should be NULL", pSubKey);
+ }
+ while (1) {
+ pRemovedIter = taosHashIterate(pHash, pRemovedIter);
+ if (pRemovedIter == NULL) {
+ break;
+ }
+ SMqRebOutputVg* pRebOutput = (SMqRebOutputVg *)pRemovedIter;
+ taosArrayPush(pOutput->rebVgs, pRebOutput);
+ if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed, put all vg into unassigned
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
- taosArrayPush(pOutput->rebVgs, pRebOutput);
- mInfo("sub:%s mq re-balance unassign vgId:%d (second scan)", pSubKey, pRebOutput->pVgEp->vgId);
}
}
@@ -462,19 +444,18 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pSubKey,
pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
}
- {
- pIter = NULL;
- while (1) {
- pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
- if (pIter == NULL) break;
- SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
- int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
- mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz);
- for (int32_t i = 0; i < sz; i++) {
- SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
- mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId,
- pConsumerEp->consumerId);
- }
+
+ pIter = NULL;
+ while (1) {
+ pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
+ if (pIter == NULL) break;
+ SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
+ int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
+ mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz);
+ for (int32_t i = 0; i < sz; i++) {
+ SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
+ mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId,
+ pConsumerEp->consumerId);
}
}
@@ -515,9 +496,9 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
// 3. commit log: consumer to update status and epoch
// 3.1 set touched consumer
- int32_t consumerNum = taosArrayGetSize(pOutput->touchedConsumers);
+ int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers);
for (int32_t i = 0; i < consumerNum; i++) {
- int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->touchedConsumers, i);
+ int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->modifyConsumers, i);
SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH;
@@ -597,15 +578,15 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
SMqDoRebalanceMsg *pReq = pMsg->pCont;
void *pIter = NULL;
- bool rebalanceOnce = false; // to ensure only once.
+// bool rebalanceOnce = false; // to ensure only once.
mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash));
// here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction.
while (1) {
- if (rebalanceOnce) {
- break;
- }
+// if (rebalanceOnce) {
+// break;
+// }
pIter = taosHashIterate(pReq->rebSubHash, pIter);
if (pIter == NULL) {
@@ -617,7 +598,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
SMqRebOutputObj rebOutput = {0};
rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t));
rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t));
- rebOutput.touchedConsumers = taosArrayInit(0, sizeof(int64_t));
+ rebOutput.modifyConsumers = taosArrayInit(0, sizeof(int64_t));
rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter;
@@ -653,13 +634,13 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
mndReleaseTopic(pMnode, pTopic);
rebInput.oldConsumerNum = 0;
- mInfo("topic:%s has no consumers sub yet", topic);
+ mInfo("sub topic:%s has no consumers sub yet", pRebInfo->key);
} else {
taosRLockLatch(&pSub->lock);
rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash);
rebOutput.pSub = tCloneSubscribeObj(pSub);
taosRUnLockLatch(&pSub->lock);
- mInfo("topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum);
+ mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum);
mndReleaseSubscribe(pMnode, pSub);
}
@@ -675,13 +656,14 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
}
taosArrayDestroy(rebOutput.newConsumers);
- taosArrayDestroy(rebOutput.touchedConsumers);
+ taosArrayDestroy(rebOutput.modifyConsumers);
taosArrayDestroy(rebOutput.removedConsumers);
taosArrayDestroy(rebOutput.rebVgs);
tDeleteSubscribeObj(rebOutput.pSub);
taosMemoryFree(rebOutput.pSub);
- rebalanceOnce = true;
+// taosSsleep(100);
+// rebalanceOnce = true;
}
// reset flag
diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c
index 569c78a68c..f1cee6395b 100644
--- a/source/dnode/mnode/sdb/src/sdbHash.c
+++ b/source/dnode/mnode/sdb/src/sdbHash.c
@@ -196,13 +196,13 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
SSdbRow *pOldRow = *ppOldRow;
pOldRow->status = pRaw->status;
sdbPrintOper(pSdb, pOldRow, "update");
- sdbUnLock(pSdb, type);
int32_t code = 0;
SdbUpdateFp updateFp = pSdb->updateFps[type];
if (updateFp != NULL) {
code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj);
}
+ sdbUnLock(pSdb, type);
// sdbUnLock(pSdb, type);
sdbFreeRow(pSdb, pNewRow, false);
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 1230a352d9..ce4efe905f 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -1154,6 +1154,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
} else {
+ tDeleteStreamDispatchReq(&req);
return -1;
}
}
@@ -1196,6 +1197,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
tDeleteStreamRetrieveReq(&req);
return 0;
} else {
+ tDeleteStreamRetrieveReq(&req);
return -1;
}
}
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index 51cc315780..c3a7b70904 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -176,10 +176,12 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
// add to the ready tasks hash map, not the restored tasks hash map
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
+ tFreeStreamTask(pTask);
return -1;
}
if (streamMetaSaveTask(pMeta, pTask) < 0) {
+ tFreeStreamTask(pTask);
return -1;
}
diff --git a/source/libs/sync/inc/syncCommit.h b/source/libs/sync/inc/syncCommit.h
index 7d638a7336..07b4702b1b 100644
--- a/source/libs/sync/inc/syncCommit.h
+++ b/source/libs/sync/inc/syncCommit.h
@@ -48,8 +48,6 @@ extern "C" {
void syncOneReplicaAdvance(SSyncNode* pSyncNode);
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode);
-bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index);
-bool syncAgree(SSyncNode* pSyncNode, SyncIndex index);
bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index);
int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex);
diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h
index a55fd7ead3..04456b2454 100644
--- a/source/libs/sync/inc/syncReplication.h
+++ b/source/libs/sync/inc/syncReplication.h
@@ -55,7 +55,6 @@ int32_t syncNodeReplicateReset(SSyncNode* pSyncNode, SRaftId* pDestId);
int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode);
int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
-int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
#ifdef __cplusplus
}
diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c
index 9ab545075c..deae4b0b3f 100644
--- a/source/libs/sync/src/syncAppendEntries.c
+++ b/source/libs/sync/src/syncAppendEntries.c
@@ -89,17 +89,6 @@
// /\ UNCHANGED <>
//
-SSyncRaftEntry* syncBuildRaftEntryFromAppendEntries(const SyncAppendEntries* pMsg) {
- SSyncRaftEntry* pEntry = taosMemoryMalloc(pMsg->dataLen);
- if (pEntry == NULL) {
- terrno = TSDB_CODE_OUT_OF_MEMORY;
- return NULL;
- }
- (void)memcpy(pEntry, pMsg->data, pMsg->dataLen);
- ASSERT(pEntry->bytes == pMsg->dataLen);
- return pEntry;
-}
-
int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncAppendEntries* pMsg = pRpcMsg->pCont;
SRpcMsg rpcRsp = {0};
@@ -146,7 +135,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
goto _IGNORE;
}
- pEntry = syncBuildRaftEntryFromAppendEntries(pMsg);
+ pEntry = syncEntryBuildFromAppendEntries(pMsg);
if (pEntry == NULL) {
sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr());
goto _IGNORE;
diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c
index 2501b4df8b..01f1f00c8b 100644
--- a/source/libs/sync/src/syncCommit.c
+++ b/source/libs/sync/src/syncCommit.c
@@ -44,22 +44,6 @@
// /\ UNCHANGED <>
//
-bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) {
- // I am leader, I agree
- if (syncUtilSameId(pRaftId, &(pSyncNode->myRaftId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
- return true;
- }
-
- // follower agree
- SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, pRaftId);
- if (matchIndex >= index) {
- return true;
- }
-
- // not agree
- return false;
-}
-
static inline int64_t syncNodeAbs64(int64_t a, int64_t b) {
ASSERT(a >= 0);
ASSERT(b >= 0);
@@ -85,19 +69,6 @@ bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index) {
return count >= pNode->quorum;
}
-bool syncAgree(SSyncNode* pNode, SyncIndex index) {
- int agreeCount = 0;
- for (int i = 0; i < pNode->replicaNum; ++i) {
- if (syncAgreeIndex(pNode, &(pNode->replicasId[i]), index)) {
- ++agreeCount;
- }
- if (agreeCount >= pNode->quorum) {
- return true;
- }
- }
- return false;
-}
-
int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) {
SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
commitIndex = TMAX(commitIndex, ths->commitIndex);
diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c
index 3e63e2fb8e..8f42780eb9 100644
--- a/source/libs/sync/src/syncRaftEntry.c
+++ b/source/libs/sync/src/syncRaftEntry.c
@@ -64,10 +64,13 @@ SSyncRaftEntry* syncEntryBuildFromRpcMsg(const SRpcMsg* pMsg, SyncTerm term, Syn
}
SSyncRaftEntry* syncEntryBuildFromAppendEntries(const SyncAppendEntries* pMsg) {
- SSyncRaftEntry* pEntry = syncEntryBuild((int32_t)(pMsg->dataLen));
- if (pEntry == NULL) return NULL;
-
+ SSyncRaftEntry* pEntry = taosMemoryMalloc(pMsg->dataLen);
+ if (pEntry == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return NULL;
+ }
memcpy(pEntry, pMsg->data, pMsg->dataLen);
+ ASSERT(pEntry->bytes == pMsg->dataLen);
return pEntry;
}
diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c
index 8ac9a860e3..2776225a39 100644
--- a/source/libs/sync/src/syncReplication.c
+++ b/source/libs/sync/src/syncReplication.c
@@ -46,8 +46,6 @@
// mdest |-> j])
// /\ UNCHANGED <>
-int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
-
int32_t syncNodeReplicateReset(SSyncNode* pNode, SRaftId* pDestId) {
SSyncLogBuffer* pBuf = pNode->pLogBuf;
taosThreadMutexLock(&pBuf->mutex);
@@ -86,20 +84,6 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI
return 0;
}
-int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) {
- int32_t ret = 0;
- SyncAppendEntries* pMsg = pRpcMsg->pCont;
-
- if (syncNodeNeedSendAppendEntries(pSyncNode, destRaftId, pMsg)) {
- ret = syncNodeSendAppendEntries(pSyncNode, destRaftId, pRpcMsg);
- } else {
- sNTrace(pSyncNode, "do not repcate to dnode:%d for index:%" PRId64, DID(destRaftId), pMsg->prevLogIndex + 1);
- rpcFreeCont(pRpcMsg->pCont);
- }
-
- return ret;
-}
-
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) {
return syncNodeSendMsgById(destId, pSyncNode, pMsg);
}
diff --git a/tests/system-test/7-tmq/subscribeDb3.py b/tests/system-test/7-tmq/subscribeDb3.py
index f82f89f1b9..bddb196f4a 100644
--- a/tests/system-test/7-tmq/subscribeDb3.py
+++ b/tests/system-test/7-tmq/subscribeDb3.py
@@ -226,7 +226,7 @@ class TDTestCase:
event.wait()
tdLog.info("start consume processor")
- pollDelay = 100
+ pollDelay = 20
showMsg = 1
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)