fix(tmq): adjust log.
This commit is contained in:
parent
49df168a76
commit
fad23f8cb0
|
@ -1377,6 +1377,11 @@ CREATE_MSG_FAIL:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct SVgroupSaveInfo {
|
||||||
|
STqOffsetVal offset;
|
||||||
|
int64_t numOfRows;
|
||||||
|
} SVgroupSaveInfo;
|
||||||
|
|
||||||
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
|
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
|
||||||
tmq_t* tmq) {
|
tmq_t* tmq) {
|
||||||
pTopic->schema = pTopicEp->schema;
|
pTopic->schema = pTopicEp->schema;
|
||||||
|
@ -1396,11 +1401,13 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
||||||
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
|
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
|
||||||
|
|
||||||
makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId);
|
makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId);
|
||||||
STqOffsetVal* pOffset = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
|
SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
|
||||||
|
|
||||||
|
int64_t numOfRows = 0;
|
||||||
STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg};
|
STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg};
|
||||||
if (pOffset != NULL) {
|
if (pInfo != NULL) {
|
||||||
offsetNew = *pOffset;
|
offsetNew = pInfo->offset;
|
||||||
|
numOfRows = pInfo->numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqClientVg clientVg = {
|
SMqClientVg clientVg = {
|
||||||
|
@ -1411,7 +1418,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
||||||
.vgStatus = TMQ_VG_STATUS__IDLE,
|
.vgStatus = TMQ_VG_STATUS__IDLE,
|
||||||
.vgSkipCnt = 0,
|
.vgSkipCnt = 0,
|
||||||
.emptyBlockReceiveTs = 0,
|
.emptyBlockReceiveTs = 0,
|
||||||
.numOfRows = 0,
|
.numOfRows = numOfRows,
|
||||||
};
|
};
|
||||||
|
|
||||||
taosArrayPush(pTopic->vgs, &clientVg);
|
taosArrayPush(pTopic->vgs, &clientVg);
|
||||||
|
@ -1463,7 +1470,9 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
|
||||||
tFormatOffset(buf, 80, &pVgCur->currentOffset);
|
tFormatOffset(buf, 80, &pVgCur->currentOffset);
|
||||||
tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch,
|
tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch,
|
||||||
pVgCur->vgId, vgKey, buf);
|
pVgCur->vgId, vgKey, buf);
|
||||||
taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
|
|
||||||
|
SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows};
|
||||||
|
taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1835,19 +1844,19 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
char buf[80];
|
char buf[80];
|
||||||
tFormatOffset(buf, 80, &pDataRsp->rspOffset);
|
tFormatOffset(buf, 80, &pDataRsp->rspOffset);
|
||||||
if (pDataRsp->blockNum == 0) {
|
if (pDataRsp->blockNum == 0) {
|
||||||
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%"PRId64" reqId:0x%" PRIx64, tmq->consumerId,
|
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%"PRId64" total:%"PRId64" reqId:0x%" PRIx64, tmq->consumerId,
|
||||||
pVg->vgId, buf, pVg->numOfRows, pollRspWrapper->reqId);
|
pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
|
||||||
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
} else { // build rsp
|
} else { // build rsp
|
||||||
int64_t numOfRows = 0;
|
int64_t numOfRows = 0;
|
||||||
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
|
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
|
||||||
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
|
|
||||||
", vg total:%" PRId64 " reqId:0x%" PRIx64,
|
|
||||||
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows,
|
|
||||||
pollRspWrapper->reqId);
|
|
||||||
|
|
||||||
tmq->totalRows += numOfRows;
|
tmq->totalRows += numOfRows;
|
||||||
|
|
||||||
|
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
|
||||||
|
" vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64,
|
||||||
|
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, tmq->totalRows, pVg->numOfRows,
|
||||||
|
pollRspWrapper->reqId);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
return pRsp;
|
return pRsp;
|
||||||
}
|
}
|
||||||
|
@ -1911,9 +1920,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
char buf[80];
|
char buf[80];
|
||||||
tFormatOffset(buf, 80, &pVg->currentOffset);
|
tFormatOffset(buf, 80, &pVg->currentOffset);
|
||||||
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
|
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
|
||||||
", vg total:%" PRId64 " reqId:0x%" PRIx64,
|
", vg total:%" PRId64 " total:%"PRId64" reqId:0x%" PRIx64,
|
||||||
tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
|
tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
|
||||||
pollRspWrapper->reqId);
|
tmq->totalRows, pollRspWrapper->reqId);
|
||||||
|
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
return pRsp;
|
return pRsp;
|
||||||
|
|
Loading…
Reference in New Issue