Merge pull request #21827 from taosdata/feature/TS-3495
fear:fix error for report offset & rows in tmq
This commit is contained in:
commit
01c0bbb3f8
|
@ -214,7 +214,7 @@ int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
|
|||
|
||||
void walRefFirstVer(SWal *, SWalRef *);
|
||||
void walRefLastVer(SWal *, SWalRef *);
|
||||
void walRefCommitVer(SWal *, SWalRef *);
|
||||
//void walRefCommitVer(SWal *, SWalRef *);
|
||||
|
||||
SWalRef *walOpenRef(SWal *);
|
||||
void walCloseRef(SWal *pWal, int64_t refId);
|
||||
|
|
|
@ -99,7 +99,7 @@ struct tmq_t {
|
|||
// poll info
|
||||
int64_t pollCnt;
|
||||
int64_t totalRows;
|
||||
bool needReportOffsetRows;
|
||||
// bool needReportOffsetRows;
|
||||
|
||||
// timer
|
||||
tmr_h hbLiveTimer;
|
||||
|
@ -797,7 +797,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
|||
SMqHbReq req = {0};
|
||||
req.consumerId = tmq->consumerId;
|
||||
req.epoch = tmq->epoch;
|
||||
if(tmq->needReportOffsetRows){
|
||||
// if(tmq->needReportOffsetRows){
|
||||
req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
|
||||
for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){
|
||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
|
@ -810,14 +810,14 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
|||
OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
|
||||
offRows->vgId = pVg->vgId;
|
||||
offRows->rows = pVg->numOfRows;
|
||||
offRows->offset = pVg->offsetInfo.committedOffset;
|
||||
offRows->offset = pVg->offsetInfo.currentOffset;
|
||||
char buf[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
|
||||
tscInfo("report offset: vgId:%d, offset:%s, rows:%"PRId64, offRows->vgId, buf, offRows->rows);
|
||||
}
|
||||
}
|
||||
tmq->needReportOffsetRows = false;
|
||||
}
|
||||
// tmq->needReportOffsetRows = false;
|
||||
// }
|
||||
|
||||
int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
|
||||
if (tlen < 0) {
|
||||
|
@ -1094,7 +1094,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
|||
pTmq->status = TMQ_CONSUMER_STATUS__INIT;
|
||||
pTmq->pollCnt = 0;
|
||||
pTmq->epoch = 0;
|
||||
pTmq->needReportOffsetRows = true;
|
||||
// pTmq->needReportOffsetRows = true;
|
||||
|
||||
// set conf
|
||||
strcpy(pTmq->clientId, conf->clientId);
|
||||
|
@ -1463,7 +1463,8 @@ CREATE_MSG_FAIL:
|
|||
}
|
||||
|
||||
typedef struct SVgroupSaveInfo {
|
||||
STqOffsetVal offset;
|
||||
STqOffsetVal currentOffset;
|
||||
STqOffsetVal commitOffset;
|
||||
int64_t numOfRows;
|
||||
} SVgroupSaveInfo;
|
||||
|
||||
|
@ -1488,12 +1489,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
|||
makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId);
|
||||
SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
|
||||
|
||||
int64_t numOfRows = 0;
|
||||
STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg};
|
||||
if (pInfo != NULL) {
|
||||
offsetNew = pInfo->offset;
|
||||
numOfRows = pInfo->numOfRows;
|
||||
}
|
||||
|
||||
SMqClientVg clientVg = {
|
||||
.pollCnt = 0,
|
||||
|
@ -1502,11 +1498,11 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
|||
.vgStatus = TMQ_VG_STATUS__IDLE,
|
||||
.vgSkipCnt = 0,
|
||||
.emptyBlockReceiveTs = 0,
|
||||
.numOfRows = numOfRows,
|
||||
.numOfRows = pInfo ? pInfo->numOfRows : 0,
|
||||
};
|
||||
|
||||
clientVg.offsetInfo.currentOffset = offsetNew;
|
||||
clientVg.offsetInfo.committedOffset = offsetNew;
|
||||
clientVg.offsetInfo.currentOffset = pInfo ? pInfo->currentOffset : offsetNew;
|
||||
clientVg.offsetInfo.committedOffset = pInfo ? pInfo->commitOffset : offsetNew;
|
||||
clientVg.offsetInfo.walVerBegin = -1;
|
||||
clientVg.offsetInfo.walVerEnd = -1;
|
||||
clientVg.seekUpdated = false;
|
||||
|
@ -1565,7 +1561,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
|
|||
tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
|
||||
vgKey, buf);
|
||||
|
||||
SVgroupSaveInfo info = {.offset = pVgCur->offsetInfo.currentOffset, .numOfRows = pVgCur->numOfRows};
|
||||
SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.currentOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows};
|
||||
taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo));
|
||||
}
|
||||
}
|
||||
|
@ -1930,7 +1926,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
|
||||
if (pDataRsp->blockNum == 0) {
|
||||
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
|
||||
" total:%" PRId64 " reqId:0x%" PRIx64,
|
||||
", total:%" PRId64 ", reqId:0x%" PRIx64,
|
||||
tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
|
||||
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
||||
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
|
||||
|
@ -1941,7 +1937,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
tmq->totalRows += numOfRows;
|
||||
pVg->emptyBlockReceiveTs = 0;
|
||||
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
|
||||
" vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64,
|
||||
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
|
||||
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
|
||||
pollRspWrapper->reqId);
|
||||
taosFreeQitem(pollRspWrapper);
|
||||
|
@ -2013,7 +2009,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
|
||||
if (pollRspWrapper->taosxRsp.blockNum == 0) {
|
||||
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 " reqId:0x%" PRIx64,
|
||||
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64,
|
||||
tmq->consumerId, pVg->vgId, pVg->numOfRows, pollRspWrapper->reqId);
|
||||
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
|
||||
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
||||
|
@ -2037,7 +2033,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
char buf[TSDB_OFFSET_LEN];
|
||||
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset);
|
||||
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
|
||||
", vg total:%" PRId64 " total:%" PRId64 " reqId:0x%" PRIx64,
|
||||
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
|
||||
tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
|
||||
tmq->totalRows, pollRspWrapper->reqId);
|
||||
|
||||
|
@ -2456,7 +2452,7 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
|
|||
// if no more waiting rsp
|
||||
pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam);
|
||||
taosMemoryFree(pParamSet);
|
||||
tmq->needReportOffsetRows = true;
|
||||
// tmq->needReportOffsetRows = true;
|
||||
|
||||
taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||
return 0;
|
||||
|
|
|
@ -3745,8 +3745,8 @@ int32_t tSerializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
|
|||
if (tEncodeI32(&encoder, pIndexRsp->indexSize) < 0) return -1;
|
||||
int32_t num = taosArrayGetSize(pIndexRsp->pIndex);
|
||||
if (tEncodeI32(&encoder, num) < 0) return -1;
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
STableIndexInfo *pInfo = (STableIndexInfo *)taosArrayGet(pIndexRsp->pIndex, i);
|
||||
for (int32_t j = 0; j < num; ++j) {
|
||||
STableIndexInfo *pInfo = (STableIndexInfo *)taosArrayGet(pIndexRsp->pIndex, j);
|
||||
if (tSerializeSTableIndexInfo(&encoder, pInfo) < 0) return -1;
|
||||
}
|
||||
}
|
||||
|
@ -3812,7 +3812,7 @@ int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
|
|||
tableIndexRsp.pIndex = taosArrayInit(num, sizeof(STableIndexInfo));
|
||||
if (NULL == tableIndexRsp.pIndex) return -1;
|
||||
STableIndexInfo info;
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
for (int32_t j = 0; j < num; ++j) {
|
||||
if (tDeserializeSTableIndexInfo(&decoder, &info) < 0) return -1;
|
||||
if (NULL == taosArrayPush(tableIndexRsp.pIndex, &info)) {
|
||||
taosMemoryFree(info.expr);
|
||||
|
@ -7168,9 +7168,9 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
|
|||
} else if (pVal->type == TMQ_OFFSET__RESET_LATEST) {
|
||||
snprintf(buf, maxLen, "latest");
|
||||
} else if (pVal->type == TMQ_OFFSET__LOG) {
|
||||
snprintf(buf, maxLen, "log:%" PRId64, pVal->version);
|
||||
snprintf(buf, maxLen, "wal:%" PRId64, pVal->version);
|
||||
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
snprintf(buf, maxLen, "snapshot:%" PRId64 "|%" PRId64, pVal->uid, pVal->ts);
|
||||
snprintf(buf, maxLen, "tsdb:%" PRId64 "|%" PRId64, pVal->uid, pVal->ts);
|
||||
} else {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
|
|
@ -630,6 +630,8 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
|
|||
}
|
||||
}
|
||||
buf = taosDecodeString(buf, &pSub->qmsg);
|
||||
}else{
|
||||
pSub->qmsg = taosStrdup("");
|
||||
}
|
||||
return (void *)buf;
|
||||
}
|
||||
|
|
|
@ -379,7 +379,7 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
|
|||
handle->execHandle.execTb.qmsg = taosStrdup(req->qmsg);
|
||||
}
|
||||
|
||||
handle->snapshotVer = walGetLastVer(pTq->pVnode->pWal);
|
||||
handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal);
|
||||
|
||||
if(buildHandle(pTq, handle) < 0){
|
||||
return -1;
|
||||
|
|
|
@ -81,11 +81,11 @@ void walRefLastVer(SWal *pWal, SWalRef *pRef) {
|
|||
wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, ver);
|
||||
}
|
||||
|
||||
void walRefCommitVer(SWal *pWal, SWalRef *pRef) {
|
||||
taosThreadMutexLock(&pWal->mutex);
|
||||
int64_t ver = walGetCommittedVer(pWal);
|
||||
pRef->refVer = ver;
|
||||
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver);
|
||||
}
|
||||
//void walRefCommitVer(SWal *pWal, SWalRef *pRef) {
|
||||
// taosThreadMutexLock(&pWal->mutex);
|
||||
// int64_t ver = walGetCommittedVer(pWal);
|
||||
// pRef->refVer = ver;
|
||||
//
|
||||
// taosThreadMutexUnlock(&pWal->mutex);
|
||||
// wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver);
|
||||
//}
|
||||
|
|
Loading…
Reference in New Issue