diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bb2450e8f7..02c097b8d0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2071,7 +2071,6 @@ static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq typedef struct { char key[TSDB_SUBSCRIBE_KEY_LEN]; - SArray* lostConsumers; // SArray SArray* removedConsumers; // SArray SArray* newConsumers; // SArray } SMqRebInfo; @@ -2082,10 +2081,6 @@ static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) { return NULL; } tstrncpy(pRebInfo->key, key, TSDB_SUBSCRIBE_KEY_LEN); - pRebInfo->lostConsumers = taosArrayInit(0, sizeof(int64_t)); - if (pRebInfo->lostConsumers == NULL) { - goto _err; - } pRebInfo->removedConsumers = taosArrayInit(0, sizeof(int64_t)); if (pRebInfo->removedConsumers == NULL) { goto _err; @@ -2096,7 +2091,6 @@ static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) { } return pRebInfo; _err: - taosArrayDestroy(pRebInfo->lostConsumers); taosArrayDestroy(pRebInfo->removedConsumers); taosArrayDestroy(pRebInfo->newConsumers); taosMemoryFreeClear(pRebInfo); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 3a4f06f6fa..fcd314d2ae 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -137,12 +137,12 @@ typedef enum { } EDndReason; typedef enum { - CONSUMER_UPDATE__TOUCH = 1, + CONSUMER_UPDATE__TOUCH = 1, // rebalance req do not need change consume topic CONSUMER_UPDATE__ADD, CONSUMER_UPDATE__REMOVE, CONSUMER_UPDATE__LOST, CONSUMER_UPDATE__RECOVER, - CONSUMER_UPDATE__MODIFY, + CONSUMER_UPDATE__MODIFY, // subscribe req need change consume topic } ECsmUpdateType; typedef struct { @@ -624,7 +624,7 @@ typedef struct { SArray* rebVgs; // SArray SArray* newConsumers; // SArray SArray* removedConsumers; // SArray - SArray* touchedConsumers; // SArray + SArray* modifyConsumers; // SArray SMqSubscribeObj* pSub; SMqSubActionLogEntry* pLogEntry; } SMqRebOutputObj; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 1b146506a2..65a2fa72a2 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -247,7 +247,6 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { static void freeRebalanceItem(void *param) { SMqRebInfo *pInfo = param; - taosArrayDestroy(pInfo->lostConsumers); taosArrayDestroy(pInfo->newConsumers); taosArrayDestroy(pInfo->removedConsumers); } @@ -335,7 +334,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); } taosRUnLockLatch(&pConsumer->lock); - } else if (status == MQ_CONSUMER_STATUS__MODIFY) { + } else { taosRLockLatch(&pConsumer->lock); int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics); @@ -356,8 +355,6 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); } taosRUnLockLatch(&pConsumer->lock); - } else { - // do nothing } mndReleaseConsumer(pMnode, pConsumer); @@ -917,34 +914,22 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, taosWLockLatch(&pOldConsumer->lock); if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) { - /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/ - /*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/ + SArray *tmp = pOldConsumer->rebNewTopics; + pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics; + pNewConsumer->rebNewTopics = tmp; - // this new consumer has identical topics with one existed consumers. - if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) { - pOldConsumer->status = MQ_CONSUMER_STATUS__READY; - } else { - SArray *tmp = pOldConsumer->rebNewTopics; - pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics; - pNewConsumer->rebNewTopics = tmp; + tmp = pOldConsumer->rebRemovedTopics; + pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics; + pNewConsumer->rebRemovedTopics = tmp; - tmp = pOldConsumer->rebRemovedTopics; - pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics; - pNewConsumer->rebRemovedTopics = tmp; + tmp = pOldConsumer->assignedTopics; + pOldConsumer->assignedTopics = pNewConsumer->assignedTopics; + pNewConsumer->assignedTopics = tmp; - tmp = pOldConsumer->assignedTopics; - pOldConsumer->assignedTopics = pNewConsumer->assignedTopics; - pNewConsumer->assignedTopics = tmp; - - pOldConsumer->subscribeTime = pNewConsumer->upTime; - pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY; - } + pOldConsumer->subscribeTime = pNewConsumer->upTime; + pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY; } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) { - /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/ - /*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/ - int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics); - /*pOldConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));*/ for (int32_t i = 0; i < sz; i++) { char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i)); taosArrayPush(pOldConsumer->rebRemovedTopics, &topic); @@ -958,9 +943,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status), pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); } else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) { - /*A(taosArrayGetSize(pOldConsumer->currentTopics) == 0);*/ - /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/ - int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); for (int32_t i = 0; i < sz; i++) { char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i)); @@ -976,7 +958,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, pOldConsumer->rebalanceTime = pNewConsumer->upTime; } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) { - ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0); char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0)); // not exist in current topic @@ -1015,15 +996,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); } else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) { - /*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);*/ - /*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);*/ char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0); -#if 0 - for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) { - char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i); - A(strcmp(topic, removedTopic) != 0); - } -#endif // remove from removed topic removeFromRemoveTopicList(pOldConsumer, removedTopic); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 49921c9a1a..015c497de1 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -269,7 +269,7 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg)); - mInfo("sub:%s mq re-balance remove vgId:%d from unassigned", pSubKey, pVgEp->vgId); + mInfo("sub:%s mq re-balance addUnassignedVgroups vgId:%d from unassigned", pSubKey, pVgEp->vgId); } } @@ -289,9 +289,9 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); - // all old consumers still existing are touched - // TODO optimize: touch only consumer whose vgs changed - taosArrayPush(pOutput->touchedConsumers, &pConsumerEp->consumerId); + // all old consumers still existing need to be modified + // TODO optimize: modify only consumer whose vgs changed + taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId); if (consumerVgNum > minVgCnt) { if (imbCnt < imbConsumerNum) { if (consumerVgNum == minVgCnt + 1) { @@ -339,13 +339,13 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR mInfo("sub:%s mq re-balance %d vgroups, existed consumers:%d, added:%d, removed:%d", pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved); - // 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg + // 1. build temporary hash(vgId -> SMqRebOutputVg) to store vg that need to be assigned SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); - // 2. check and get actual removed consumers, put their vg into hash + // 2. check and get actual removed consumers, put their vg into pHash doRemoveExistedConsumers(pOutput, pHash, pInput); - // 3. if previously no consumer, there are vgs not assigned + // 3. if previously no consumer, there are vgs not assigned, put these vg into pHash addUnassignedVgroups(pOutput, pHash); // 4. calc vg number of each consumer @@ -364,19 +364,17 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR mInfo("sub:%s no consumer subscribe this topic", pSubKey); } - // 5. first scan: remove vgroups from te consumers, who have more vgroups than the threashold value that is - // minVgCnt, and then put them into the recycled hash list + // 5. remove vgroups from consumers who have more vgroups than the threshold value(minVgCnt or minVgCnt + 1), and then another vg into pHash transferVgroupsForConsumers(pOutput, pHash, minVgCnt, imbConsumerNum); // 6. add new consumer into sub doAddNewConsumers(pOutput, pInput); - // 7. second scan: find consumer do not have enough vgroups, extract from temporary hash and assign to them - // All related vg should be put into rebVgs SMqRebOutputVg *pRebVg = NULL; void *pRemovedIter = NULL; void *pIter = NULL; + // 7. extract bgroups from pHash and assign to consumers that do not have enough vgroups while (1) { pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); if (pIter == NULL) { @@ -390,68 +388,52 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR // iter hash and find one vg pRemovedIter = taosHashIterate(pHash, pRemovedIter); if (pRemovedIter == NULL) { - mError("sub:%s removed iter is null", pSubKey); + mError("sub:%s removed iter is null, never can reach hear", pSubKey); break; } pRebVg = (SMqRebOutputVg *)pRemovedIter; - // push - taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); pRebVg->newConsumerId = pConsumerEp->consumerId; - taosArrayPush(pOutput->rebVgs, pRebVg); - mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (not enough)", pRebVg->pVgEp->vgId, - pConsumerEp->consumerId); + taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); + mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); } } - // 7. handle unassigned vg - if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) { - // if has consumer, assign all left vg - while (1) { - SMqConsumerEp *pConsumerEp = NULL; + while (1) { + pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); + if (pIter == NULL) { + break; + } + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; + + if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) { pRemovedIter = taosHashIterate(pHash, pRemovedIter); if (pRemovedIter == NULL) { - if (pIter != NULL) { - taosHashCancelIterate(pOutput->pSub->consumerHash, pIter); - pIter = NULL; - } + mInfo("sub:%s removed iter is null", pSubKey); break; } - while (1) { - pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); - pConsumerEp = (SMqConsumerEp *)pIter; - if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) { - break; - } - } pRebVg = (SMqRebOutputVg *)pRemovedIter; - taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); pRebVg->newConsumerId = pConsumerEp->consumerId; - if (pRebVg->newConsumerId == pRebVg->oldConsumerId) { - mInfo("mq rebalance: skip vg %d for same consumer:0x%" PRIx64 " (second scan)", pRebVg->pVgEp->vgId, - pConsumerEp->consumerId); - continue; - } - taosArrayPush(pOutput->rebVgs, pRebVg); - mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId, - pConsumerEp->consumerId); + taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); + mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); } - } else { - // if all consumer is removed, put all vg into unassigned - pIter = NULL; - SMqRebOutputVg *pRebOutput = NULL; - while (1) { - pIter = taosHashIterate(pHash, pIter); - if (pIter == NULL) { - break; - } + } - pRebOutput = (SMqRebOutputVg *)pIter; + // All assigned vg should be put into pOutput->rebVgs + if(pRemovedIter != NULL){ + mError("sub:%s error pRemovedIter should be NULL", pSubKey); + } + while (1) { + pRemovedIter = taosHashIterate(pHash, pRemovedIter); + if (pRemovedIter == NULL) { + break; + } + SMqRebOutputVg* pRebOutput = (SMqRebOutputVg *)pRemovedIter; + taosArrayPush(pOutput->rebVgs, pRebOutput); + if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed, put all vg into unassigned taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); - taosArrayPush(pOutput->rebVgs, pRebOutput); - mInfo("sub:%s mq re-balance unassign vgId:%d (second scan)", pSubKey, pRebOutput->pVgEp->vgId); } } @@ -462,19 +444,18 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pSubKey, pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId); } - { - pIter = NULL; - while (1) { - pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); - if (pIter == NULL) break; - SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - int32_t sz = taosArrayGetSize(pConsumerEp->vgs); - mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz); - for (int32_t i = 0; i < sz; i++) { - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); - mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId, - pConsumerEp->consumerId); - } + + pIter = NULL; + while (1) { + pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); + if (pIter == NULL) break; + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; + int32_t sz = taosArrayGetSize(pConsumerEp->vgs); + mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz); + for (int32_t i = 0; i < sz; i++) { + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); + mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId, + pConsumerEp->consumerId); } } @@ -515,9 +496,9 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu // 3. commit log: consumer to update status and epoch // 3.1 set touched consumer - int32_t consumerNum = taosArrayGetSize(pOutput->touchedConsumers); + int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers); for (int32_t i = 0; i < consumerNum; i++) { - int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->touchedConsumers, i); + int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->modifyConsumers, i); SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup); pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH; @@ -597,15 +578,15 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SMqDoRebalanceMsg *pReq = pMsg->pCont; void *pIter = NULL; - bool rebalanceOnce = false; // to ensure only once. +// bool rebalanceOnce = false; // to ensure only once. mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash)); // here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction. while (1) { - if (rebalanceOnce) { - break; - } +// if (rebalanceOnce) { +// break; +// } pIter = taosHashIterate(pReq->rebSubHash, pIter); if (pIter == NULL) { @@ -617,7 +598,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { SMqRebOutputObj rebOutput = {0}; rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t)); rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t)); - rebOutput.touchedConsumers = taosArrayInit(0, sizeof(int64_t)); + rebOutput.modifyConsumers = taosArrayInit(0, sizeof(int64_t)); rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg)); SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter; @@ -653,13 +634,13 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { mndReleaseTopic(pMnode, pTopic); rebInput.oldConsumerNum = 0; - mInfo("topic:%s has no consumers sub yet", topic); + mInfo("sub topic:%s has no consumers sub yet", pRebInfo->key); } else { taosRLockLatch(&pSub->lock); rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash); rebOutput.pSub = tCloneSubscribeObj(pSub); taosRUnLockLatch(&pSub->lock); - mInfo("topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum); + mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum); mndReleaseSubscribe(pMnode, pSub); } @@ -675,13 +656,14 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { } taosArrayDestroy(rebOutput.newConsumers); - taosArrayDestroy(rebOutput.touchedConsumers); + taosArrayDestroy(rebOutput.modifyConsumers); taosArrayDestroy(rebOutput.removedConsumers); taosArrayDestroy(rebOutput.rebVgs); tDeleteSubscribeObj(rebOutput.pSub); taosMemoryFree(rebOutput.pSub); - rebalanceOnce = true; +// taosSsleep(100); +// rebalanceOnce = true; } // reset flag diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 569c78a68c..f1cee6395b 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -196,13 +196,13 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * SSdbRow *pOldRow = *ppOldRow; pOldRow->status = pRaw->status; sdbPrintOper(pSdb, pOldRow, "update"); - sdbUnLock(pSdb, type); int32_t code = 0; SdbUpdateFp updateFp = pSdb->updateFps[type]; if (updateFp != NULL) { code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj); } + sdbUnLock(pSdb, type); // sdbUnLock(pSdb, type); sdbFreeRow(pSdb, pNewRow, false); diff --git a/tests/system-test/7-tmq/subscribeDb3.py b/tests/system-test/7-tmq/subscribeDb3.py index f82f89f1b9..bddb196f4a 100644 --- a/tests/system-test/7-tmq/subscribeDb3.py +++ b/tests/system-test/7-tmq/subscribeDb3.py @@ -226,7 +226,7 @@ class TDTestCase: event.wait() tdLog.info("start consume processor") - pollDelay = 100 + pollDelay = 20 showMsg = 1 showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)