diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index a4d7760a7d..26887e2ade 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -99,6 +99,7 @@ struct tmq_t { // poll info int64_t pollCnt; int64_t totalRows; + bool needReportOffsetRows; // timer tmr_h hbLiveTimer; @@ -796,20 +797,24 @@ void tmqSendHbReq(void* param, void* tmrId) { SMqHbReq req = {0}; req.consumerId = tmq->consumerId; req.epoch = tmq->epoch; - req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); - for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){ - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); - TopicOffsetRows* data = taosArrayReserve(req.topics, 1); - strcpy(data->topicName, pTopic->topicName); - data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows)); - for(int j = 0; j < numOfVgroups; j++){ - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1); - offRows->vgId = pVg->vgId; - offRows->rows = pVg->numOfRows; - offRows->offset = pVg->offsetInfo.committedOffset; + if(tmq->needReportOffsetRows){ + req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); + for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){ + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); + TopicOffsetRows* data = taosArrayReserve(req.topics, 1); + strcpy(data->topicName, pTopic->topicName); + data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows)); + for(int j = 0; j < numOfVgroups; j++){ + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1); + offRows->vgId = pVg->vgId; + offRows->rows = pVg->numOfRows; + offRows->offset = pVg->offsetInfo.committedOffset; + tscDebug("report offset: %d", offRows->offset.type); + } } + tmq->needReportOffsetRows = false; } int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req); @@ -1087,6 +1092,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->status = TMQ_CONSUMER_STATUS__INIT; pTmq->pollCnt = 0; pTmq->epoch = 0; + pTmq->needReportOffsetRows = true; // set conf strcpy(pTmq->clientId, conf->clientId); @@ -2449,6 +2455,7 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { // if no more waiting rsp pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam); taosMemoryFree(pParamSet); + tmq->needReportOffsetRows = true; taosReleaseRef(tmqMgmt.rsetId, refId); return 0; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 7f8f6a48fa..d975eb1cd1 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -163,7 +163,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_DROP_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_SUBSCRIBE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_ASK_EP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_HB, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_HB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_DROP_CGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_DROP_CGROUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index fc1a30ef5d..b5254dbb88 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -431,13 +431,14 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { mDebug("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); - taosRLockLatch(&pSub->lock); + taosWLockLatch(&pSub->lock); SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t)); if(pConsumerEp){ + taosArrayDestroy(pConsumerEp->offsetRows); pConsumerEp->offsetRows = data->offsetRows; data->offsetRows = NULL; } - taosRUnLockLatch(&pSub->lock); + taosWUnLockLatch(&pSub->lock); mndReleaseSubscribe(pMnode, pSub); } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index b6fb96882d..7f79408e8c 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -519,6 +519,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp)); } pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp); + pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL); memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN); return pSubNew; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index cadd1f53f0..e100f91aef 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -451,24 +451,42 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR taosArrayPush(pOutput->rebVgs, pRebOutput); if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); // put all vg into unassigned - SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows - if(pSub){ - taosRLockLatch(&pSub->lock); - if(pOutput->pSub->offsetRows == NULL){ - pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows)); - }else{ - taosArrayClear(pOutput->pSub->offsetRows); - } - pIter = NULL; - while(1){ - pIter = taosHashIterate(pSub->consumerHash, pIter); - if (pIter == NULL) break; - SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - taosArrayAddAll(pOutput->pSub->offsetRows, pConsumerEp->offsetRows); - } - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); + } + } + + if(taosHashGetSize(pOutput->pSub->consumerHash) == 0) { // if all consumer is removed + SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows + if (pSub) { + taosRLockLatch(&pSub->lock); + bool init = false; + if (pOutput->pSub->offsetRows == NULL) { + pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows)); + init = true; } + pIter = NULL; + while (1) { + pIter = taosHashIterate(pSub->consumerHash, pIter); + if (pIter == NULL) break; + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; + if (init) { + taosArrayAddAll(pOutput->pSub->offsetRows, pConsumerEp->offsetRows); + mDebug("pSub->offsetRows is init"); + } else { + for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) { + OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j); + for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) { + OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i); + if (d1->vgId == d2->vgId) { + d2->rows += d1->rows; + d2->offset = d1->offset; + mDebug("pSub->offsetRows add vgId:%d, after:%lld, before:%lld", d2->vgId, d2->rows, d1->rows); + } + } + } + } + } + taosRUnLockLatch(&pSub->lock); + mndReleaseSubscribe(pMnode, pSub); } } diff --git a/tests/system-test/7-tmq/checkOffsetRowParams.py b/tests/system-test/7-tmq/checkOffsetRowParams.py index 8e5ff63f60..17c80c68bf 100644 --- a/tests/system-test/7-tmq/checkOffsetRowParams.py +++ b/tests/system-test/7-tmq/checkOffsetRowParams.py @@ -227,12 +227,11 @@ class TDTestCase: self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tdLog.info("start consume processor") - pollDelay = 10 + pollDelay = 20 showMsg = 1 showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - time.sleep(2) tdLog.info("start show subscriptions 1") while(1): tdSql.query("show subscriptions") @@ -240,10 +239,11 @@ class TDTestCase: tdLog.info("sleep") time.sleep(1) elif (tdSql.queryResult[0][4] != None): - tdSql.checkData(0, 4, "offset(reset to earlieast)") + # tdSql.checkData(0, 4, "earliest") tdSql.checkData(0, 5, 0) break + time.sleep(2) tdLog.info("start insert data") self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) self.insert_data(tdSql,\