diff --git a/docs/en/12-taos-sql/06-select.md b/docs/en/12-taos-sql/06-select.md index 33236c0173..8bfdaeb9c8 100755 --- a/docs/en/12-taos-sql/06-select.md +++ b/docs/en/12-taos-sql/06-select.md @@ -65,16 +65,10 @@ interp_clause: RANGE(ts_val [, ts_val]) EVERY(every_val) FILL(fill_mod_and_val) partition_by_clause: - PARTITION BY partition_by_expr [, partition_by_expr] ... - -partition_by_expr: - {expr | position | c_alias} + PARTITION BY expr [, expr] ... group_by_clause: - GROUP BY group_by_expr [, group_by_expr] ... HAVING condition - -group_by_expr: - {expr | position | c_alias} + GROUP BY expr [, expr] ... HAVING condition order_by_clasue: ORDER BY order_expr [, order_expr] ... @@ -280,13 +274,7 @@ If you use a GROUP BY clause, the SELECT list can only include the following ite The GROUP BY clause groups each row of data by the value of the expression following the clause and returns a combined result for each group. -In the GROUP BY clause, columns from a table or view can be grouped by specifying the column name. These columns do not need to be included in the SELECT list. - -You can specify integers in GROUP BY expression to indicate the expressions in the select list used for grouping. For example, 1 indicates the first item in the select list. - -You can specify column names in result set to indicate the expressions in the select list used for grouping. - -When using position and result set column names for grouping in the GROUP BY clause, the corresponding expressions in the select list must not be aggregate functions. +The expressions in a GROUP BY clause can include any column in any table or view. It is not necessary that the expressions appear in the SELECT list. The GROUP BY clause does not guarantee that the results are ordered. If you want to ensure that grouped data is ordered, use the ORDER BY clause. diff --git a/docs/en/12-taos-sql/12-distinguished.md b/docs/en/12-taos-sql/12-distinguished.md index 8eecb706c0..bfc9ca32c0 100644 --- a/docs/en/12-taos-sql/12-distinguished.md +++ b/docs/en/12-taos-sql/12-distinguished.md @@ -102,7 +102,6 @@ The detailed beaviors of `NULL`, `NULL_F`, `VALUE`, and VALUE_F are described be 1. A huge volume of interpolation output may be returned using `FILL`, so it's recommended to specify the time range when using `FILL`. The maximum number of interpolation values that can be returned in a single query is 10,000,000. 2. The result set is in ascending order of timestamp when you aggregate by time window. 3. If aggregate by window is used on STable, the aggregate function is performed on all the rows matching the filter conditions. If `PARTITION BY` is not used in the query, the result set will be returned in strict ascending order of timestamp; otherwise the result set will be returned in the order of ascending timestamp in each group. -4. The output windows of Fill are related with time range of WHERE Clause. For asc fill, the first output window is the first window that conains the start time of WHERE clause. The last output window is the last window that contains the end time of WHERE clause. ::: diff --git a/docs/zh/12-taos-sql/06-select.md b/docs/zh/12-taos-sql/06-select.md index af19559c81..f10c5ebb69 100755 --- a/docs/zh/12-taos-sql/06-select.md +++ b/docs/zh/12-taos-sql/06-select.md @@ -65,16 +65,10 @@ interp_clause: RANGE(ts_val [, ts_val]) EVERY(every_val) FILL(fill_mod_and_val) partition_by_clause: - PARTITION BY partition_by_expr [, partition_by_expr] ... - -partition_by_expr: - {expr | position | c_alias} + PARTITION BY expr [, expr] ... group_by_clause: - GROUP BY group_by_expr [, group_by_expr] ... HAVING condition - -group_by_expr: - {expr | position | c_alias} + GROUP BY expr [, expr] ... HAVING condition order_by_clasue: ORDER BY order_expr [, order_expr] ... @@ -280,13 +274,7 @@ TDengine 支持基于时间戳主键的 INNER JOIN,规则如下: GROUP BY 子句对每行数据按 GROUP BY 后的表达式的值进行分组,并为每个组返回一行汇总信息。 -GROUP BY 子句中可以通过指定表或视图的列名来按照表或视图中的任何列分组,这些列不需要出现在 SELECT 列表中。 - -GROUP BY 子句中可以使用位置语法,位置标识为正整数,从 1 开始,表示使用 SELECT 列表的第几个表达式进行分组。 - -GROUP BY 子句中可以使用结果集列名,表示使用 SELECT 列表的指定表达式进行分组。 - -GROUP BY 子句中在使用位置语法和结果集列名进行分组时,其对应的 SELECT 列表中的表达式不能是聚集函数。 +GROUP BY 子句中的表达式可以包含表或视图中的任何列,这些列不需要出现在 SELECT 列表中。 该子句对行进行分组,但不保证结果集的顺序。若要对分组进行排序,请使用 ORDER BY 子句 diff --git a/docs/zh/12-taos-sql/12-distinguished.md b/docs/zh/12-taos-sql/12-distinguished.md index 50bf36d2e1..0eaeb0dfa7 100755 --- a/docs/zh/12-taos-sql/12-distinguished.md +++ b/docs/zh/12-taos-sql/12-distinguished.md @@ -97,7 +97,6 @@ NULL, NULL_F, VALUE, VALUE_F 这几种填充模式针对不同场景区别如下 1. 使用 FILL 语句的时候可能生成大量的填充输出,务必指定查询的时间区间。针对每次查询,系统可返回不超过 1 千万条具有插值的结果。 2. 在时间维度聚合中,返回的结果中时间序列严格单调递增。 3. 如果查询对象是超级表,则聚合函数会作用于该超级表下满足值过滤条件的所有表的数据。如果查询中没有使用 PARTITION BY 语句,则返回的结果按照时间序列严格单调递增;如果查询中使用了 PARTITION BY 语句分组,则返回结果中每个 PARTITION 内按照时间序列严格单调递增。 -4. Fill输出的起始和结束窗口与WHERE条件的时间范围有关, 如增序Fill时, 第一个窗口是包含WHERE条件开始时间的第一个窗口, 最后一个窗口是包含WHERE条件结束时间的最后一个窗口。 ::: diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 7c63120fcf..a6197370e3 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -90,6 +90,7 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA pOperator->exprSupp.hasWindowOrGroup = false; SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -140,6 +141,9 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA return code; _error: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } if (pInfo != NULL) { destroyAggOperatorInfo(pInfo); } @@ -480,6 +484,10 @@ int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, ui if (taosArrayGetSize(list) == 0) { pData = getNewBufPage(pResultBuf, &pageId); + if (pData == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return terrno; + } pData->num = sizeof(SFilePage); } else { SPageInfo* pi = getLastPageInfo(list); @@ -496,9 +504,11 @@ int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, ui releaseBufPageInfo(pResultBuf, pi); pData = getNewBufPage(pResultBuf, &pageId); - if (pData != NULL) { - pData->num = sizeof(SFilePage); + if (pData == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return terrno; } + pData->num = sizeof(SFilePage); } } diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 81d55ec092..73f770a599 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -107,6 +107,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc; pInfo->pRes = createDataBlockFromDescNode(pDescNode); + QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); code = extractColMatchInfo(pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); QUERY_CHECK_CODE(code, lino, _error); @@ -165,6 +166,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl int32_t capacity = 0; pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); + QUERY_CHECK_NULL(pInfo->pUidList, code, lino, _error, terrno); // partition by tbname if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) { @@ -205,6 +207,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_NULL(p->pCtx, code, lino, _error, terrno); } setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED, diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 8d2ad4cbad..e382bbc9ed 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -276,6 +276,7 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pCountWindowNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 6a39cac525..fcd202e221 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -244,6 +244,9 @@ static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWi SExprSupp* pExprSup, SAggSupporter* pAggSup) { if (*pResult == NULL) { SResultRow* p = taosMemoryCalloc(1, pAggSup->resultRowSize); + if (!p) { + return terrno; + } pResultRowInfo->cur = (SResultRowPosition){.pageId = p->pageId, .offset = p->offset}; *pResult = p; } diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 5af8df8f06..2f018d7a69 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -307,6 +307,9 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const int32_t len = strlen(id) + 1; pInfo->pTaskId = taosMemoryCalloc(1, len); + if (!pInfo->pTaskId) { + return terrno; + } strncpy(pInfo->pTaskId, id, len); for (int32_t i = 0; i < numOfSources; ++i) { SSourceDataInfo dataInfo = {0}; @@ -389,7 +392,9 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc); pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES); + QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno); pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES); + QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno); SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self}; code = qAppendTaskStopInfo(pTaskInfo, &stopInfo); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index cdc8cc6dd5..0ca9908fb1 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -144,6 +144,7 @@ int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, void* pData = NULL; pGroupResInfo->pRows = taosArrayInit(size, POINTER_BYTES); + QUERY_CHECK_NULL(pGroupResInfo->pRows, code, lino, _end, terrno); size_t keyLen = 0; int32_t iter = 0; @@ -353,9 +354,15 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) { } else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) { int32_t len = ((const STag*)p)->len; res->datum.p = taosMemoryCalloc(len + 1, 1); + if (NULL == res->datum.p) { + return DEAL_RES_ERROR; + } memcpy(res->datum.p, p, len); } else if (IS_VAR_DATA_TYPE(pSColumnNode->node.resType.type)) { res->datum.p = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1); + if (NULL == res->datum.p) { + return DEAL_RES_ERROR; + } memcpy(varDataVal(res->datum.p), tagVal.pData, tagVal.nData); varDataSetLen(res->datum.p, tagVal.nData); } else { @@ -378,6 +385,9 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) { int32_t len = strlen(mr->me.name); res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1); + if (NULL == res->datum.p) { + return DEAL_RES_ERROR; + } memcpy(varDataVal(res->datum.p), mr->me.name, len); varDataSetLen(res->datum.p, len); nodesDestroyNode(*pNode); @@ -856,6 +866,7 @@ static SArray* getTableNameList(const SNodeListNode* pList) { // remove the duplicates SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*)); + QUERY_CHECK_NULL(pNewList, code, lino, _end, terrno); void* tmp = taosArrayPush(pNewList, taosArrayGet(pTbList, 0)); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); @@ -1739,6 +1750,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList); pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam)); + QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno); pExp->base.numOfParams = numOfParam; for (int32_t j = 0; j < numOfParam; ++j) { @@ -1760,6 +1772,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { SOperatorNode* pOpNode = (SOperatorNode*)pNode; pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); + QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno); pExp->base.numOfParams = 1; SDataType* pType = &pOpNode->node.resType; @@ -1771,6 +1784,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { SCaseWhenNode* pCaseNode = (SCaseWhenNode*)pNode; pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); + QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno); pExp->base.numOfParams = 1; SDataType* pType = &pCaseNode->node.resType; @@ -1781,9 +1795,8 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { pExp->pExpr->nodeType = QUERY_NODE_OPERATOR; SLogicConditionNode* pCond = (SLogicConditionNode*)pNode; pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); - if (!pExp->base.pParam) { - code = terrno; - } + QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno); + if (TSDB_CODE_SUCCESS == code) { pExp->base.numOfParams = 1; SDataType* pType = &pCond->node.resType; @@ -1808,6 +1821,9 @@ int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) { SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) { *numOfExprs = LIST_LENGTH(pNodeList); SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo)); + if (!pExprs) { + return NULL; + } for (int32_t i = 0; i < (*numOfExprs); ++i) { SExprInfo* pExp = &pExprs[i]; @@ -1944,6 +1960,8 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, } else { char* udfName = pExpr->pExpr->_function.pFunctNode->functionName; pCtx->udfName = taosStrdup(udfName); + QUERY_CHECK_NULL(pCtx->udfName, code, lino, _end, terrno); + code = fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet); QUERY_CHECK_CODE(code, lino, _end); } @@ -2075,6 +2093,9 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols); pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo)); + if (!pCond->colList) { + return terrno; + } pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols); if (pCond->colList == NULL || pCond->pSlotList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -2455,6 +2476,9 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) { int32_t size = taosArrayGetSize(pTableListInfo->pTableList); SArray* pList = taosArrayInit(4, sizeof(int32_t)); + if (!pList) { + return terrno; + } STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0); uint64_t gid = pInfo->groupId; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 3ee08d8fb0..ef76f14aa9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -373,6 +373,7 @@ static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); + QUERY_CHECK_NULL(qa, code, lino, _end, terrno); int32_t numOfUids = taosArrayGetSize(tableIdList); if (numOfUids == 0) { (*ppArrayRes) = qa; diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index bceefd2c0d..e339ca5d29 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -89,6 +89,10 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, i int32_t pageId = -1; if (*currentPageId == -1) { pData = getNewBufPage(pResultBuf, &pageId); + if (pData == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return NULL; + } pData->num = sizeof(SFilePage); } else { pData = getBufPage(pResultBuf, *currentPageId); @@ -104,9 +108,11 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, i releaseBufPage(pResultBuf, pData); pData = getNewBufPage(pResultBuf, &pageId); - if (pData != NULL) { - pData->num = sizeof(SFilePage); + if (pData == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return NULL; } + pData->num = sizeof(SFilePage); } } @@ -409,6 +415,9 @@ static int32_t doCreateConstantValColumnSMAInfo(SInputColumnInfoData* pInput, SF SColumnDataAgg* da = NULL; if (pInput->pColumnDataAgg[paramIndex] == NULL) { da = taosMemoryCalloc(1, sizeof(SColumnDataAgg)); + if (!da) { + return terrno; + } pInput->pColumnDataAgg[paramIndex] = da; if (da == NULL) { return TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index d88e09273f..50117f6c7e 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -397,6 +397,9 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t pInfo->win.ekey = win.skey; } pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); + if (!pInfo->p) { + return terrno; + } if (pInfo->pFillInfo == NULL || pInfo->p == NULL) { taosMemoryFree(pInfo->pFillInfo); @@ -465,6 +468,7 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi } pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); SExprInfo* pExprInfo = NULL; code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pExprInfo, &pInfo->numOfExpr); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 43b2f5ab6d..502f9ff0f7 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -859,6 +859,9 @@ _end: int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) { size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t)); + if (!offset) { + return NULL; + } offset[0] = sizeof(int32_t) + sizeof(uint64_t); // the number of rows in current page, ref to SSDataBlock paged serialization format @@ -1173,6 +1176,7 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo uint32_t defaultBufsz = 0; pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno); code = getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -1193,6 +1197,8 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo blockDataGetCapacityInRow(pInfo->binfo.pRes, getBufPageSize(pInfo->pBuf), blockDataGetSerialMetaSize(taosArrayGetSize(pInfo->binfo.pRes->pDataBlock))); pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity); + QUERY_CHECK_NULL(pInfo->columnOffset, code, lino, _error, terrno); + code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -1427,6 +1433,7 @@ static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDat SPartitionDataInfo newParData = {0}; newParData.groupId = calcGroupId(pInfo->partitionSup.keyBuf, keyLen); newParData.rowIds = taosArrayInit(64, sizeof(int32_t)); + QUERY_CHECK_NULL(newParData.rowIds, code, lino, _end, terrno); void* tmp = taosArrayPush(newParData.rowIds, &i); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); @@ -1590,6 +1597,9 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (!pBlock) { + return NULL; + } pBlock->info.hasVarCol = false; pBlock->info.id.groupId = 0; pBlock->info.rows = 0; @@ -1597,6 +1607,7 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) { pBlock->info.watermark = INT64_MIN; pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData)); + QUERY_CHECK_NULL(pBlock->pDataBlock, code, lino, _end, terrno); SColumnInfoData infoData = {0}; infoData.info.type = TSDB_DATA_TYPE_VARCHAR; if (tbName->numOfExprs > 0) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d491ffb524..37c5bc1aeb 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -476,17 +476,27 @@ static void freeTableCachedVal(void* param) { } static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal)); + QUERY_CHECK_NULL(pVal, code, lino, _end, terrno); pVal->pName = taosStrdup(pMetaReader->me.name); + QUERY_CHECK_NULL(pVal->pName, code, lino, _end, terrno); pVal->pTags = NULL; // only child table has tag value if (pMetaReader->me.type == TSDB_CHILD_TABLE) { STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags; pVal->pTags = taosMemoryMalloc(pTag->len); + QUERY_CHECK_NULL(pVal->pTags, code, lino, _end, terrno); memcpy(pVal->pTags, pTag, pTag->len); } +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + return NULL; + } return pVal; } @@ -581,6 +591,9 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int pHandle->api.metaReaderFn.readerReleaseLock(&mr); STableCachedVal* pVal = createTableCacheVal(&mr); + if(!pVal) { + return terrno; + } val = *pVal; freeReader = true; @@ -1280,6 +1293,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder)); + if (!pRecorder) { + return terrno; + } STableScanInfo* pTableScanInfo = pOptr->info; *pRecorder = pTableScanInfo->base.readRecorder; *pOptrExplain = pRecorder; @@ -1290,7 +1306,9 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr static void destroyTableScanBase(STableScanBase* pBase, TsdReader* pAPI) { cleanupQueryTableDataCond(&pBase->cond); - pAPI->tsdReaderClose(pBase->dataReader); + if (pAPI->tsdReaderClose) { + pAPI->tsdReaderClose(pBase->dataReader); + } pBase->dataReader = NULL; if (pBase->matchInfo.pList != NULL) { @@ -1344,6 +1362,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_NULL(pSup->pCtx, code, lino, _error, terrno); } pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; @@ -1359,6 +1378,8 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader; initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pResBlock = createDataBlockFromDescNode(pDescNode); + QUERY_CHECK_NULL(pInfo->pResBlock, code, lino, _error, terrno); + code = prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo); QUERY_CHECK_CODE(code, lino, _error); @@ -2488,6 +2509,7 @@ static int32_t doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal* of void* data = colDataGetData(pColPk, i); if (IS_VAR_DATA_TYPE(pColPk->info.type)) { void* tmq = taosMemoryMalloc(offset->primaryKey.nData + VARSTR_HEADER_SIZE); + QUERY_CHECK_NULL(tmq, code, lino, _end, terrno); memcpy(varDataVal(tmq), offset->primaryKey.pData, offset->primaryKey.nData); varDataLen(tmq) = offset->primaryKey.nData; p[i] = (*ts > offset->ts) || (func(data, tmq) > 0); @@ -2712,6 +2734,7 @@ _end: static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffsetVal* offset) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SValue val = {0}; if (hasPrimaryKey) { code = doBlockDataPrimaryKeyFilter(pBlock, offset); @@ -2728,6 +2751,7 @@ static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOff val.type = pColPk->info.type; if (IS_VAR_DATA_TYPE(pColPk->info.type)) { val.pData = taosMemoryMalloc(varDataLen(tmp)); + QUERY_CHECK_NULL(val.pData, code, lino, _end, terrno); val.nData = varDataLen(tmp); memcpy(val.pData, varDataVal(tmp), varDataLen(tmp)); } else { @@ -2735,6 +2759,11 @@ static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOff } } tqOffsetResetToData(offset, pBlock->info.id.uid, pBlock->info.window.ekey, val); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } return code; } @@ -3010,6 +3039,9 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) } void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); + if (!pUpInfo) { + return; + } int32_t code = pInfo->stateStore.updateInfoDeserialize(buf, tlen, pUpInfo); if (code == TSDB_CODE_SUCCESS) { pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); @@ -3469,6 +3501,7 @@ static int32_t extractTableIdList(const STableListInfo* pTableListInfo, SArray** int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t)); + QUERY_CHECK_NULL(tableIdList, code, lino, _end, terrno); // Transfer the Array of STableKeyInfo into uid list. size_t size = tableListGetSize(pTableListInfo); @@ -3864,6 +3897,8 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pInfo->primaryKeyIndex = -1; int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList); pColIds = taosArrayInit(numOfOutput, sizeof(int16_t)); + QUERY_CHECK_NULL(pColIds, code, lino, _error, terrno); + for (int32_t i = 0; i < numOfOutput; ++i) { SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i); @@ -3992,6 +4027,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* QUERY_CHECK_CODE(code, lino, _error); pInfo->pRes = createDataBlockFromDescNode(pDescNode); + QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes); QUERY_CHECK_CODE(code, lino, _error); @@ -4386,6 +4422,8 @@ static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** p } STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal}; info.pTagVal = taosMemoryMalloc(pCur->vLen); + QUERY_CHECK_NULL(info.pTagVal, code, lino, _end, terrno); + memcpy(info.pTagVal, pCur->pVal, pCur->vLen); void* tmp = taosArrayPush(aUidTags, &info); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); @@ -5292,6 +5330,7 @@ int32_t generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order, SArray** ppS int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SArray* pSortInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); + QUERY_CHECK_NULL(pSortInfo, code, lino, _end, terrno); SBlockOrderInfo biTs = {0}; SBlockOrderInfo biPk = {0}; @@ -5380,9 +5419,11 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { QUERY_CHECK_CODE(code, lino, _end); STableMergeScanSortSourceParam* param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam)); + QUERY_CHECK_NULL(param, code, lino, _end, terrno); param->pOperator = pOperator; SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + QUERY_CHECK_NULL(ps, code, lino, _end, terrno); ps->param = param; ps->onlyRef = false; code = tsortAddSource(pInfo->pSortHandle, ps); @@ -5659,6 +5700,9 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla ASSERT(pOptr != NULL); // TODO: merge these two info into one struct STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo)); + if (!execInfo) { + return terrno; + } STableMergeScanInfo* pInfo = pOptr->info; execInfo->blockRecorder = pInfo->base.readRecorder; execInfo->sortExecInfo = pInfo->sortExecInfo; @@ -5705,6 +5749,7 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_NULL(pSup->pCtx, code, lino, _error, terrno); } pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index a08787d358..858f26ad18 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -93,6 +93,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN pOperator->exprSupp.pCtx = createSqlFunctionCtx(pOperator->exprSupp.pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno); initResultSizeInfo(&pOperator->resultInfo, 1024); code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); if (code != TSDB_CODE_SUCCESS) { @@ -784,6 +785,7 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo initResultSizeInfo(&pOperator->resultInfo, 1024); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 8ac73b44f6..499c08f89d 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -651,10 +651,12 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo)); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pStUpdated) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno); } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -837,6 +839,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -863,6 +866,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->binfo.pRes = pResBlock; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pStDeleted, code, lino, _error, terrno); pInfo->pDelIterator = NULL; code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); @@ -884,6 +888,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->recvGetAll = false; pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno); pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimayKey; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 1311216c06..54fddf843b 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -619,10 +619,12 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(16, sizeof(SEventWindowInfo)); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pSeUpdated) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno); } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -738,6 +740,9 @@ void streamEventReleaseState(SOperatorInfo* pOperator) { int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); int32_t resSize = winSize + sizeof(TSKEY); char* pBuff = taosMemoryCalloc(1, resSize); + if (!pBuff) { + return ; + } memcpy(pBuff, pInfo->historyWins->pData, winSize); memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY)); qDebug("===stream=== event window operator relase state. save result count:%d", @@ -780,10 +785,12 @@ void streamEventReloadState(SOperatorInfo* pOperator) { if (!pInfo->pSeUpdated && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno); } if (!pInfo->pSeDeleted && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeDeleted, code, lino, _end, terrno); } for (int32_t i = 0; i < num; i++) { SEventWindowInfo curInfo = {0}; @@ -892,6 +899,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -903,6 +911,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeDeleted, code, lino, _error, terrno); pInfo->pDelIterator = NULL; code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 39aedd9d59..b7b5b1a38c 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -604,6 +604,7 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* SPoint cur = {0}; cur.key = pFillInfo->current; cur.val = taosMemoryCalloc(1, pCell->bytes); + QUERY_CHECK_NULL(cur.val, code, lino, _end, terrno); taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type); code = colDataSetVal(pColData, index, (const char*)cur.val, false); QUERY_CHECK_CODE(code, lino, _end); @@ -683,15 +684,24 @@ _end: } } -void keepBlockRowInDiscBuf(SOperatorInfo* pOperator, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol, +int32_t keepBlockRowInDiscBuf(SOperatorInfo* pOperator, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol, int32_t rowId, uint64_t groupId, int32_t rowSize) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; TSKEY ts = tsCol[rowId]; pFillInfo->nextRowKey = ts; SResultRowData tmpNextRow = {.key = ts}; tmpNextRow.pRowVal = taosMemoryCalloc(1, rowSize); + QUERY_CHECK_NULL(tmpNextRow.pRowVal, code, lino, _end, terrno); transBlockToResultRow(pBlock, rowId, ts, &tmpNextRow); keepResultInDiscBuf(pOperator, groupId, &tmpNextRow, rowSize); taosMemoryFreeClear(tmpNextRow.pRowVal); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } static void doFillResults(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, @@ -721,13 +731,15 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) { pInfo->srcRowIndex++; if (pInfo->srcRowIndex == 0) { - keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize); + code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize); + QUERY_CHECK_CODE(code, lino, _end); pInfo->srcRowIndex++; } while (pInfo->srcRowIndex < pBlock->info.rows) { TSKEY ts = tsCol[pInfo->srcRowIndex]; - keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize); + code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize); + QUERY_CHECK_CODE(code, lino, _end); doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes); if (pInfo->pRes->info.rows == pInfo->pRes->info.capacity) { code = blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol); @@ -1214,6 +1226,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pFillSup->pResMap = tSimpleHashInit(16, hashFn); + QUERY_CHECK_NULL(pFillSup->pResMap, code, lino, _end, terrno); pFillSup->hasDelete = false; _end: @@ -1363,7 +1376,9 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno); code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 3c696a1be8..e95ffeb658 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -647,6 +647,7 @@ int32_t addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SArray* childIds = taosArrayInit(8, sizeof(int32_t)); + QUERY_CHECK_NULL(childIds, code, lino, _end, terrno); for (int32_t i = 0; i < size; i++) { void* tmp = taosArrayPush(childIds, &i); if (!tmp) { @@ -1579,10 +1580,12 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pUpdatedMap) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn); + QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno); } while (1) { @@ -1609,6 +1612,7 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); + QUERY_CHECK_NULL(delWins, code, lino, _end, terrno); SHashObj* finalMap = IS_FINAL_INTERVAL_OP(pOperator) ? pInfo->pFinalPullDataMap : NULL; code = doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, finalMap); QUERY_CHECK_CODE(code, lino, _end); @@ -1895,9 +1899,11 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + QUERY_CHECK_NULL(pInfo->pState, code, lino, _error, terrno); qInfo("open state %p", pInfo->pState); pAPI->stateStore.streamStateCopyBackend(pTaskInfo->streamInfo.pState, pInfo->pState); //*(pInfo->pState) = *(pTaskInfo->streamInfo.pState); @@ -1921,6 +1927,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN } pInfo->pPullWins = taosArrayInit(8, sizeof(SPullWindowInfo)); + QUERY_CHECK_NULL(pInfo->pPullWins, code, lino, _error, terrno); pInfo->pullIndex = 0; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pPullDataMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK); @@ -1936,6 +1943,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN pInfo->delIndex = 0; pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); + QUERY_CHECK_NULL(pInfo->pDelWins, code, lino, _error, terrno); pInfo->delKey.ts = INT64_MAX; pInfo->delKey.groupId = 0; pInfo->numOfDatapack = 0; @@ -1960,7 +1968,9 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN pInfo->clearState = false; pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey)); + QUERY_CHECK_NULL(pInfo->pMidPullDatas, code, lino, _error, terrno); pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); + QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno); pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimayKey; pOperator->operatorType = pPhyNode->type; @@ -2136,6 +2146,9 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput); pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + if (!pSup->pState) { + return terrno; + } *(pSup->pState) = *pState; pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex); int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput); @@ -2145,6 +2158,9 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pSup->pResultRows = tSimpleHashInit(32, hashFn); + if (!pSup->pResultRows) { + return terrno; + } for (int32_t i = 0; i < numOfOutput; ++i) { pExpSup->pCtx[i].saveHandle.pState = pSup->pState; @@ -3347,10 +3363,12 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo)); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pStUpdated) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno); } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -3363,6 +3381,7 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SSessionKey)); + QUERY_CHECK_NULL(pWins, code, lino, _end, terrno); // gap must be 0 code = doDeleteTimeWindows(pAggSup, pBlock, pWins); QUERY_CHECK_CODE(code, lino, _end); @@ -3503,6 +3522,9 @@ void streamSessionReleaseState(SOperatorInfo* pOperator) { int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); int32_t resSize = winSize + sizeof(TSKEY); char* pBuff = taosMemoryCalloc(1, resSize); + if (!pBuff) { + return; + } memcpy(pBuff, pInfo->historyWins->pData, winSize); memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY)); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, @@ -3630,6 +3652,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { if (!pInfo->pStUpdated && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno); } for (int32_t i = 0; i < num; i++) { SResultWindowInfo winInfo = {0}; @@ -3713,6 +3736,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -3744,6 +3768,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->order = TSDB_ORDER_ASC; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pStDeleted, code, lino, _error, terrno); pInfo->pDelIterator = NULL; code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); QUERY_CHECK_CODE(code, lino, _error); @@ -3770,6 +3795,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->recvGetAll = false; pInfo->destHasPrimaryKey = pSessionNode->window.destHasPrimayKey; pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno); pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, @@ -3899,10 +3925,12 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock* SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo)); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pStUpdated) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno); } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -4021,6 +4049,7 @@ int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhys if (numOfChild > 0) { pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*)); + QUERY_CHECK_NULL(pInfo->pChildren, code, lino, _error, terrno); for (int32_t i = 0; i < numOfChild; i++) { SOperatorInfo* pChildOp = NULL; code = createStreamFinalSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo, 0, pHandle, &pChildOp); @@ -4611,10 +4640,12 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo)); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pSeUpdated) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno); } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -4715,6 +4746,9 @@ void streamStateReleaseState(SOperatorInfo* pOperator) { int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); int32_t resSize = winSize + sizeof(TSKEY); char* pBuff = taosMemoryCalloc(1, resSize); + if (!pBuff) { + return ; + } memcpy(pBuff, pInfo->historyWins->pData, winSize); memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY)); qDebug("===stream=== relase state. save result count:%d", (int32_t)taosArrayGetSize(pInfo->historyWins)); @@ -4765,10 +4799,12 @@ void streamStateReloadState(SOperatorInfo* pOperator) { if (!pInfo->pSeUpdated && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno); } if (!pInfo->pSeDeleted && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeDeleted, code, lino, _end, terrno); } for (int32_t i = 0; i < num; i++) { SStateWindowInfo curInfo = {0}; @@ -4870,6 +4906,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -4884,6 +4921,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeDeleted, code, lino, _error, terrno); pInfo->pDelIterator = NULL; code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); @@ -4910,6 +4948,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->recvGetAll = false; pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno); pInfo->destHasPrimaryKey = pStateNode->window.destHasPrimayKey; setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, @@ -5004,11 +5043,13 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pUpdatedMap) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn); + QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno); } while (1) { @@ -5151,6 +5192,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); pInfo->interval = (SInterval){ .interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, @@ -5186,6 +5228,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + QUERY_CHECK_NULL(pInfo->pState, code, lino, _error, terrno); *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); @@ -5207,6 +5250,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->invertible = false; pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); + QUERY_CHECK_NULL(pInfo->pDelWins, code, lino, _error, terrno); pInfo->delIndex = 0; code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); @@ -5247,6 +5291,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); + QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno); pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimayKey; // for stream @@ -5479,10 +5524,12 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock* if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pUpdatedMap) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn); + QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno); } while (1) { diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 90c760136e..14fda685c0 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1772,8 +1772,10 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { .pMeta = pInfo->readHandle.vnode, .pVnode = pInfo->readHandle.vnode, .pAPI = &pTaskInfo->storageAPI}; SSysTableIndex* idx = taosMemoryMalloc(sizeof(SSysTableIndex)); + QUERY_CHECK_NULL(idx, code, lino, _end, terrno); idx->init = 0; idx->uids = taosArrayInit(128, sizeof(int64_t)); + QUERY_CHECK_NULL(idx->uids, code, lino, _end, terrno); idx->lastIdx = 0; pInfo->pIdx = idx; // set idx arg @@ -1992,6 +1994,9 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); char* buf1 = taosMemoryCalloc(1, contLen); + if (!buf1) { + return NULL; + } int32_t tempRes = tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req); if (tempRes < 0) { code = terrno; @@ -2083,6 +2088,7 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; + lino = __LINE__; goto _error; } @@ -2091,9 +2097,7 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo int32_t num = 0; code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); extractTbnameSlotId(pInfo, pScanNode); @@ -2101,9 +2105,11 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo pInfo->accountId = pScanPhyNode->accountId; pInfo->pUser = taosStrdup((void*)pUser); + QUERY_CHECK_NULL(pInfo->pUser, code, lino, _error, terrno); pInfo->sysInfo = pScanPhyNode->sysInfo; pInfo->showRewrite = pScanPhyNode->showRewrite; pInfo->pRes = createDataBlockFromDescNode(pDescNode); + QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); pInfo->pCondition = pScanNode->node.pConditions; code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); QUERY_CHECK_CODE(code, lino, _error); @@ -2143,7 +2149,7 @@ _error: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } @@ -2173,15 +2179,19 @@ void destroySysScanOperator(void* param) { (void)tsem_destroy(&pInfo->ready); blockDataDestroy(pInfo->pRes); - const char* name = tNameGetTableName(&pInfo->name); - if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || - strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || - strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) { - if (pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) { - pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); - } + if (pInfo->name.type == TSDB_TABLE_NAME_T) { + const char* name = tNameGetTableName(&pInfo->name); + if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || + strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || + strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) { + if (pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) { + pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); + } - pInfo->pCur = NULL; + pInfo->pCur = NULL; + } + } else { + qError("pInfo->name is not initialized"); } if (pInfo->pIdx) { @@ -2470,12 +2480,18 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) { bool hasIdx = false; bool hasRslt = true; SArray* mRslt = taosArrayInit(len, POINTER_BYTES); + if (!mRslt) { + return terrno; + } SListCell* cell = pList->pHead; for (int i = 0; i < len; i++) { if (cell == NULL) break; SArray* aRslt = taosArrayInit(16, sizeof(int64_t)); + if (!aRslt) { + return terrno; + } ret = optSysTabFilteImpl(arg, cell->pNode, aRslt); if (ret == 0) { @@ -2694,6 +2710,7 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP } pInfo->pResBlock = createDataBlockFromDescNode(pBlockScanNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->pResBlock, code, lino, _error, terrno); code = blockDataEnsureCapacity(pInfo->pResBlock, 1); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 99a66efecb..a4bf2dce72 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -666,6 +666,9 @@ static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExp pInfo->pPrevGroupKey->type = pExprInfo->base.resSchema.type; pInfo->pPrevGroupKey->isNull = false; pInfo->pPrevGroupKey->pData = taosMemoryCalloc(1, pInfo->pPrevGroupKey->bytes); + if (!pInfo->pPrevGroupKey->pData) { + return terrno; + } } } @@ -1156,6 +1159,7 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues); pInfo->pLinearInfo = NULL; pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); pInfo->win = pInterpPhyNode->timeRange; pInfo->interval.interval = pInterpPhyNode->interval; pInfo->current = pInfo->win.skey; @@ -1174,6 +1178,7 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN if (IS_VAR_DATA_TYPE(pInfo->pkCol.type)) { pInfo->prevKey.pks[0].pData = taosMemoryCalloc(1, pInfo->pkCol.bytes); + QUERY_CHECK_NULL(pInfo->prevKey.pks[0].pData, code, lino, _error, terrno); } } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 989eb97327..7a5ab58331 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1285,6 +1285,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode } SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); SExprSupp* pSup = &pOperator->exprSupp; @@ -1625,6 +1626,7 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy } SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); @@ -1699,6 +1701,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, @@ -2039,6 +2042,7 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&iaInfo->binfo, pResBlock); code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); QUERY_CHECK_CODE(code, lino, _error); @@ -2366,6 +2370,7 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva } SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pIntervalInfo->binfo, pResBlock); code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 5b34075ecf..ff33732b23 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -198,6 +198,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) { SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_t rowsPerPage, int32_t type, int32_t numOfCols) { SOperatorInfo* pOperator = static_cast(taosMemoryCalloc(1, sizeof(SOperatorInfo))); + ASSERT(!pOperator); pOperator->name = "dummyInputOpertor4Test"; if (numOfCols == 1) { @@ -207,6 +208,7 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_ } SDummyInputInfo* pInfo = (SDummyInputInfo*)taosMemoryCalloc(1, sizeof(SDummyInputInfo)); + ASSERT(!pInfo); pInfo->totalPages = numOfBlocks; pInfo->startVal = startVal; pInfo->numOfRowsPerPage = rowsPerPage; diff --git a/tests/system-test/0-others/taosShell.py b/tests/system-test/0-others/taosShell.py index 5158c3dfac..91e9f2fb89 100644 --- a/tests/system-test/0-others/taosShell.py +++ b/tests/system-test/0-others/taosShell.py @@ -16,6 +16,9 @@ from util.sql import * from util.cases import * from util.dnodes import * +import subprocess +import threading + def taos_command (buildPath, key, value, expectString, cfgDir, sqlString='', key1='', value1=''): if len(key) == 0: tdLog.exit("taos test key is null!") @@ -108,7 +111,7 @@ class TDTestCase: updatecfgDict["fqdn"] = hostname print ("===================: ", updatecfgDict) - + taos_output = [] def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") @@ -116,7 +119,6 @@ class TDTestCase: def getBuildPath(self): selfPath = os.path.dirname(os.path.realpath(__file__)) - if ("community" in selfPath): projPath = selfPath[:selfPath.find("community")] else: @@ -129,6 +131,29 @@ class TDTestCase: buildPath = root[:len(root) - len("/build/bin")] break return buildPath + def run_command(self, commands): + + count = 0 + while count < 2: + # print(f"count: {count}") + value = subprocess.getoutput(f"nohup {commands} &") + # print(f"value: {value}") + self.taos_output.append(value) + count += 1 + + def taos_thread_repeat_k(self, run_command, commands, threads_num=10, output=[]): + threads = [] + taos_output = self.taos_output + for id in range(threads_num): + #threads.append(Process(target=cloud_consumer, args=(id,))) + threads.append(threading.Thread(target=run_command, args=(commands,))) + for tr in threads: + tr.start() + for tr in threads: + tr.join() + for value in taos_output: + if "crash" in value: + tdLog.exit(f"command: {commands} crash") def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring tdSql.prepare() @@ -445,6 +470,12 @@ class TDTestCase: tdSql.query('drop database %s'%newDbName) + commands = f"{buildPath}/taos -k -c {cfgPath}" + output = self.run_command(commands) + os.sys + self.taos_thread_repeat_k(self.run_command, commands, 100, output) + # os.system("python 0-others/repeat_taos_k.py") + def stop(self): tdSql.close() tdLog.success(f"{__file__} successfully executed") diff --git a/tests/system-test/0-others/taosdShell.py b/tests/system-test/0-others/taosdShell.py index 3b9eb66705..3d1162c370 100644 --- a/tests/system-test/0-others/taosdShell.py +++ b/tests/system-test/0-others/taosdShell.py @@ -88,24 +88,29 @@ class TDTestCase: pid = 0 return pid - def checkAndstopPro(self,processName,startAction): - i = 1 - count = 10 + def checkAndstopPro(self, processName, startAction, count = 10): for i in range(count): taosdPid=self.get_process_pid(processName) + print("taosdPid:",taosdPid) if taosdPid != 0 and taosdPid != "" : tdLog.info("stop taosd %s ,kill pid :%s "%(startAction,taosdPid)) - os.system("kill -9 %d"%taosdPid) - break + for j in range(count): + os.system("kill -9 %d"%taosdPid) + taosdPid=self.get_process_pid(processName) + print("taosdPid2:",taosdPid) + if taosdPid == 0 or taosdPid == "" : + tdLog.info("taosd %s is stoped "%startAction) + return else: tdLog.info( "wait start taosd ,times: %d "%i) time.sleep(1) - i+= 1 else : tdLog.exit("taosd %s is not running "%startAction) + def taosdCommandStop(self,startAction,taosdCmdRun): - processName="taosd" + processName= "taosd" + count = 10 if platform.system().lower() == 'windows': processName="taosd.exe" taosdCmd = taosdCmdRun + startAction @@ -116,7 +121,7 @@ class TDTestCase: else: logTime=datetime.now().strftime('%Y%m%d_%H%M%S_%f') os.system(f"nohup {taosdCmd} > {logTime}.log 2>&1 & ") - self.checkAndstopPro(processName,startAction) + self.checkAndstopPro(processName,startAction,count) os.system(f"rm -rf {logTime}.log")