diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 927bb3a802..c5c0553abe 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7356,6 +7356,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { if (tEncodeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1; if (tEncodeI8(&encoder, pReq->enableReplay) < 0) return -1; if (tEncodeI8(&encoder, pReq->sourceExcluded) < 0) return -1; + if (tEncodeI8(&encoder, pReq->enableBatchMeta) < 0) return -1; tEndEncode(&encoder); @@ -7367,7 +7368,6 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { pHead->vgId = htonl(pReq->head.vgId); pHead->contLen = htonl(tlen + headLen); } - if (tEncodeI8(&encoder, pReq->enableBatchMeta) < 0) return -1; return tlen + headLen; } diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 03dd0c4581..668d40dd0b 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -1002,7 +1002,7 @@ int32_t doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, S int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo, TSKEY* primaryKeys, int32_t prevPosition, int32_t order); -void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status); +int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index dc910888ad..c4cc49e8ea 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -295,7 +295,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 return NULL; } - createRawScanOperatorInfo(pReaderHandle, pTaskInfo, &pTaskInfo->pRoot); + code = createRawScanOperatorInfo(pReaderHandle, pTaskInfo, &pTaskInfo->pRoot); if (NULL == pTaskInfo->pRoot || code != 0) { taosMemoryFree(pTaskInfo); return NULL; @@ -1160,7 +1160,12 @@ SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - tOffsetCopy(pOffset, &pTaskInfo->streamInfo.currentOffset); + int32_t code = tOffsetCopy(pOffset, &pTaskInfo->streamInfo.currentOffset); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } } int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) { @@ -1231,7 +1236,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } if (subType == TOPIC_SUB_TYPE__COLUMN) { - extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator); + code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator); if (pOperator == NULL || code != 0) { return code; } diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index d56b288129..bceefd2c0d 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -75,7 +75,7 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { static int32_t setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock); static int32_t initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); -static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); +static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol); @@ -193,7 +193,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // add a new result set for a new group SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; int32_t code = tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, - sizeof(SResultRowPosition)); + sizeof(SResultRowPosition)); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); T_LONG_JMP(pTaskInfo->env, code); @@ -522,7 +522,7 @@ int32_t setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t n if (!pResInfo->initialized) { if (pCtx[i].functionId != -1) { int32_t code = pCtx[i].fpSet.init(&pCtx[i], pResInfo); - if (code != TSDB_CODE_SUCCESS && fmIsUserDefinedFunc(pCtx[i].functionId)){ + if (code != TSDB_CODE_SUCCESS && fmIsUserDefinedFunc(pCtx[i].functionId)) { pResInfo->initialized = false; return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; } @@ -567,7 +567,8 @@ int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* p code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status); QUERY_CHECK_CODE(code, lino, _err); - extractQualifiedTupleByFilterResult(pBlock, p, status); + code = extractQualifiedTupleByFilterResult(pBlock, p, status); + QUERY_CHECK_CODE(code, lino, _err); if (pColMatchInfo != NULL) { size_t size = taosArrayGetSize(pColMatchInfo->pList); @@ -591,18 +592,20 @@ _err: return code; } -void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status) { +int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status) { + int32_t code = TSDB_CODE_SUCCESS; int8_t* pIndicator = (int8_t*)p->pData; if (status == FILTER_RESULT_ALL_QUALIFIED) { // here nothing needs to be done } else if (status == FILTER_RESULT_NONE_QUALIFIED) { - trimDataBlock(pBlock, pBlock->info.rows, NULL); + code = trimDataBlock(pBlock, pBlock->info.rows, NULL); pBlock->info.rows = 0; } else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) { - trimDataBlock(pBlock, pBlock->info.rows, (bool*)pIndicator); + code = trimDataBlock(pBlock, pBlock->info.rows, (bool*)pIndicator); } else { qError("unknown filter result type: %d", status); } + return code; } void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) { @@ -639,7 +642,7 @@ void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultR pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); if (pCtx[j].fpSet.finalize) { if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_key") == 0 || - strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_const_value") == 0) { + strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_const_value") == 0) { // for groupkey along with functions that output multiple lines(e.g. Histogram) // need to match groupkey result for each output row of that function. if (pCtx[j].resultInfo->numOfRes != 0) { @@ -678,7 +681,7 @@ _end: // todo refactor. SResultRow has direct pointer in miainfo void finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, - SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { + SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId); if (page == NULL) { qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); @@ -694,7 +697,7 @@ void finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPositi doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset); if (pRow->numOfRows == 0) { releaseBufPage(pBuf, page); - return ; + return; } int32_t size = pBlock->info.capacity; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 542f161a80..8db4c6335f 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -308,7 +308,10 @@ int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo } } - extractQualifiedTupleByFilterResult(pBlock, p, status); + code = extractQualifiedTupleByFilterResult(pBlock, p, status); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } code = TSDB_CODE_SUCCESS; @@ -375,7 +378,10 @@ int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SM } } - extractQualifiedTupleByFilterResult(pBlock, p, status); + code = extractQualifiedTupleByFilterResult(pBlock, p, status); + if (code != TSDB_CODE_SUCCESS) { + goto _return; + } code = TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6755f131b6..acc3de3447 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1311,7 +1311,8 @@ static void destroyTableScanOperatorInfo(void* param) { } int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, - STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { + STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, + SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; @@ -1404,7 +1405,7 @@ _error: int32_t createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); - int32_t code = 0; + int32_t code = 0; STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -1422,7 +1423,7 @@ int32_t createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskIn *pOptrInfo = pOperator; return code; - _end: +_end: if (pInfo != NULL) { taosMemoryFree(pInfo); } @@ -2413,10 +2414,14 @@ _end: return code; } -static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) { +static int32_t doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + bool* p = NULL; if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) { - bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); - bool hasUnqualified = false; + p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); + QUERY_CHECK_NULL(p, code, lino, _end, terrno); + bool hasUnqualified = false; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex); @@ -2445,19 +2450,28 @@ static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeW } if (hasUnqualified) { - trimDataBlock(pBlock, pBlock->info.rows, p); + code = trimDataBlock(pBlock, pBlock->info.rows, p); + QUERY_CHECK_CODE(code, lino, _end); } - - taosMemoryFree(p); } + +_end: + taosMemoryFree(p); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } -static void doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal* offset) { +static int32_t doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal* offset) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (pBlock->info.window.skey != offset->ts || offset->primaryKey.type == 0) { - return; + return code; } bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); - bool hasUnqualified = false; + QUERY_CHECK_NULL(p, code, lino, _end, terrno); + bool hasUnqualified = false; SColumnInfoData* pColTs = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1); @@ -2485,16 +2499,26 @@ static void doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal* offse } if (hasUnqualified) { - trimDataBlock(pBlock, pBlock->info.rows, p); + code = trimDataBlock(pBlock, pBlock->info.rows, p); + QUERY_CHECK_CODE(code, lino, _end); } +_end: taosMemoryFree(p); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } // re-build the delete block, ONLY according to the split timestamp -static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, const char* id) { +static int32_t rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, const char* id) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; int32_t numOfRows = pBlock->info.rows; bool* p = taosMemoryCalloc(numOfRows, sizeof(bool)); + QUERY_CHECK_NULL(p, code, lino, _end, terrno); + bool hasUnqualified = false; int64_t skey = pWindow->skey; int64_t ekey = pWindow->ekey; @@ -2531,14 +2555,20 @@ static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, co } if (hasUnqualified) { - trimDataBlock(pBlock, pBlock->info.rows, p); + code = trimDataBlock(pBlock, pBlock->info.rows, p); qDebug("%s re-build delete datablock, start key revised to:%" PRId64 ", rows:%" PRId64, id, skey, pBlock->info.rows); + QUERY_CHECK_CODE(code, lino, _end); } else { qDebug("%s not update the delete block", id); } +_end: taosMemoryFree(p); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } static int32_t colIdComparFn(const void* param1, const void* param2) { @@ -2657,7 +2687,8 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock } // filter the block extracted from WAL files, according to the time window apply additional time window filter - doBlockDataWindowFilter(pInfo->pRes, pInfo->primaryTsIndex, pTimeWindow, id); + code = doBlockDataWindowFilter(pInfo->pRes, pInfo->primaryTsIndex, pTimeWindow, id); + QUERY_CHECK_CODE(code, lino, _end); pInfo->pRes->info.dataLoad = 1; code = blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); @@ -2676,14 +2707,19 @@ _end: return code; } -static void processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffsetVal* offset) { - SValue val = {0}; +static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffsetVal* offset) { + int32_t code = TSDB_CODE_SUCCESS; + SValue val = {0}; if (hasPrimaryKey) { - doBlockDataPrimaryKeyFilter(pBlock, offset); + code = doBlockDataPrimaryKeyFilter(pBlock, offset); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + return code; + } SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1); if (pBlock->info.rows < 1) { - return; + return code; } void* tmp = colDataGetData(pColPk, pBlock->info.rows - 1); val.type = pColPk->info.type; @@ -2696,6 +2732,7 @@ static void processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffset } } tqOffsetResetToData(offset, pBlock->info.id.uid, pBlock->info.window.ekey, val); + return code; } static int32_t doQueueScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { @@ -2711,7 +2748,7 @@ static int32_t doQueueScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { if (isTaskKilled(pTaskInfo)) { (*ppRes) = NULL; - return code; + return pTaskInfo->code; } if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { @@ -2720,9 +2757,9 @@ static int32_t doQueueScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { if (pResult && pResult->info.rows > 0) { bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader); - processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset); - qDebug("tmqsnap doQueueScan get data uid:%" PRId64 "", pResult->info.id.uid); - if (pResult->info.rows > 0) { + code = processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset); + qDebug("tmqsnap doQueueScan get data utid:%" PRId64 "", pResult->info.id.uid); + if (pResult->info.rows > 0 || code != TSDB_CODE_SUCCESS) { (*ppRes) = pResult; return code; } @@ -3156,7 +3193,8 @@ FETCH_NEXT_BLOCK: code = setBlockGroupIdByUid(pInfo, pDelBlock); QUERY_CHECK_CODE(code, lino, _end); - rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id); + code = rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id); + QUERY_CHECK_CODE(code, lino, _end); printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete recv filtered", GET_TASKID(pTaskInfo)); if (pDelBlock->info.rows == 0) { @@ -3479,7 +3517,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { if (pBlock && pBlock->info.rows > 0) { bool hasPrimaryKey = pAPI->snapshotFn.taosXGetTablePrimaryKey(pInfo->sContext); - processPrimaryKey(pBlock, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset); + code = processPrimaryKey(pBlock, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset); qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid); (*ppRes) = pBlock; return code; @@ -3493,7 +3531,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { tDeleteSchemaWrapper(mtInfo.schema); goto _end; } - STqOffsetVal offset = {0}; + STqOffsetVal offset = {0}; if (mtInfo.uid == 0 || pInfo->sContext->withMeta == ONLY_META) { // read snapshot done, change to get data from wal qDebug("tmqsnap read snapshot done, change to get data from wal"); tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion + 1); @@ -3964,7 +4002,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* QUERY_CHECK_CODE(code, lino, _error); pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX}; - createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateDataRes); + code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateDataRes); QUERY_CHECK_CODE(code, lino, _error); if (hasPrimaryKeyCol(pInfo)) { @@ -4123,7 +4161,7 @@ static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; STagScanFilterContext* pCtx = (STagScanFilterContext*)pContext; - SColumnNode* pSColumnNode = NULL; + SColumnNode* pSColumnNode = NULL; if (QUERY_NODE_COLUMN == nodeType((*pNode))) { pSColumnNode = *(SColumnNode**)pNode; } else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) { @@ -4484,8 +4522,8 @@ static void destroyTagScanOperatorInfo(void* param) { } int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pTagScanNode, - STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, - SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { + STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, + SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; @@ -5083,7 +5121,7 @@ _end: static SSDataBlock* doTableMergeScanParaSubTables(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; - int32_t code = doTableMergeScanParaSubTablesNext(pOperator, &pRes); + int32_t code = doTableMergeScanParaSubTablesNext(pOperator, &pRes); return pRes; } @@ -5317,7 +5355,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->pSortHandle = NULL; code = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage, - pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0, &pInfo->pSortHandle); + pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0, &pInfo->pSortHandle); if (code) { return code; } @@ -5569,7 +5607,7 @@ _end: static SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; - int32_t code = doTableMergeScanNext(pOperator, &pRes); + int32_t code = doTableMergeScanNext(pOperator, &pRes); return pRes; } @@ -5626,10 +5664,11 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla } int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, - STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { + STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, + SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); - int32_t code = 0; + int32_t code = 0; STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5641,7 +5680,7 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR int32_t numOfCols = 0; code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, - &pInfo->base.matchInfo); + &pInfo->base.matchInfo); int32_t lino = 0; if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -5867,7 +5906,7 @@ int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* } int32_t createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode, - SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { + SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; @@ -6040,7 +6079,7 @@ _end: } static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STableCountScanOperatorInfo* pInfo = pOperator->info; STableCountScanSupp* pSupp = &pInfo->supp; @@ -6062,7 +6101,7 @@ static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRe static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; - int32_t code = doTableCountScanNext(pOperator, &pRes); + int32_t code = doTableCountScanNext(pOperator, &pRes); return pRes; } diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 76eaccb4ec..17ef2fe41f 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -510,18 +510,16 @@ int32_t doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera int32_t mapSize = 0; buf = taosDecodeFixedI32(buf, &mapSize); for (int32_t i = 0; i < mapSize; i++) { - SSessionKey key = {0}; SResultWindowInfo winfo = {0}; - buf = decodeSSessionKey(buf, &key); + buf = decodeSSessionKey(buf, &winfo.sessionWin); int32_t winCode = TSDB_CODE_SUCCESS; code = pAggSup->stateStore.streamStateSessionAddIfNotExist( pAggSup->pState, &winfo.sessionWin, pAggSup->gap, (void**)&winfo.pStatePos, &pAggSup->resultRowSize, &winCode); QUERY_CHECK_CODE(code, lino, _end); - ASSERT(winCode == TSDB_CODE_SUCCESS); buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize); code = - tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo)); + tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo)); QUERY_CHECK_CODE(code, lino, _end); } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index aebc2d9c97..7462d71a8a 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -5183,7 +5183,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); pInfo->delIndex = 0; - createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); + code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); QUERY_CHECK_CODE(code, lino, _error); initResultRowInfo(&pInfo->binfo.resultRowInfo); diff --git a/tests/script/tsim/stream/session0.sim b/tests/script/tsim/stream/session0.sim index 38adae4261..0d92070d10 100644 --- a/tests/script/tsim/stream/session0.sim +++ b/tests/script/tsim/stream/session0.sim @@ -189,11 +189,11 @@ run tsim/stream/checkTaskStatus.sim sql insert into t2 values(1648791213001,1,1,3,1.0,1); sql insert into t2 values(1648791213002,2,2,6,3.4,2); sql insert into t2 values(1648791213003,4,9,3,4.8,3); -sql insert into t2 values(1648791233003,3,4,3,2.1,4); -sql insert into t2 values(1648791233004,3,5,3,3.4,5); -sql insert into t2 values(1648791233005,3,6,3,7.6,6); -# +sql insert into t2 values(1648791233003,3,4,3,2.1,4) (1648791233004,3,5,3,3.4,5) (1648791233005,3,6,3,7.6,6); + +sleep 1000 + sql insert into t2 values(1648791223003,20,7,3,10.1,7); $loop_count = 0