diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 1edf41b6f7..cff4b0234c 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -176,7 +176,7 @@ int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts); int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts); -int32_t qStreamPrepareScan1(qTaskInfo_t tinfo, const STqOffsetVal* pOffset); +int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset); int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 6d9f29906b..9b94864a05 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -133,6 +133,7 @@ typedef struct { int64_t curFileFirstVer; int64_t curVersion; int64_t capacity; + int8_t curInvalid; TdThreadMutex mutex; SWalFilterCond cond; SWalCkHead *pHead; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 52949838b9..0c72d35027 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -271,6 +271,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { tqDebug("tmq poll: consumer %ld (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf); + SMqDataRsp dataRsp = {0}; + tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); + // 2.reset offset if needed if (reqOffset.type > 0) { fetchOffsetNew = reqOffset; @@ -294,41 +297,25 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal)); } } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { - tqOffsetResetToLog(&fetchOffsetNew, walGetLastVer(pTq->pVnode->pWal)); + tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); tqDebug("tmq poll: consumer %ld, subkey %s, offset reset to %ld", consumerId, pHandle->subKey, - fetchOffsetNew.version); - SMqDataRsp dataRsp = {0}; - tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); - dataRsp.rspOffset = fetchOffsetNew; - code = 0; + dataRsp.rspOffset.version); if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1; } - taosArrayDestroy(dataRsp.blockDataLen); - taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree); - - if (dataRsp.withSchema) { - taosArrayDestroyP(dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); - } - - if (dataRsp.withTbName) { - taosArrayDestroyP(dataRsp.blockTbName, (FDelete)taosMemoryFree); - } - return code; + goto OVER; } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64 " in vg %d, subkey %s, reset none failed", pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey); terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; - return -1; + code = -1; + goto OVER; } } } // 3.query - SMqDataRsp dataRsp = {0}; - tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) { fetchOffsetNew.version++; if (tqScanLog(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) { @@ -337,7 +324,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { goto OVER; } if (dataRsp.blockNum == 0) { - // TODO add to async task + // TODO add to async task pool /*dataRsp.rspOffset.version--;*/ } if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { @@ -350,7 +337,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { int64_t fetchVer = fetchOffsetNew.version + 1; SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { - return -1; + code = -1; + goto OVER; } walSetReaderCapacity(pHandle->pWalReader, 2048); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index d381cfcdc7..4ecd88b1fd 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -62,7 +62,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { qTaskInfo_t task = pExec->execCol.task[0]; - if (qStreamPrepareScan1(task, pOffset) < 0) { + if (qStreamPrepareScan(task, pOffset) < 0) { pRsp->rspOffset = *pOffset; pRsp->rspOffset.version--; return 0; @@ -110,10 +110,6 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN); qTaskInfo_t task = pExec->execCol.task[workerId]; - /*if (qStreamScanSnapshot(task) < 0) {*/ - /*ASSERT(0);*/ - /*}*/ - if (qStreamPrepareTsdbScan(task, offset.uid, offset.ts) < 0) { ASSERT(0); } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 35179b0234..8753ecc47c 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -22,8 +22,8 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea while (1) { if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) { - tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return", pHandle->consumerId, - pHandle->epoch, TD_VID(pTq->pVnode), offset); + tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return", + pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset); *fetchOffset = offset - 1; code = -1; goto END; @@ -104,8 +104,13 @@ void tqCloseReader(STqReader* pReader) { } int32_t tqSeekVer(STqReader* pReader, int64_t ver) { - // - return walReadSeekVer(pReader->pWalReader, ver); + if (walReadSeekVer(pReader->pWalReader, ver) < 0) { + ASSERT(pReader->pWalReader->curInvalid); + ASSERT(pReader->pWalReader->curVersion == ver); + return -1; + } + ASSERT(pReader->pWalReader->curVersion == ver); + return 0; } int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { @@ -114,9 +119,11 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { while (1) { if (!fromProcessedMsg) { if (walNextValidMsg(pReader->pWalReader) < 0) { + pReader->ver = pReader->pWalReader->curVersion - pReader->pWalReader->curInvalid; ret->offset.type = TMQ_OFFSET__LOG; ret->offset.version = pReader->ver; ret->fetchType = FETCH_TYPE__NONE; + ASSERT(ret->offset.version >= 0); return -1; } void* body = pReader->pWalReader->pHead->head.body; @@ -131,19 +138,12 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { } while (tqNextDataBlock(pReader)) { + // TODO mem free memset(&ret->data, 0, sizeof(SSDataBlock)); int32_t code = tqRetrieveDataBlock(&ret->data, pReader); if (code != 0 || ret->data.info.rows == 0) { ASSERT(0); continue; -#if 0 - if (fromProcessedMsg) { - ret->fetchType = FETCH_TYPE__NONE; - return 0; - } else { - break; - } -#endif } ret->fetchType = FETCH_TYPE__DATA; return 0; @@ -152,7 +152,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { if (fromProcessedMsg) { ret->offset.type = TMQ_OFFSET__LOG; ret->offset.version = pReader->ver; - ASSERT(pReader->ver != -1); + ASSERT(pReader->ver >= 0); ret->fetchType = FETCH_TYPE__NONE; return 0; } diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index ad33c0ae55..ca1d8b2b10 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -280,7 +280,7 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { return 0; } -int32_t qStreamPrepareScan1(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { +int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); @@ -293,8 +293,55 @@ int32_t qStreamPrepareScan1(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { pOperator->status = OP_OPENED; if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { SStreamScanInfo* pInfo = pOperator->info; - if (tqSeekVer(pInfo->tqReader, pOffset->version) < 0) { - return -1; + if (pOffset->type == TMQ_OFFSET__LOG) { + if (tqSeekVer(pInfo->tqReader, pOffset->version) < 0) { + return -1; + } + ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version); + } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { + pInfo->blockType = STREAM_INPUT__TABLE_SCAN; + int64_t uid = pOffset->uid; + int64_t ts = pOffset->ts; + + if (uid == 0) { + if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) { + STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); + uid = pTableInfo->uid; + ts = INT64_MIN; + } + } + if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA || + pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) { + STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; + int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); + bool found = false; + for (int32_t i = 0; i < tableSz; i++) { + STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); + if (pTableInfo->uid == uid) { + found = true; + pTableScanInfo->currentTable = i; + } + } + + // TODO after dropping table, table may be not found + ASSERT(found); + + tsdbSetTableId(pTableScanInfo->dataReader, uid); + int64_t oldSkey = pTableScanInfo->cond.twindows[0].skey; + pTableScanInfo->cond.twindows[0].skey = ts + 1; + tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); + pTableScanInfo->cond.twindows[0].skey = oldSkey; + pTableScanInfo->scanTimes = 0; + pTableScanInfo->curTWinIdx = 0; + + qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts, + pTableScanInfo->currentTable, tableSz); + } else { + // switch to log + } + + } else { + ASSERT(0); } return 0; } else { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index cdac46c948..d2f63849d0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -40,8 +40,8 @@ static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* const char* dbName); static int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, - SSDataBlock* pBlock, const char* idStr); -static bool processBlockWithProbability(const SSampleExecInfo* pInfo); + SSDataBlock* pBlock, const char* idStr); +static bool processBlockWithProbability(const SSampleExecInfo* pInfo); bool processBlockWithProbability(const SSampleExecInfo* pInfo) { #if 0 @@ -265,7 +265,8 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca if (pTableScanInfo->pseudoSup.numOfExprs > 0) { SExprSupp* pSup = &pTableScanInfo->pseudoSup; - int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, GET_TASKID(pTaskInfo)); + int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, + GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, code); } @@ -303,7 +304,7 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction } int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, - SSDataBlock* pBlock, const char* idStr) { + SSDataBlock* pBlock, const char* idStr) { // currently only the tbname pseudo column if (numOfPseudoExpr == 0) { return TSDB_CODE_SUCCESS; @@ -313,7 +314,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int metaReaderInit(&mr, pHandle->meta, 0); int32_t code = metaGetTableEntryByUid(&mr, pBlock->info.uid); if (code != TSDB_CODE_SUCCESS) { - qError("failed to get table meta, uid:0x%"PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr); + qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr); metaReaderClear(&mr); return terrno; } @@ -697,7 +698,7 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid, int32_t* rowLen, con metaReaderInit(&mr, pMeta, 0); int32_t code = metaGetTableEntryByUid(&mr, uid); if (code != TSDB_CODE_SUCCESS) { - qError("failed to get table meta, uid:0x%"PRIx64 ", code:%s, %s", uid, tstrerror(terrno), idstr); + qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", uid, tstrerror(terrno), idstr); metaReaderClear(&mr); return terrno; } @@ -711,7 +712,7 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid, int32_t* rowLen, con uint64_t suid = mr.me.ctbEntry.suid; code = metaGetTableEntryByUid(&mr, suid); if (code != TSDB_CODE_SUCCESS) { - qError("failed to get table meta, uid:0x%"PRIx64 ", code:%s, %s", suid, tstrerror(terrno), idstr); + qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno), idstr); metaReaderClear(&mr); return terrno; } @@ -738,12 +739,13 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { } SBlockDistInfo* pBlockScanInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN}; - int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, &blockDistInfo.rowSize, GET_TASKID(pTaskInfo)); + int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, &blockDistInfo.rowSize, + GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + longjmp(pTaskInfo->env, code); } tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo); @@ -938,7 +940,8 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); (*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL); } else { - win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, TSDB_ORDER_ASC); + win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, + TSDB_ORDER_ASC); setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); (*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); @@ -1219,7 +1222,8 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock // currently only the tbname pseudo column if (pInfo->numOfPseudoExpr > 0) { - int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, GET_TASKID(pTaskInfo)); + int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, + GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, code); } @@ -1264,17 +1268,21 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTaskInfo->streamInfo.metaBlk = ret.meta; return NULL; } else if (ret.fetchType == FETCH_TYPE__NONE) { - if (ret.offset.version == -1) { - pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; - pTaskInfo->streamInfo.lastStatus.version = pTaskInfo->streamInfo.prepareStatus.version - 1; - } else { - pTaskInfo->streamInfo.lastStatus = ret.offset; - } + /*if (ret.offset.version == -1) {*/ + /*pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;*/ + /*pTaskInfo->streamInfo.lastStatus.version = pTaskInfo->streamInfo.prepareStatus.version - 1;*/ + /*} else {*/ + pTaskInfo->streamInfo.lastStatus = ret.offset; + ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 >= pTaskInfo->streamInfo.prepareStatus.version); + /*}*/ return NULL; } else { ASSERT(0); } } + } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { + SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); + return pResult && pResult->info.rows > 0 ? pResult : NULL; } size_t total = taosArrayGetSize(pInfo->pBlockLists); @@ -1443,7 +1451,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { // currently only the tbname pseudo column if (pInfo->numOfPseudoExpr > 0) { - code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, GET_TASKID(pTaskInfo)); + code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, + GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, code); } @@ -1481,6 +1490,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; } else if (pInfo->blockType == STREAM_INPUT__TABLE_SCAN) { + /*ASSERT(0);*/ // check reader last status // if not match, reset status SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); @@ -1873,9 +1883,10 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { metaReaderInit(&mr, pInfo->readHandle.meta, 0); uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid; - int32_t code = metaGetTableEntryByUid(&mr, suid); + int32_t code = metaGetTableEntryByUid(&mr, suid); if (code != TSDB_CODE_SUCCESS) { - qError("failed to get super table meta, uid:0x%"PRIx64 ", code:%s, %s", suid, tstrerror(terrno), GET_TASKID(pTaskInfo)); + qError("failed to get super table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno), + GET_TASKID(pTaskInfo)); metaReaderClear(&mr); metaCloseTbCursor(pInfo->pCur); pInfo->pCur = NULL; @@ -2275,9 +2286,10 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos); - int32_t code = metaGetTableEntryByUid(&mr, item->uid); + int32_t code = metaGetTableEntryByUid(&mr, item->uid); if (code != TSDB_CODE_SUCCESS) { - qError("failed to get table meta, uid:0x%"PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno), GET_TASKID(pTaskInfo)); + qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno), + GET_TASKID(pTaskInfo)); metaReaderClear(&mr); longjmp(pTaskInfo->env, terrno); } @@ -2565,8 +2577,8 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc // currently only the tbname pseudo column if (pTableScanInfo->numOfPseudoExpr > 0) { - int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr, - pBlock, GET_TASKID(pTaskInfo)); + int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, + pTableScanInfo->numOfPseudoExpr, pBlock, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, code); } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 18b500aa19..2d910a85b8 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -30,8 +30,9 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { pRead->pWal = pWal; pRead->pIdxFile = NULL; pRead->pLogFile = NULL; - pRead->curVersion = -1; + pRead->curVersion = -5; pRead->curFileFirstVer = -1; + pRead->curInvalid = 1; pRead->capacity = 0; if (cond) pRead->cond = *cond; @@ -152,13 +153,17 @@ static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) { int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) { SWal *pWal = pRead->pWal; - if (ver == pRead->curVersion) { + if (!pRead->curInvalid && ver == pRead->curVersion) { wDebug("wal version %ld match, no need to reset", ver); return 0; } + + pRead->curInvalid = 1; + pRead->curVersion = ver; + if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { - wError("vgId:$d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId, ver, - pWal->vers.firstVer, pWal->vers.lastVer); + wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId, + ver, pWal->vers.firstVer, pWal->vers.lastVer); terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; return -1; } @@ -171,13 +176,13 @@ int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) { SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); ASSERT(pRet != NULL); if (pRead->curFileFirstVer != pRet->firstVer) { - // error code set inner + // error code was set inner if (walReadChangeFile(pRead, pRet->firstVer) < 0) { return -1; } } - // error code set inner + // error code was set inner if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) { return -1; } @@ -193,9 +198,11 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { int64_t contLen; - if (pRead->curVersion != fetchVer) { + if (pRead->curInvalid || pRead->curVersion != fetchVer) { if (walReadSeekVer(pRead, fetchVer) < 0) { ASSERT(0); + pRead->curVersion = fetchVer; + pRead->curInvalid = 1; return -1; } } @@ -207,7 +214,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } ASSERT(0); - pRead->curVersion = -1; + pRead->curInvalid = 1; return -1; } return 0; @@ -238,7 +245,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { pRead->pWal->cfg.vgId, pRead->pHead->head.version, ver); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } - pRead->curVersion = -1; + pRead->curInvalid = 1; ASSERT(0); return -1; } @@ -246,7 +253,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { if (pReadHead->version != ver) { wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, pRead->pHead->head.version, ver); - pRead->curVersion = -1; + pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; ASSERT(0); return -1; @@ -254,7 +261,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { if (walValidBodyCksum(pRead->pHead) != 0) { wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); - pRead->curVersion = -1; + pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; ASSERT(0); return -1; @@ -273,7 +280,7 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) { code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR); if (code < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - pRead->curVersion = -1; + pRead->curInvalid = 1; ASSERT(0); return -1; } @@ -292,7 +299,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { return -1; } - if (pRead->curVersion != ver) { + if (pRead->curInvalid || pRead->curVersion != ver) { code = walReadSeekVer(pRead, ver); if (code < 0) return -1; } @@ -307,8 +314,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { code = walValidHeadCksum(pHead); if (code != 0) { - wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, - ver); + wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } @@ -324,7 +330,7 @@ int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) { code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR); if (code < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - pRead->curVersion = -1; + pRead->curInvalid = 1; return -1; } @@ -356,14 +362,14 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { if (pReadHead->version != ver) { wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, pRead->pHead->head.version, ver); - pRead->curVersion = -1; + pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } if (walValidBodyCksum(*ppHead) != 0) { wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); - pRead->curVersion = -1; + pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } @@ -380,8 +386,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { return -1; } - // TODO: check wal life - if (pRead->curVersion != ver) { + if (pRead->curInvalid || pRead->curVersion != ver) { if (walReadSeekVer(pRead, ver) < 0) { wError("vgId:%d, unexpected wal log index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr()); return -1; @@ -389,8 +394,8 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { } if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) { - wError("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId, ver, - pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer); + wError("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId, + ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer); terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; return -1; } @@ -410,8 +415,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { code = walValidHeadCksum(pRead->pHead); if (code != 0) { - wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, - ver); + wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } @@ -440,16 +444,15 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { if (pRead->pHead->head.version != ver) { wError("vgId:%d, unexpected wal log index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, pRead->pHead->head.version, ver); - pRead->curVersion = -1; + pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } code = walValidBodyCksum(pRead->pHead); if (code != 0) { - wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, - ver); - pRead->curVersion = -1; + wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); + pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; }