fix(tmq): offset

This commit is contained in:
Liu Jicong 2022-07-08 21:37:15 +08:00
parent a9e75c87aa
commit 8d01ae00cb
6 changed files with 39 additions and 12 deletions

View File

@ -294,6 +294,15 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
tqOffsetResetToLog(&fetchOffsetNew, walGetLastVer(pTq->pVnode->pWal));
tqDebug("tmq poll: consumer %ld, subkey %s, offset reset to %ld", consumerId, pHandle->subKey,
fetchOffsetNew.version);
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
dataRsp.rspOffset = fetchOffsetNew;
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
goto OVER;
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey %s, no offset committed for consumer %ld in vg %d, subkey %s, reset none failed",
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);

View File

@ -63,7 +63,9 @@ int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOff
qTaskInfo_t task = pExec->execCol.task[0];
if (qStreamPrepareScan1(task, pOffset) < 0) {
ASSERT(0);
pRsp->rspOffset = *pOffset;
pRsp->rspOffset.version--;
return 0;
}
while (1) {
@ -91,6 +93,11 @@ int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOff
if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) {
ASSERT(0);
}
if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) {
ASSERT(pRsp->rspOffset.version + 1 >= pRsp->reqOffset.version);
}
ASSERT(pRsp->rspOffset.type != 0);
break;

View File

@ -152,6 +152,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
if (fromProcessedMsg) {
ret->offset.type = TMQ_OFFSET__LOG;
ret->offset.version = pReader->ver;
ASSERT(pReader->ver != -1);
ret->fetchType = FETCH_TYPE__NONE;
return 0;
}

View File

@ -293,7 +293,9 @@ int32_t qStreamPrepareScan1(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
pOperator->status = OP_OPENED;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pInfo = pOperator->info;
tqSeekVer(pInfo->tqReader, pOffset->version);
if (tqSeekVer(pInfo->tqReader, pOffset->version) < 0) {
return -1;
}
return 0;
} else {
ASSERT(pOperator->numOfDownstream == 1);

View File

@ -1222,7 +1222,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (setBlockIntoRes(pInfo, &ret.data) < 0) {
ASSERT(0);
}
pTaskInfo->streamInfo.lastStatus = ret.offset;
/*pTaskInfo->streamInfo.lastStatus = ret.offset;*/
if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes;
} else {
@ -1234,7 +1234,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.metaBlk = ret.meta;
return NULL;
} else if (ret.fetchType == FETCH_TYPE__NONE) {
if (ret.offset.version == -1) {
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
pTaskInfo->streamInfo.lastStatus.version = pTaskInfo->streamInfo.prepareStatus.version - 1;
} else {
pTaskInfo->streamInfo.lastStatus = ret.offset;
}
return NULL;
} else {
ASSERT(0);
@ -1356,8 +1361,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo->pRes->info.type = STREAM_NORMAL;
pInfo->pRes->info.capacity = block.info.rows;
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &block.info.uid, sizeof(int64_t));
if (groupIdPre) {
pInfo->pRes->info.groupId = *groupIdPre;

View File

@ -190,7 +190,10 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
int64_t contLen;
if (pRead->curVersion != fetchVer) {
if (walReadSeekVer(pRead, fetchVer) < 0) return -1;
if (walReadSeekVer(pRead, fetchVer) < 0) {
ASSERT(0);
return -1;
}
}
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
if (contLen != sizeof(SWalCkHead)) {
@ -199,6 +202,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
} else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
ASSERT(0);
pRead->curVersion = -1;
return -1;
}
@ -265,6 +269,7 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curVersion = -1;
ASSERT(0);
return -1;
}