fix:set offset if read wal none

This commit is contained in:
wangmm0220 2023-03-20 10:10:07 +08:00
parent 55fb48b699
commit a5144f0ba2
4 changed files with 6 additions and 16 deletions

View File

@ -208,7 +208,7 @@ typedef struct SSDataBlock {
} SSDataBlock; } SSDataBlock;
enum { enum {
FETCH_TYPE__DATA = 1, FETCH_TYPE__DATA = 0,
FETCH_TYPE__NONE, FETCH_TYPE__NONE,
}; };

View File

@ -336,17 +336,6 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
} }
pRsp->withTbName = 0; pRsp->withTbName = 0;
#if 0
pRsp->withTbName = pReq->withTbName;
if (pRsp->withTbName) {
pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
if (pRsp->blockTbName == NULL) {
// TODO free
return -1;
}
}
#endif
pRsp->withSchema = false; pRsp->withSchema = false;
return 0; return 0;
} }

View File

@ -1623,13 +1623,14 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
while (1) { while (1) {
SFetchRet ret = {0}; SFetchRet ret = {0};
tqNextBlock(pInfo->tqReader, &ret); tqNextBlock(pInfo->tqReader, &ret);
pTaskInfo->streamInfo.currentOffset = ret.offset;
if (ret.fetchType == FETCH_TYPE__DATA) { if (ret.fetchType == FETCH_TYPE__DATA) {
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
setBlockIntoRes(pInfo, &ret.data, true); setBlockIntoRes(pInfo, &ret.data, true);
qDebug("queue scan log return %d rows", pInfo->pRes->info.rows); qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
return pInfo->pRes; return pInfo->pRes;
}else{
pTaskInfo->streamInfo.currentOffset = ret.offset;
} }
return NULL; return NULL;
} }

View File

@ -469,11 +469,11 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SVgroupDataCxt* pVgCxt = NULL; SVgroupDataCxt* pVgCxt = NULL;
int32_t vgId = pTableCxt->pMeta->vgId; int32_t vgId = pTableCxt->pMeta->vgId;
void** p = taosHashGet(pVgroupHash, &vgId, sizeof(vgId)); void** pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
if (NULL == p) { if (NULL == pp) {
code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt); code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt);
} else { } else {
pVgCxt = *(SVgroupDataCxt**)p; pVgCxt = *(SVgroupDataCxt**)pp;
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = fillVgroupDataCxt(pTableCxt, pVgCxt); code = fillVgroupDataCxt(pTableCxt, pVgCxt);