fix(tmq): set correct start offset value.
This commit is contained in:
parent
c8ad465a0d
commit
5f4cb41e28
|
@ -535,10 +535,14 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
|
||||||
atomic_add_fetch_32(&pParamSet->totalRspNum, 1);
|
atomic_add_fetch_32(&pParamSet->totalRspNum, 1);
|
||||||
|
|
||||||
SEp* pEp = GET_ACTIVE_EP(&pVg->epSet);
|
SEp* pEp = GET_ACTIVE_EP(&pVg->epSet);
|
||||||
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%" PRId64 " prev:%" PRId64
|
char offsetBuf[80] = {0};
|
||||||
", ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64,
|
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffset->val);
|
||||||
tmq->consumerId, pOffset->subKey, pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn,
|
|
||||||
pEp->port, index + 1, totalVgroups, pMsgSendInfo->requestId);
|
char commitBuf[80] = {0};
|
||||||
|
tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->committedOffset);
|
||||||
|
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64,
|
||||||
|
tmq->consumerId, pOffset->subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1,
|
||||||
|
totalVgroups, pMsgSendInfo->requestId);
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
|
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
|
||||||
|
|
|
@ -399,8 +399,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
||||||
|
|
||||||
char formatBuf[80];
|
char formatBuf[80];
|
||||||
tFormatOffset(formatBuf, 80, pOffsetVal);
|
tFormatOffset(formatBuf, 80, pOffsetVal);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.",
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%"PRIx64,
|
||||||
consumerId, pHandle->subKey, vgId, formatBuf);
|
consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
|
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
|
||||||
|
|
|
@ -92,7 +92,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
||||||
SSDataBlock* pDataBlock = NULL;
|
SSDataBlock* pDataBlock = NULL;
|
||||||
uint64_t ts = 0;
|
uint64_t ts = 0;
|
||||||
|
|
||||||
tqDebug("vgId:%d, tmq task start to execute, consumer:0x%" PRIx64, vgId, pHandle->consumerId);
|
tqDebug("vgId:%d, tmq task start to execute, consumer:0x%"PRIx64, vgId, pHandle->consumerId);
|
||||||
|
|
||||||
code = qExecTask(task, &pDataBlock, &ts);
|
code = qExecTask(task, &pDataBlock, &ts);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -1156,6 +1156,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
}
|
}
|
||||||
|
|
||||||
STableKeyInfo keyInfo = {.uid = uid};
|
STableKeyInfo keyInfo = {.uid = uid};
|
||||||
|
int64_t oldSkey = pScanBaseInfo->cond.twindows.skey;
|
||||||
|
|
||||||
|
// let's start from the next ts that returned to consumer.
|
||||||
|
pScanBaseInfo->cond.twindows.skey = ts + 1;
|
||||||
|
|
||||||
if (pScanBaseInfo->dataReader == NULL) {
|
if (pScanBaseInfo->dataReader == NULL) {
|
||||||
int32_t code = tsdbReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1,
|
int32_t code = tsdbReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1,
|
||||||
pScanInfo->pResBlock, &pScanBaseInfo->dataReader, NULL);
|
pScanInfo->pResBlock, &pScanBaseInfo->dataReader, NULL);
|
||||||
|
@ -1164,21 +1169,20 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts %" PRId64 " table index:%d, total:%d, %s",
|
||||||
|
uid, ts, pScanInfo->currentTable, numOfTables, id);
|
||||||
} else {
|
} else {
|
||||||
tsdbSetTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
|
tsdbSetTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
|
||||||
int64_t oldSkey = pScanBaseInfo->cond.twindows.skey;
|
|
||||||
|
|
||||||
// let's start from the next ts that returned to consumer.
|
|
||||||
pScanBaseInfo->cond.twindows.skey = ts + 1;
|
|
||||||
tsdbReaderReset(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
|
tsdbReaderReset(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
|
||||||
|
|
||||||
// restore the key value
|
|
||||||
pScanBaseInfo->cond.twindows.skey = oldSkey;
|
|
||||||
pScanInfo->scanTimes = 0;
|
pScanInfo->scanTimes = 0;
|
||||||
|
|
||||||
qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s",
|
qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s",
|
||||||
uid, ts, pScanInfo->currentTable, numOfTables, id);
|
uid, ts, pScanInfo->currentTable, numOfTables, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// restore the key value
|
||||||
|
pScanBaseInfo->cond.twindows.skey = oldSkey;
|
||||||
} else {
|
} else {
|
||||||
qError("invalid pOffset->type:%d, %s", pOffset->type, id);
|
qError("invalid pOffset->type:%d, %s", pOffset->type, id);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
Loading…
Reference in New Issue