fix:report current offset in tmq
This commit is contained in:
parent
c5dfe169a6
commit
d3dedbe676
|
@ -810,7 +810,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
||||||
OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
|
OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
|
||||||
offRows->vgId = pVg->vgId;
|
offRows->vgId = pVg->vgId;
|
||||||
offRows->rows = pVg->numOfRows;
|
offRows->rows = pVg->numOfRows;
|
||||||
offRows->offset = pVg->offsetInfo.committedOffset;
|
offRows->offset = pVg->offsetInfo.currentOffset;
|
||||||
char buf[TSDB_OFFSET_LEN] = {0};
|
char buf[TSDB_OFFSET_LEN] = {0};
|
||||||
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
|
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
|
||||||
tscInfo("report offset: vgId:%d, offset:%s, rows:%"PRId64, offRows->vgId, buf, offRows->rows);
|
tscInfo("report offset: vgId:%d, offset:%s, rows:%"PRId64, offRows->vgId, buf, offRows->rows);
|
||||||
|
@ -1463,7 +1463,8 @@ CREATE_MSG_FAIL:
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SVgroupSaveInfo {
|
typedef struct SVgroupSaveInfo {
|
||||||
STqOffsetVal offset;
|
STqOffsetVal currentOffset;
|
||||||
|
STqOffsetVal commitOffset;
|
||||||
int64_t numOfRows;
|
int64_t numOfRows;
|
||||||
} SVgroupSaveInfo;
|
} SVgroupSaveInfo;
|
||||||
|
|
||||||
|
@ -1488,12 +1489,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
||||||
makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId);
|
makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId);
|
||||||
SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
|
SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
|
||||||
|
|
||||||
int64_t numOfRows = 0;
|
|
||||||
STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg};
|
STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg};
|
||||||
if (pInfo != NULL) {
|
|
||||||
offsetNew = pInfo->offset;
|
|
||||||
numOfRows = pInfo->numOfRows;
|
|
||||||
}
|
|
||||||
|
|
||||||
SMqClientVg clientVg = {
|
SMqClientVg clientVg = {
|
||||||
.pollCnt = 0,
|
.pollCnt = 0,
|
||||||
|
@ -1502,11 +1498,11 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
||||||
.vgStatus = TMQ_VG_STATUS__IDLE,
|
.vgStatus = TMQ_VG_STATUS__IDLE,
|
||||||
.vgSkipCnt = 0,
|
.vgSkipCnt = 0,
|
||||||
.emptyBlockReceiveTs = 0,
|
.emptyBlockReceiveTs = 0,
|
||||||
.numOfRows = numOfRows,
|
.numOfRows = pInfo ? pInfo->numOfRows : 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
clientVg.offsetInfo.currentOffset = offsetNew;
|
clientVg.offsetInfo.currentOffset = pInfo ? pInfo->currentOffset : offsetNew;
|
||||||
clientVg.offsetInfo.committedOffset = offsetNew;
|
clientVg.offsetInfo.committedOffset = pInfo ? pInfo->commitOffset : offsetNew;
|
||||||
clientVg.offsetInfo.walVerBegin = -1;
|
clientVg.offsetInfo.walVerBegin = -1;
|
||||||
clientVg.offsetInfo.walVerEnd = -1;
|
clientVg.offsetInfo.walVerEnd = -1;
|
||||||
clientVg.seekUpdated = false;
|
clientVg.seekUpdated = false;
|
||||||
|
@ -1565,7 +1561,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
|
||||||
tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
|
tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
|
||||||
vgKey, buf);
|
vgKey, buf);
|
||||||
|
|
||||||
SVgroupSaveInfo info = {.offset = pVgCur->offsetInfo.currentOffset, .numOfRows = pVgCur->numOfRows};
|
SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.currentOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows};
|
||||||
taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo));
|
taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue