Merge pull request #14720 from taosdata/feature/stream
refactor(stream)
This commit is contained in:
commit
9fd1a64557
|
@ -176,7 +176,7 @@ int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts);
|
||||||
|
|
||||||
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);
|
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);
|
||||||
|
|
||||||
int32_t qStreamPrepareScan1(qTaskInfo_t tinfo, const STqOffsetVal* pOffset);
|
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset);
|
||||||
|
|
||||||
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
|
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
|
||||||
|
|
||||||
|
|
|
@ -133,6 +133,7 @@ typedef struct {
|
||||||
int64_t curFileFirstVer;
|
int64_t curFileFirstVer;
|
||||||
int64_t curVersion;
|
int64_t curVersion;
|
||||||
int64_t capacity;
|
int64_t capacity;
|
||||||
|
int8_t curInvalid;
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
SWalFilterCond cond;
|
SWalFilterCond cond;
|
||||||
SWalCkHead *pHead;
|
SWalCkHead *pHead;
|
||||||
|
|
|
@ -271,6 +271,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
tqDebug("tmq poll: consumer %ld (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
|
tqDebug("tmq poll: consumer %ld (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
|
||||||
pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
|
pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
|
||||||
|
|
||||||
|
SMqDataRsp dataRsp = {0};
|
||||||
|
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
|
||||||
|
|
||||||
// 2.reset offset if needed
|
// 2.reset offset if needed
|
||||||
if (reqOffset.type > 0) {
|
if (reqOffset.type > 0) {
|
||||||
fetchOffsetNew = reqOffset;
|
fetchOffsetNew = reqOffset;
|
||||||
|
@ -294,41 +297,25 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal));
|
tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal));
|
||||||
}
|
}
|
||||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
||||||
tqOffsetResetToLog(&fetchOffsetNew, walGetLastVer(pTq->pVnode->pWal));
|
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
||||||
tqDebug("tmq poll: consumer %ld, subkey %s, offset reset to %ld", consumerId, pHandle->subKey,
|
tqDebug("tmq poll: consumer %ld, subkey %s, offset reset to %ld", consumerId, pHandle->subKey,
|
||||||
fetchOffsetNew.version);
|
dataRsp.rspOffset.version);
|
||||||
SMqDataRsp dataRsp = {0};
|
|
||||||
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
|
|
||||||
dataRsp.rspOffset = fetchOffsetNew;
|
|
||||||
code = 0;
|
|
||||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
taosArrayDestroy(dataRsp.blockDataLen);
|
goto OVER;
|
||||||
taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree);
|
|
||||||
|
|
||||||
if (dataRsp.withSchema) {
|
|
||||||
taosArrayDestroyP(dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dataRsp.withTbName) {
|
|
||||||
taosArrayDestroyP(dataRsp.blockTbName, (FDelete)taosMemoryFree);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
|
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
|
||||||
tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64
|
tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64
|
||||||
" in vg %d, subkey %s, reset none failed",
|
" in vg %d, subkey %s, reset none failed",
|
||||||
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);
|
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);
|
||||||
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
||||||
return -1;
|
code = -1;
|
||||||
|
goto OVER;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3.query
|
// 3.query
|
||||||
SMqDataRsp dataRsp = {0};
|
|
||||||
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
|
|
||||||
|
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) {
|
||||||
fetchOffsetNew.version++;
|
fetchOffsetNew.version++;
|
||||||
if (tqScanLog(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) {
|
if (tqScanLog(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) {
|
||||||
|
@ -337,7 +324,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
goto OVER;
|
goto OVER;
|
||||||
}
|
}
|
||||||
if (dataRsp.blockNum == 0) {
|
if (dataRsp.blockNum == 0) {
|
||||||
// TODO add to async task
|
// TODO add to async task pool
|
||||||
/*dataRsp.rspOffset.version--;*/
|
/*dataRsp.rspOffset.version--;*/
|
||||||
}
|
}
|
||||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||||
|
@ -350,7 +337,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
int64_t fetchVer = fetchOffsetNew.version + 1;
|
int64_t fetchVer = fetchOffsetNew.version + 1;
|
||||||
SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
||||||
if (pCkHead == NULL) {
|
if (pCkHead == NULL) {
|
||||||
return -1;
|
code = -1;
|
||||||
|
goto OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
||||||
|
|
|
@ -62,7 +62,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
|
||||||
int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
|
int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
|
||||||
qTaskInfo_t task = pExec->execCol.task[0];
|
qTaskInfo_t task = pExec->execCol.task[0];
|
||||||
|
|
||||||
if (qStreamPrepareScan1(task, pOffset) < 0) {
|
if (qStreamPrepareScan(task, pOffset) < 0) {
|
||||||
pRsp->rspOffset = *pOffset;
|
pRsp->rspOffset = *pOffset;
|
||||||
pRsp->rspOffset.version--;
|
pRsp->rspOffset.version--;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -110,10 +110,6 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
|
||||||
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
|
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
|
||||||
qTaskInfo_t task = pExec->execCol.task[workerId];
|
qTaskInfo_t task = pExec->execCol.task[workerId];
|
||||||
|
|
||||||
/*if (qStreamScanSnapshot(task) < 0) {*/
|
|
||||||
/*ASSERT(0);*/
|
|
||||||
/*}*/
|
|
||||||
|
|
||||||
if (qStreamPrepareTsdbScan(task, offset.uid, offset.ts) < 0) {
|
if (qStreamPrepareTsdbScan(task, offset.uid, offset.ts) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,8 +22,8 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
|
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
|
||||||
tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return", pHandle->consumerId,
|
tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return",
|
||||||
pHandle->epoch, TD_VID(pTq->pVnode), offset);
|
pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset);
|
||||||
*fetchOffset = offset - 1;
|
*fetchOffset = offset - 1;
|
||||||
code = -1;
|
code = -1;
|
||||||
goto END;
|
goto END;
|
||||||
|
@ -104,8 +104,13 @@ void tqCloseReader(STqReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
|
int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
|
||||||
//
|
if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
|
||||||
return walReadSeekVer(pReader->pWalReader, ver);
|
ASSERT(pReader->pWalReader->curInvalid);
|
||||||
|
ASSERT(pReader->pWalReader->curVersion == ver);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
ASSERT(pReader->pWalReader->curVersion == ver);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
||||||
|
@ -114,9 +119,11 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
||||||
while (1) {
|
while (1) {
|
||||||
if (!fromProcessedMsg) {
|
if (!fromProcessedMsg) {
|
||||||
if (walNextValidMsg(pReader->pWalReader) < 0) {
|
if (walNextValidMsg(pReader->pWalReader) < 0) {
|
||||||
|
pReader->ver = pReader->pWalReader->curVersion - pReader->pWalReader->curInvalid;
|
||||||
ret->offset.type = TMQ_OFFSET__LOG;
|
ret->offset.type = TMQ_OFFSET__LOG;
|
||||||
ret->offset.version = pReader->ver;
|
ret->offset.version = pReader->ver;
|
||||||
ret->fetchType = FETCH_TYPE__NONE;
|
ret->fetchType = FETCH_TYPE__NONE;
|
||||||
|
ASSERT(ret->offset.version >= 0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
void* body = pReader->pWalReader->pHead->head.body;
|
void* body = pReader->pWalReader->pHead->head.body;
|
||||||
|
@ -131,19 +138,12 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
||||||
}
|
}
|
||||||
|
|
||||||
while (tqNextDataBlock(pReader)) {
|
while (tqNextDataBlock(pReader)) {
|
||||||
|
// TODO mem free
|
||||||
memset(&ret->data, 0, sizeof(SSDataBlock));
|
memset(&ret->data, 0, sizeof(SSDataBlock));
|
||||||
int32_t code = tqRetrieveDataBlock(&ret->data, pReader);
|
int32_t code = tqRetrieveDataBlock(&ret->data, pReader);
|
||||||
if (code != 0 || ret->data.info.rows == 0) {
|
if (code != 0 || ret->data.info.rows == 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
continue;
|
continue;
|
||||||
#if 0
|
|
||||||
if (fromProcessedMsg) {
|
|
||||||
ret->fetchType = FETCH_TYPE__NONE;
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
ret->fetchType = FETCH_TYPE__DATA;
|
ret->fetchType = FETCH_TYPE__DATA;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -152,7 +152,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
||||||
if (fromProcessedMsg) {
|
if (fromProcessedMsg) {
|
||||||
ret->offset.type = TMQ_OFFSET__LOG;
|
ret->offset.type = TMQ_OFFSET__LOG;
|
||||||
ret->offset.version = pReader->ver;
|
ret->offset.version = pReader->ver;
|
||||||
ASSERT(pReader->ver != -1);
|
ASSERT(pReader->ver >= 0);
|
||||||
ret->fetchType = FETCH_TYPE__NONE;
|
ret->fetchType = FETCH_TYPE__NONE;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -280,7 +280,7 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qStreamPrepareScan1(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
||||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
|
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
|
||||||
|
@ -293,8 +293,55 @@ int32_t qStreamPrepareScan1(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
||||||
pOperator->status = OP_OPENED;
|
pOperator->status = OP_OPENED;
|
||||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
SStreamScanInfo* pInfo = pOperator->info;
|
SStreamScanInfo* pInfo = pOperator->info;
|
||||||
if (tqSeekVer(pInfo->tqReader, pOffset->version) < 0) {
|
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||||
return -1;
|
if (tqSeekVer(pInfo->tqReader, pOffset->version) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version);
|
||||||
|
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
|
pInfo->blockType = STREAM_INPUT__TABLE_SCAN;
|
||||||
|
int64_t uid = pOffset->uid;
|
||||||
|
int64_t ts = pOffset->ts;
|
||||||
|
|
||||||
|
if (uid == 0) {
|
||||||
|
if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
|
||||||
|
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
|
||||||
|
uid = pTableInfo->uid;
|
||||||
|
ts = INT64_MIN;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||
|
||||||
|
pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {
|
||||||
|
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||||
|
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
||||||
|
bool found = false;
|
||||||
|
for (int32_t i = 0; i < tableSz; i++) {
|
||||||
|
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
|
||||||
|
if (pTableInfo->uid == uid) {
|
||||||
|
found = true;
|
||||||
|
pTableScanInfo->currentTable = i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO after dropping table, table may be not found
|
||||||
|
ASSERT(found);
|
||||||
|
|
||||||
|
tsdbSetTableId(pTableScanInfo->dataReader, uid);
|
||||||
|
int64_t oldSkey = pTableScanInfo->cond.twindows[0].skey;
|
||||||
|
pTableScanInfo->cond.twindows[0].skey = ts + 1;
|
||||||
|
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
||||||
|
pTableScanInfo->cond.twindows[0].skey = oldSkey;
|
||||||
|
pTableScanInfo->scanTimes = 0;
|
||||||
|
pTableScanInfo->curTWinIdx = 0;
|
||||||
|
|
||||||
|
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
|
||||||
|
pTableScanInfo->currentTable, tableSz);
|
||||||
|
} else {
|
||||||
|
// switch to log
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -40,8 +40,8 @@ static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta*
|
||||||
const char* dbName);
|
const char* dbName);
|
||||||
|
|
||||||
static int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
|
static int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
|
||||||
SSDataBlock* pBlock, const char* idStr);
|
SSDataBlock* pBlock, const char* idStr);
|
||||||
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
|
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
|
||||||
|
|
||||||
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
|
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
|
||||||
#if 0
|
#if 0
|
||||||
|
@ -265,7 +265,8 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
||||||
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
||||||
|
|
||||||
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, GET_TASKID(pTaskInfo));
|
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock,
|
||||||
|
GET_TASKID(pTaskInfo));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pTaskInfo->env, code);
|
longjmp(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
@ -303,7 +304,7 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
|
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
|
||||||
SSDataBlock* pBlock, const char* idStr) {
|
SSDataBlock* pBlock, const char* idStr) {
|
||||||
// currently only the tbname pseudo column
|
// currently only the tbname pseudo column
|
||||||
if (numOfPseudoExpr == 0) {
|
if (numOfPseudoExpr == 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -313,7 +314,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
|
||||||
metaReaderInit(&mr, pHandle->meta, 0);
|
metaReaderInit(&mr, pHandle->meta, 0);
|
||||||
int32_t code = metaGetTableEntryByUid(&mr, pBlock->info.uid);
|
int32_t code = metaGetTableEntryByUid(&mr, pBlock->info.uid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("failed to get table meta, uid:0x%"PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr);
|
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr);
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -697,7 +698,7 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid, int32_t* rowLen, con
|
||||||
metaReaderInit(&mr, pMeta, 0);
|
metaReaderInit(&mr, pMeta, 0);
|
||||||
int32_t code = metaGetTableEntryByUid(&mr, uid);
|
int32_t code = metaGetTableEntryByUid(&mr, uid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("failed to get table meta, uid:0x%"PRIx64 ", code:%s, %s", uid, tstrerror(terrno), idstr);
|
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", uid, tstrerror(terrno), idstr);
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -711,7 +712,7 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid, int32_t* rowLen, con
|
||||||
uint64_t suid = mr.me.ctbEntry.suid;
|
uint64_t suid = mr.me.ctbEntry.suid;
|
||||||
code = metaGetTableEntryByUid(&mr, suid);
|
code = metaGetTableEntryByUid(&mr, suid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("failed to get table meta, uid:0x%"PRIx64 ", code:%s, %s", suid, tstrerror(terrno), idstr);
|
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno), idstr);
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -738,12 +739,13 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SBlockDistInfo* pBlockScanInfo = pOperator->info;
|
SBlockDistInfo* pBlockScanInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN};
|
STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN};
|
||||||
int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, &blockDistInfo.rowSize, GET_TASKID(pTaskInfo));
|
int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, &blockDistInfo.rowSize,
|
||||||
|
GET_TASKID(pTaskInfo));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pTaskInfo->env, code);
|
longjmp(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo);
|
tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo);
|
||||||
|
@ -938,7 +940,8 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t
|
||||||
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
|
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
|
||||||
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL);
|
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL);
|
||||||
} else {
|
} else {
|
||||||
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, TSDB_ORDER_ASC);
|
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision,
|
||||||
|
TSDB_ORDER_ASC);
|
||||||
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
|
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
|
||||||
(*pRowIndex) +=
|
(*pRowIndex) +=
|
||||||
getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
||||||
|
@ -1219,7 +1222,8 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
|
|
||||||
// currently only the tbname pseudo column
|
// currently only the tbname pseudo column
|
||||||
if (pInfo->numOfPseudoExpr > 0) {
|
if (pInfo->numOfPseudoExpr > 0) {
|
||||||
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, GET_TASKID(pTaskInfo));
|
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
||||||
|
GET_TASKID(pTaskInfo));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pTaskInfo->env, code);
|
longjmp(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
@ -1264,17 +1268,21 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
pTaskInfo->streamInfo.metaBlk = ret.meta;
|
pTaskInfo->streamInfo.metaBlk = ret.meta;
|
||||||
return NULL;
|
return NULL;
|
||||||
} else if (ret.fetchType == FETCH_TYPE__NONE) {
|
} else if (ret.fetchType == FETCH_TYPE__NONE) {
|
||||||
if (ret.offset.version == -1) {
|
/*if (ret.offset.version == -1) {*/
|
||||||
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
|
/*pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;*/
|
||||||
pTaskInfo->streamInfo.lastStatus.version = pTaskInfo->streamInfo.prepareStatus.version - 1;
|
/*pTaskInfo->streamInfo.lastStatus.version = pTaskInfo->streamInfo.prepareStatus.version - 1;*/
|
||||||
} else {
|
/*} else {*/
|
||||||
pTaskInfo->streamInfo.lastStatus = ret.offset;
|
pTaskInfo->streamInfo.lastStatus = ret.offset;
|
||||||
}
|
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 >= pTaskInfo->streamInfo.prepareStatus.version);
|
||||||
|
/*}*/
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
|
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
||||||
|
return pResult && pResult->info.rows > 0 ? pResult : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
||||||
|
@ -1443,7 +1451,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// currently only the tbname pseudo column
|
// currently only the tbname pseudo column
|
||||||
if (pInfo->numOfPseudoExpr > 0) {
|
if (pInfo->numOfPseudoExpr > 0) {
|
||||||
code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, GET_TASKID(pTaskInfo));
|
code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
||||||
|
GET_TASKID(pTaskInfo));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pTaskInfo->env, code);
|
longjmp(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
@ -1481,6 +1490,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
|
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
|
||||||
|
|
||||||
} else if (pInfo->blockType == STREAM_INPUT__TABLE_SCAN) {
|
} else if (pInfo->blockType == STREAM_INPUT__TABLE_SCAN) {
|
||||||
|
/*ASSERT(0);*/
|
||||||
// check reader last status
|
// check reader last status
|
||||||
// if not match, reset status
|
// if not match, reset status
|
||||||
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
||||||
|
@ -1873,9 +1883,10 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
|
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
|
||||||
|
|
||||||
uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
|
uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
|
||||||
int32_t code = metaGetTableEntryByUid(&mr, suid);
|
int32_t code = metaGetTableEntryByUid(&mr, suid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("failed to get super table meta, uid:0x%"PRIx64 ", code:%s, %s", suid, tstrerror(terrno), GET_TASKID(pTaskInfo));
|
qError("failed to get super table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno),
|
||||||
|
GET_TASKID(pTaskInfo));
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
metaCloseTbCursor(pInfo->pCur);
|
metaCloseTbCursor(pInfo->pCur);
|
||||||
pInfo->pCur = NULL;
|
pInfo->pCur = NULL;
|
||||||
|
@ -2275,9 +2286,10 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
|
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
|
||||||
STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos);
|
STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos);
|
||||||
int32_t code = metaGetTableEntryByUid(&mr, item->uid);
|
int32_t code = metaGetTableEntryByUid(&mr, item->uid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("failed to get table meta, uid:0x%"PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno), GET_TASKID(pTaskInfo));
|
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
|
||||||
|
GET_TASKID(pTaskInfo));
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
longjmp(pTaskInfo->env, terrno);
|
longjmp(pTaskInfo->env, terrno);
|
||||||
}
|
}
|
||||||
|
@ -2565,8 +2577,8 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
|
||||||
|
|
||||||
// currently only the tbname pseudo column
|
// currently only the tbname pseudo column
|
||||||
if (pTableScanInfo->numOfPseudoExpr > 0) {
|
if (pTableScanInfo->numOfPseudoExpr > 0) {
|
||||||
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr,
|
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr,
|
||||||
pBlock, GET_TASKID(pTaskInfo));
|
pTableScanInfo->numOfPseudoExpr, pBlock, GET_TASKID(pTaskInfo));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pTaskInfo->env, code);
|
longjmp(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,8 +30,9 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
|
||||||
pRead->pWal = pWal;
|
pRead->pWal = pWal;
|
||||||
pRead->pIdxFile = NULL;
|
pRead->pIdxFile = NULL;
|
||||||
pRead->pLogFile = NULL;
|
pRead->pLogFile = NULL;
|
||||||
pRead->curVersion = -1;
|
pRead->curVersion = -5;
|
||||||
pRead->curFileFirstVer = -1;
|
pRead->curFileFirstVer = -1;
|
||||||
|
pRead->curInvalid = 1;
|
||||||
pRead->capacity = 0;
|
pRead->capacity = 0;
|
||||||
if (cond)
|
if (cond)
|
||||||
pRead->cond = *cond;
|
pRead->cond = *cond;
|
||||||
|
@ -152,13 +153,17 @@ static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) {
|
||||||
|
|
||||||
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
|
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
|
||||||
SWal *pWal = pRead->pWal;
|
SWal *pWal = pRead->pWal;
|
||||||
if (ver == pRead->curVersion) {
|
if (!pRead->curInvalid && ver == pRead->curVersion) {
|
||||||
wDebug("wal version %ld match, no need to reset", ver);
|
wDebug("wal version %ld match, no need to reset", ver);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pRead->curInvalid = 1;
|
||||||
|
pRead->curVersion = ver;
|
||||||
|
|
||||||
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
|
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
|
||||||
wError("vgId:$d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId, ver,
|
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
|
||||||
pWal->vers.firstVer, pWal->vers.lastVer);
|
ver, pWal->vers.firstVer, pWal->vers.lastVer);
|
||||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -171,13 +176,13 @@ int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
|
||||||
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
||||||
ASSERT(pRet != NULL);
|
ASSERT(pRet != NULL);
|
||||||
if (pRead->curFileFirstVer != pRet->firstVer) {
|
if (pRead->curFileFirstVer != pRet->firstVer) {
|
||||||
// error code set inner
|
// error code was set inner
|
||||||
if (walReadChangeFile(pRead, pRet->firstVer) < 0) {
|
if (walReadChangeFile(pRead, pRet->firstVer) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// error code set inner
|
// error code was set inner
|
||||||
if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) {
|
if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -193,9 +198,11 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity
|
||||||
|
|
||||||
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
||||||
int64_t contLen;
|
int64_t contLen;
|
||||||
if (pRead->curVersion != fetchVer) {
|
if (pRead->curInvalid || pRead->curVersion != fetchVer) {
|
||||||
if (walReadSeekVer(pRead, fetchVer) < 0) {
|
if (walReadSeekVer(pRead, fetchVer) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
pRead->curVersion = fetchVer;
|
||||||
|
pRead->curInvalid = 1;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -207,7 +214,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
}
|
}
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
pRead->curVersion = -1;
|
pRead->curInvalid = 1;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -238,7 +245,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
|
||||||
pRead->pWal->cfg.vgId, pRead->pHead->head.version, ver);
|
pRead->pWal->cfg.vgId, pRead->pHead->head.version, ver);
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
}
|
}
|
||||||
pRead->curVersion = -1;
|
pRead->curInvalid = 1;
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -246,7 +253,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
|
||||||
if (pReadHead->version != ver) {
|
if (pReadHead->version != ver) {
|
||||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
|
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
|
||||||
pRead->pHead->head.version, ver);
|
pRead->pHead->head.version, ver);
|
||||||
pRead->curVersion = -1;
|
pRead->curInvalid = 1;
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -254,7 +261,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
|
||||||
|
|
||||||
if (walValidBodyCksum(pRead->pHead) != 0) {
|
if (walValidBodyCksum(pRead->pHead) != 0) {
|
||||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
|
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
|
||||||
pRead->curVersion = -1;
|
pRead->curInvalid = 1;
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -273,7 +280,7 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
|
||||||
code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
|
code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
pRead->curVersion = -1;
|
pRead->curInvalid = 1;
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -292,7 +299,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRead->curVersion != ver) {
|
if (pRead->curInvalid || pRead->curVersion != ver) {
|
||||||
code = walReadSeekVer(pRead, ver);
|
code = walReadSeekVer(pRead, ver);
|
||||||
if (code < 0) return -1;
|
if (code < 0) return -1;
|
||||||
}
|
}
|
||||||
|
@ -307,8 +314,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
||||||
code = walValidHeadCksum(pHead);
|
code = walValidHeadCksum(pHead);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId,
|
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver);
|
||||||
ver);
|
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -324,7 +330,7 @@ int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
|
||||||
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
|
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
pRead->curVersion = -1;
|
pRead->curInvalid = 1;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -356,14 +362,14 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
|
||||||
if (pReadHead->version != ver) {
|
if (pReadHead->version != ver) {
|
||||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
|
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
|
||||||
pRead->pHead->head.version, ver);
|
pRead->pHead->head.version, ver);
|
||||||
pRead->curVersion = -1;
|
pRead->curInvalid = 1;
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (walValidBodyCksum(*ppHead) != 0) {
|
if (walValidBodyCksum(*ppHead) != 0) {
|
||||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
|
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
|
||||||
pRead->curVersion = -1;
|
pRead->curInvalid = 1;
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -380,8 +386,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: check wal life
|
if (pRead->curInvalid || pRead->curVersion != ver) {
|
||||||
if (pRead->curVersion != ver) {
|
|
||||||
if (walReadSeekVer(pRead, ver) < 0) {
|
if (walReadSeekVer(pRead, ver) < 0) {
|
||||||
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr());
|
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -389,8 +394,8 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) {
|
if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) {
|
||||||
wError("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId, ver,
|
wError("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
|
||||||
pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer);
|
ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer);
|
||||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -410,8 +415,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
||||||
|
|
||||||
code = walValidHeadCksum(pRead->pHead);
|
code = walValidHeadCksum(pRead->pHead);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId,
|
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver);
|
||||||
ver);
|
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -440,16 +444,15 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
||||||
if (pRead->pHead->head.version != ver) {
|
if (pRead->pHead->head.version != ver) {
|
||||||
wError("vgId:%d, unexpected wal log index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
|
wError("vgId:%d, unexpected wal log index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
|
||||||
pRead->pHead->head.version, ver);
|
pRead->pHead->head.version, ver);
|
||||||
pRead->curVersion = -1;
|
pRead->curInvalid = 1;
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = walValidBodyCksum(pRead->pHead);
|
code = walValidBodyCksum(pRead->pHead);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
|
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
|
||||||
ver);
|
pRead->curInvalid = 1;
|
||||||
pRead->curVersion = -1;
|
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue