fix(tmq): reset offset
This commit is contained in:
parent
7cf99d5359
commit
0fccdace56
|
@ -319,9 +319,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
|
|
||||||
// 3.query
|
// 3.query
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
|
/*if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {*/
|
||||||
fetchOffsetNew.version++;
|
/*fetchOffsetNew.version++;*/
|
||||||
}
|
/*}*/
|
||||||
if (tqScan(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) {
|
if (tqScan(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
code = -1;
|
code = -1;
|
||||||
|
|
|
@ -66,7 +66,6 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
|
||||||
if (qStreamPrepareScan(task, pOffset) < 0) {
|
if (qStreamPrepareScan(task, pOffset) < 0) {
|
||||||
ASSERT(pOffset->type == TMQ_OFFSET__LOG);
|
ASSERT(pOffset->type == TMQ_OFFSET__LOG);
|
||||||
pRsp->rspOffset = *pOffset;
|
pRsp->rspOffset = *pOffset;
|
||||||
pRsp->rspOffset.version--;
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -288,14 +288,22 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
||||||
if (!tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
|
if (!tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
|
||||||
while (1) {
|
while (1) {
|
||||||
uint8_t type = pOperator->operatorType;
|
uint8_t type = pOperator->operatorType;
|
||||||
pOperator->status = OP_OPENED;
|
/*pOperator->status = OP_OPENED;*/
|
||||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
SStreamScanInfo* pInfo = pOperator->info;
|
SStreamScanInfo* pInfo = pOperator->info;
|
||||||
if (pOffset->type == TMQ_OFFSET__LOG) {
|
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||||
if (tqSeekVer(pInfo->tqReader, pOffset->version) < 0) {
|
#if 0
|
||||||
|
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
|
||||||
|
pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
|
||||||
|
qError("prepare scan ver %ld actual ver %ld, last %ld", pOffset->version,
|
||||||
|
pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version);
|
ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1);
|
||||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
|
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
|
||||||
int64_t uid = pOffset->uid;
|
int64_t uid = pOffset->uid;
|
||||||
|
|
Loading…
Reference in New Issue