fix(tmq): adjust the total rows counting.

This commit is contained in:
Haojun Liao 2023-03-19 11:39:31 +08:00
parent ae5155edcf
commit e5e4f507c2
1 changed files with 11 additions and 6 deletions

View File

@ -1604,10 +1604,11 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
return pRspObj;
}
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg) {
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
pRspObj->resType = RES_TYPE__TMQ;
(*numOfRows) = 0;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
@ -1626,8 +1627,8 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg)
for(int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i);
int64_t rows = htobe64(pRetrieve->numOfRows);
pRspObj->resInfo.totalRows += rows;
pVg->numOfRows += rows;
(*numOfRows) += rows;
}
return pRspObj;
@ -1839,11 +1840,12 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
} else { // build rsp
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg);
int64_t numOfRows = 0;
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%"PRId64" reqId:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, pRsp->resInfo.totalRows, pollRspWrapper->reqId);
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pollRspWrapper->reqId);
tmq->totalRows += pRsp->resInfo.totalRows;
tmq->totalRows += numOfRows;
taosFreeQitem(pollRspWrapper);
return pRsp;
}
@ -1895,12 +1897,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
// build rsp
void* pRsp = NULL;
int64_t numOfRows = 0;
if (pollRspWrapper->taosxRsp.createTableNum == 0) {
pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg);
pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
} else {
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
}
tmq->totalRows += numOfRows;
char buf[80];
tFormatOffset(buf, 80, &pVg->currentOffset);
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%"PRIx64, tmq->consumerId, pVg->vgId,