fix(stream): check return value.

This commit is contained in:
Haojun Liao 2024-07-29 10:29:40 +08:00
parent 878d77fc7f
commit 79ad7585ef
7 changed files with 54 additions and 43 deletions

View File

@ -201,7 +201,7 @@ void qStreamSetOpen(qTaskInfo_t tinfo);
void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded);
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);

View File

@ -115,14 +115,15 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
if (pDataBlock == NULL) {
break;
}
STqOffsetVal offset = {0};
qStreamExtractOffset(task, &offset);
code = qStreamExtractOffset(task, &offset);
TSDB_CHECK_CODE(code, line, END);
pHandle->block = NULL;
code = createOneDataBlock(pDataBlock, true, &pHandle->block);
if (code) {
return code;
}
TSDB_CHECK_CODE(code, line, END);
pHandle->blockTime = offset.ts;
tOffsetDestroy(&offset);
@ -140,8 +141,11 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
} else {
code = copyDataBlock(pHandle->block, pDataBlock);
TSDB_CHECK_CODE(code, line, END);
STqOffsetVal offset = {0};
qStreamExtractOffset(task, &offset);
code = qStreamExtractOffset(task, &offset);
TSDB_CHECK_CODE(code, line, END);
pRsp->sleepTime = offset.ts - pHandle->blockTime;
pHandle->blockTime = offset.ts;
tOffsetDestroy(&offset);
@ -164,10 +168,11 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d",
pHandle->consumerId, vgId, pRsp->common.blockNum, totalRows);
qStreamExtractOffset(task, &pRsp->common.rspOffset);
code = qStreamExtractOffset(task, &pRsp->common.rspOffset);
END:
if (code != 0) {
tqError("consumer:0x%" PRIx64 " vgId:%d tmq task executed error, line:%d code:%d", pHandle->consumerId, vgId, line, code);
tqError("consumer:0x%" PRIx64 " vgId:%d tmq task executed error, line:%d code:%d", pHandle->consumerId, vgId, line,
code);
}
return code;
}
@ -241,31 +246,40 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqBatc
// get meta
SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task);
if (taosArrayGetSize(tmp->batchMetaReq) > 0) {
qStreamExtractOffset(task, &tmp->rspOffset);
code = qStreamExtractOffset(task, &tmp->rspOffset);
if (code) {
return code;
}
*pBatchMetaRsp = *tmp;
tqDebug("tmqsnap task get meta");
break;
}
if (pDataBlock == NULL) {
qStreamExtractOffset(task, pOffset);
code = qStreamExtractOffset(task, pOffset);
if (code) {
break;
}
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
continue;
}
tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
pHandle->snapshotVer + 1);
qStreamExtractOffset(task, &pRsp->common.rspOffset);
code = qStreamExtractOffset(task, &pRsp->common.rspOffset);
break;
}
if (pRsp->common.blockNum > 0) {
tqDebug("tmqsnap task exec exited, get data");
qStreamExtractOffset(task, &pRsp->common.rspOffset);
code = qStreamExtractOffset(task, &pRsp->common.rspOffset);
break;
}
}
return 0;
return code;
}

View File

@ -48,7 +48,7 @@ typedef struct {
static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInfo** pInfo);
static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
STsdbReader* pReader);
static int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes);
static void getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey,
STsdbReader* pReader);
static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo,
@ -3866,11 +3866,11 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t
return false;
}
FORCE_INLINE int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes) {
FORCE_INLINE void getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes) {
*pRes = NULL;
if (!pIter->hasVal) {
return TSDB_CODE_SUCCESS;
return;
}
int32_t order = pReader->info.order;
@ -3880,20 +3880,20 @@ FORCE_INLINE int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, ST
TSDBROW_INIT_KEY(pRow, key);
if (outOfTimeWindow(key.ts, &pReader->info.window)) {
pIter->hasVal = false;
return TSDB_CODE_SUCCESS;
return;
}
// it is a valid data version
if (key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer) {
if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) {
*pRes = pRow;
return TSDB_CODE_SUCCESS;
return;
} else {
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange,
pReader->suppInfo.numOfPks > 0);
if (!dropped) {
*pRes = pRow;
return TSDB_CODE_SUCCESS;
return;
}
}
}
@ -3901,7 +3901,7 @@ FORCE_INLINE int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, ST
while (1) {
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
if (!pIter->hasVal) {
return TSDB_CODE_SUCCESS;
return;
}
pRow = tsdbTbDataIterGet(pIter->iter);
@ -3909,19 +3909,19 @@ FORCE_INLINE int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, ST
TSDBROW_INIT_KEY(pRow, key);
if (outOfTimeWindow(key.ts, &pReader->info.window)) {
pIter->hasVal = false;
return TSDB_CODE_SUCCESS;
return;
}
if (key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer) {
if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) {
*pRes = pRow;
return TSDB_CODE_SUCCESS;
return;
} else {
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange,
pReader->suppInfo.numOfPks > 0);
if (!dropped) {
*pRes = pRow;
return TSDB_CODE_SUCCESS;
return;
}
}
}

View File

@ -1002,7 +1002,7 @@ int32_t doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, S
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
TSKEY* primaryKeys, int32_t prevPosition, int32_t order);
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status);
int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status);
#ifdef __cplusplus
}

View File

@ -295,7 +295,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
return NULL;
}
createRawScanOperatorInfo(pReaderHandle, pTaskInfo, &pTaskInfo->pRoot);
code = createRawScanOperatorInfo(pReaderHandle, pTaskInfo, &pTaskInfo->pRoot);
if (NULL == pTaskInfo->pRoot || code != 0) {
taosMemoryFree(pTaskInfo);
return NULL;
@ -1158,9 +1158,9 @@ SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
return &pTaskInfo->streamInfo.btMetaRsp;
}
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
tOffsetCopy(pOffset, &pTaskInfo->streamInfo.currentOffset);
return tOffsetCopy(pOffset, &pTaskInfo->streamInfo.currentOffset);
}
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
@ -1231,7 +1231,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
}
if (subType == TOPIC_SUB_TYPE__COLUMN) {
extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
if (pOperator == NULL || code != 0) {
return code;
}
@ -1431,8 +1431,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
}
end:
tOffsetCopy(&pTaskInfo->streamInfo.currentOffset, pOffset);
(void) tOffsetCopy(&pTaskInfo->streamInfo.currentOffset, pOffset);
return 0;
}

View File

@ -567,7 +567,8 @@ int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* p
code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
QUERY_CHECK_CODE(code, lino, _err);
extractQualifiedTupleByFilterResult(pBlock, p, status);
code = extractQualifiedTupleByFilterResult(pBlock, p, status);
QUERY_CHECK_CODE(code, lino, _err);
if (pColMatchInfo != NULL) {
size_t size = taosArrayGetSize(pColMatchInfo->pList);
@ -591,18 +592,21 @@ _err:
return code;
}
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status) {
int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status) {
int32_t code = 0;
int8_t* pIndicator = (int8_t*)p->pData;
if (status == FILTER_RESULT_ALL_QUALIFIED) {
// here nothing needs to be done
} else if (status == FILTER_RESULT_NONE_QUALIFIED) {
trimDataBlock(pBlock, pBlock->info.rows, NULL);
code = trimDataBlock(pBlock, pBlock->info.rows, NULL);
pBlock->info.rows = 0;
} else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) {
trimDataBlock(pBlock, pBlock->info.rows, (bool*)pIndicator);
code = trimDataBlock(pBlock, pBlock->info.rows, (bool*)pIndicator);
} else {
qError("unknown filter result type: %d", status);
}
return code;
}
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) {

View File

@ -308,12 +308,9 @@ int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo
}
}
extractQualifiedTupleByFilterResult(pBlock, p, status);
code = TSDB_CODE_SUCCESS;
code = extractQualifiedTupleByFilterResult(pBlock, p, status);
_err:
colDataDestroy(p);
taosMemoryFree(p);
@ -375,12 +372,9 @@ int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SM
}
}
extractQualifiedTupleByFilterResult(pBlock, p, status);
code = TSDB_CODE_SUCCESS;
code = extractQualifiedTupleByFilterResult(pBlock, p, status);
_return:
colDataDestroy(p);
taosMemoryFree(p);