fix:modify offset description to string
This commit is contained in:
parent
5c5e76f894
commit
fe155898fe
|
@ -99,6 +99,7 @@ struct tmq_t {
|
||||||
// poll info
|
// poll info
|
||||||
int64_t pollCnt;
|
int64_t pollCnt;
|
||||||
int64_t totalRows;
|
int64_t totalRows;
|
||||||
|
bool needReportOffsetRows;
|
||||||
|
|
||||||
// timer
|
// timer
|
||||||
tmr_h hbLiveTimer;
|
tmr_h hbLiveTimer;
|
||||||
|
@ -796,20 +797,24 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
||||||
SMqHbReq req = {0};
|
SMqHbReq req = {0};
|
||||||
req.consumerId = tmq->consumerId;
|
req.consumerId = tmq->consumerId;
|
||||||
req.epoch = tmq->epoch;
|
req.epoch = tmq->epoch;
|
||||||
req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
|
if(tmq->needReportOffsetRows){
|
||||||
for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){
|
req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){
|
||||||
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||||
TopicOffsetRows* data = taosArrayReserve(req.topics, 1);
|
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
|
||||||
strcpy(data->topicName, pTopic->topicName);
|
TopicOffsetRows* data = taosArrayReserve(req.topics, 1);
|
||||||
data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows));
|
strcpy(data->topicName, pTopic->topicName);
|
||||||
for(int j = 0; j < numOfVgroups; j++){
|
data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows));
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
for(int j = 0; j < numOfVgroups; j++){
|
||||||
OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
|
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||||
offRows->vgId = pVg->vgId;
|
OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
|
||||||
offRows->rows = pVg->numOfRows;
|
offRows->vgId = pVg->vgId;
|
||||||
offRows->offset = pVg->offsetInfo.committedOffset;
|
offRows->rows = pVg->numOfRows;
|
||||||
|
offRows->offset = pVg->offsetInfo.committedOffset;
|
||||||
|
tscDebug("report offset: %d", offRows->offset.type);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
tmq->needReportOffsetRows = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
|
int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
|
||||||
|
@ -1087,6 +1092,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
pTmq->status = TMQ_CONSUMER_STATUS__INIT;
|
pTmq->status = TMQ_CONSUMER_STATUS__INIT;
|
||||||
pTmq->pollCnt = 0;
|
pTmq->pollCnt = 0;
|
||||||
pTmq->epoch = 0;
|
pTmq->epoch = 0;
|
||||||
|
pTmq->needReportOffsetRows = true;
|
||||||
|
|
||||||
// set conf
|
// set conf
|
||||||
strcpy(pTmq->clientId, conf->clientId);
|
strcpy(pTmq->clientId, conf->clientId);
|
||||||
|
@ -2449,6 +2455,7 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
|
||||||
// if no more waiting rsp
|
// if no more waiting rsp
|
||||||
pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam);
|
pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam);
|
||||||
taosMemoryFree(pParamSet);
|
taosMemoryFree(pParamSet);
|
||||||
|
tmq->needReportOffsetRows = true;
|
||||||
|
|
||||||
taosReleaseRef(tmqMgmt.rsetId, refId);
|
taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -163,7 +163,7 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_DROP_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_DROP_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_SUBSCRIBE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_SUBSCRIBE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_ASK_EP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_ASK_EP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_HB, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_HB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_DROP_CGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_DROP_CGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_DROP_CGROUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_TMQ_DROP_CGROUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -431,13 +431,14 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
||||||
mDebug("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
|
mDebug("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
|
||||||
|
|
||||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
|
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
|
||||||
taosRLockLatch(&pSub->lock);
|
taosWLockLatch(&pSub->lock);
|
||||||
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
|
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
|
||||||
if(pConsumerEp){
|
if(pConsumerEp){
|
||||||
|
taosArrayDestroy(pConsumerEp->offsetRows);
|
||||||
pConsumerEp->offsetRows = data->offsetRows;
|
pConsumerEp->offsetRows = data->offsetRows;
|
||||||
data->offsetRows = NULL;
|
data->offsetRows = NULL;
|
||||||
}
|
}
|
||||||
taosRUnLockLatch(&pSub->lock);
|
taosWUnLockLatch(&pSub->lock);
|
||||||
|
|
||||||
mndReleaseSubscribe(pMnode, pSub);
|
mndReleaseSubscribe(pMnode, pSub);
|
||||||
}
|
}
|
||||||
|
|
|
@ -519,6 +519,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
|
||||||
taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
|
taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
|
||||||
}
|
}
|
||||||
pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
|
pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
|
||||||
|
pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
|
||||||
memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
|
memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
|
||||||
return pSubNew;
|
return pSubNew;
|
||||||
}
|
}
|
||||||
|
|
|
@ -451,24 +451,42 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
taosArrayPush(pOutput->rebVgs, pRebOutput);
|
taosArrayPush(pOutput->rebVgs, pRebOutput);
|
||||||
if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed
|
if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed
|
||||||
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); // put all vg into unassigned
|
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); // put all vg into unassigned
|
||||||
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows
|
}
|
||||||
if(pSub){
|
}
|
||||||
taosRLockLatch(&pSub->lock);
|
|
||||||
if(pOutput->pSub->offsetRows == NULL){
|
if(taosHashGetSize(pOutput->pSub->consumerHash) == 0) { // if all consumer is removed
|
||||||
pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
|
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows
|
||||||
}else{
|
if (pSub) {
|
||||||
taosArrayClear(pOutput->pSub->offsetRows);
|
taosRLockLatch(&pSub->lock);
|
||||||
}
|
bool init = false;
|
||||||
pIter = NULL;
|
if (pOutput->pSub->offsetRows == NULL) {
|
||||||
while(1){
|
pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
|
||||||
pIter = taosHashIterate(pSub->consumerHash, pIter);
|
init = true;
|
||||||
if (pIter == NULL) break;
|
|
||||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
|
||||||
taosArrayAddAll(pOutput->pSub->offsetRows, pConsumerEp->offsetRows);
|
|
||||||
}
|
|
||||||
taosRUnLockLatch(&pSub->lock);
|
|
||||||
mndReleaseSubscribe(pMnode, pSub);
|
|
||||||
}
|
}
|
||||||
|
pIter = NULL;
|
||||||
|
while (1) {
|
||||||
|
pIter = taosHashIterate(pSub->consumerHash, pIter);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||||
|
if (init) {
|
||||||
|
taosArrayAddAll(pOutput->pSub->offsetRows, pConsumerEp->offsetRows);
|
||||||
|
mDebug("pSub->offsetRows is init");
|
||||||
|
} else {
|
||||||
|
for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
|
||||||
|
OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
|
||||||
|
for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
|
||||||
|
OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
|
||||||
|
if (d1->vgId == d2->vgId) {
|
||||||
|
d2->rows += d1->rows;
|
||||||
|
d2->offset = d1->offset;
|
||||||
|
mDebug("pSub->offsetRows add vgId:%d, after:%lld, before:%lld", d2->vgId, d2->rows, d1->rows);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosRUnLockLatch(&pSub->lock);
|
||||||
|
mndReleaseSubscribe(pMnode, pSub);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -227,12 +227,11 @@ class TDTestCase:
|
||||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
tdLog.info("start consume processor")
|
tdLog.info("start consume processor")
|
||||||
pollDelay = 10
|
pollDelay = 20
|
||||||
showMsg = 1
|
showMsg = 1
|
||||||
showRow = 1
|
showRow = 1
|
||||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||||
|
|
||||||
time.sleep(2)
|
|
||||||
tdLog.info("start show subscriptions 1")
|
tdLog.info("start show subscriptions 1")
|
||||||
while(1):
|
while(1):
|
||||||
tdSql.query("show subscriptions")
|
tdSql.query("show subscriptions")
|
||||||
|
@ -240,10 +239,11 @@ class TDTestCase:
|
||||||
tdLog.info("sleep")
|
tdLog.info("sleep")
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
elif (tdSql.queryResult[0][4] != None):
|
elif (tdSql.queryResult[0][4] != None):
|
||||||
tdSql.checkData(0, 4, "offset(reset to earlieast)")
|
# tdSql.checkData(0, 4, "earliest")
|
||||||
tdSql.checkData(0, 5, 0)
|
tdSql.checkData(0, 5, 0)
|
||||||
break
|
break
|
||||||
|
|
||||||
|
time.sleep(2)
|
||||||
tdLog.info("start insert data")
|
tdLog.info("start insert data")
|
||||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||||
self.insert_data(tdSql,\
|
self.insert_data(tdSql,\
|
||||||
|
|
Loading…
Reference in New Issue