From c7bbfc9465446b52a72399b18428bfe0b7a4369e Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Sat, 3 Aug 2024 20:47:15 +0800 Subject: [PATCH 01/12] test: confirm that the process has been stopped successfully --- tests/system-test/0-others/taosdShell.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/tests/system-test/0-others/taosdShell.py b/tests/system-test/0-others/taosdShell.py index 3b9eb66705..3d1162c370 100644 --- a/tests/system-test/0-others/taosdShell.py +++ b/tests/system-test/0-others/taosdShell.py @@ -88,24 +88,29 @@ class TDTestCase: pid = 0 return pid - def checkAndstopPro(self,processName,startAction): - i = 1 - count = 10 + def checkAndstopPro(self, processName, startAction, count = 10): for i in range(count): taosdPid=self.get_process_pid(processName) + print("taosdPid:",taosdPid) if taosdPid != 0 and taosdPid != "" : tdLog.info("stop taosd %s ,kill pid :%s "%(startAction,taosdPid)) - os.system("kill -9 %d"%taosdPid) - break + for j in range(count): + os.system("kill -9 %d"%taosdPid) + taosdPid=self.get_process_pid(processName) + print("taosdPid2:",taosdPid) + if taosdPid == 0 or taosdPid == "" : + tdLog.info("taosd %s is stoped "%startAction) + return else: tdLog.info( "wait start taosd ,times: %d "%i) time.sleep(1) - i+= 1 else : tdLog.exit("taosd %s is not running "%startAction) + def taosdCommandStop(self,startAction,taosdCmdRun): - processName="taosd" + processName= "taosd" + count = 10 if platform.system().lower() == 'windows': processName="taosd.exe" taosdCmd = taosdCmdRun + startAction @@ -116,7 +121,7 @@ class TDTestCase: else: logTime=datetime.now().strftime('%Y%m%d_%H%M%S_%f') os.system(f"nohup {taosdCmd} > {logTime}.log 2>&1 & ") - self.checkAndstopPro(processName,startAction) + self.checkAndstopPro(processName,startAction,count) os.system(f"rm -rf {logTime}.log") From dd5b5474045b6910c1d72a8be19b682b2fa55871 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Mon, 5 Aug 2024 09:48:02 +0800 Subject: [PATCH 02/12] test:add concurrency test cases for executing the taos -k command line. --- tests/system-test/0-others/taosShell.py | 37 +++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/tests/system-test/0-others/taosShell.py b/tests/system-test/0-others/taosShell.py index 5158c3dfac..dc741cbfcf 100644 --- a/tests/system-test/0-others/taosShell.py +++ b/tests/system-test/0-others/taosShell.py @@ -16,6 +16,9 @@ from util.sql import * from util.cases import * from util.dnodes import * +import subprocess +import threading + def taos_command (buildPath, key, value, expectString, cfgDir, sqlString='', key1='', value1=''): if len(key) == 0: tdLog.exit("taos test key is null!") @@ -129,6 +132,34 @@ class TDTestCase: buildPath = root[:len(root) - len("/build/bin")] break return buildPath + def run_command(self, commands): + self.taos_output = [] + count = 0 + while count < 2: + print(f"count: {count}") + value = subprocess.getoutput(f"nohup {commands} &") + print(f"value: {value}") + self.taos_output.append(value) + count += 1 + return self.taos_output + + def taos_thread_repeat_k(self, run_command, commands, threads_num=10, output=[]): + threads = [] + taos_output = self.taos_output + threads_num = 20 + + for id in range(threads_num): + #threads.append(Process(target=cloud_consumer, args=(id,))) + threads.append(threading.Thread(target=run_command, args=(commands,))) + for tr in threads: + tr.start() + for tr in threads: + tr.join() + + for value in taos_output: + if "crash" in value: + print(f"command: {commands} crash") + exit(1) def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring tdSql.prepare() @@ -445,6 +476,12 @@ class TDTestCase: tdSql.query('drop database %s'%newDbName) + commands = f"taos -k -c {cfgPath}" + output = self.run_command(commands) + os.sys + self.taos_thread_repeat_k(self.run_command, commands, 10, output) + # os.system("python 0-others/repeat_taos_k.py") + def stop(self): tdSql.close() tdLog.success(f"{__file__} successfully executed") From 1a871e0292dbaee438b023ae803e04862d5c8e08 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Mon, 5 Aug 2024 11:01:32 +0800 Subject: [PATCH 03/12] test:add concurrency test cases for executing the taos -k command line. --- tests/system-test/0-others/taosShell.py | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/tests/system-test/0-others/taosShell.py b/tests/system-test/0-others/taosShell.py index dc741cbfcf..549231fc6c 100644 --- a/tests/system-test/0-others/taosShell.py +++ b/tests/system-test/0-others/taosShell.py @@ -111,7 +111,7 @@ class TDTestCase: updatecfgDict["fqdn"] = hostname print ("===================: ", updatecfgDict) - + taos_output = [] def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") @@ -119,7 +119,6 @@ class TDTestCase: def getBuildPath(self): selfPath = os.path.dirname(os.path.realpath(__file__)) - if ("community" in selfPath): projPath = selfPath[:selfPath.find("community")] else: @@ -133,21 +132,17 @@ class TDTestCase: break return buildPath def run_command(self, commands): - self.taos_output = [] + count = 0 while count < 2: - print(f"count: {count}") + # print(f"count: {count}") value = subprocess.getoutput(f"nohup {commands} &") - print(f"value: {value}") + # print(f"value: {value}") self.taos_output.append(value) count += 1 - return self.taos_output - def taos_thread_repeat_k(self, run_command, commands, threads_num=10, output=[]): threads = [] - taos_output = self.taos_output - threads_num = 20 - + taos_output = self.taos_output for id in range(threads_num): #threads.append(Process(target=cloud_consumer, args=(id,))) threads.append(threading.Thread(target=run_command, args=(commands,))) @@ -155,11 +150,9 @@ class TDTestCase: tr.start() for tr in threads: tr.join() - for value in taos_output: if "crash" in value: - print(f"command: {commands} crash") - exit(1) + tdLog.exit(f"command: {commands} crash") def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring tdSql.prepare() @@ -479,7 +472,7 @@ class TDTestCase: commands = f"taos -k -c {cfgPath}" output = self.run_command(commands) os.sys - self.taos_thread_repeat_k(self.run_command, commands, 10, output) + self.taos_thread_repeat_k(self.run_command, commands, 100, output) # os.system("python 0-others/repeat_taos_k.py") def stop(self): From de572c8c0fee49f709007eb14e015fe4682b4709 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Mon, 5 Aug 2024 11:02:23 +0800 Subject: [PATCH 04/12] test:add concurrency test cases for executing the taos -k command line. --- tests/system-test/0-others/taosShell.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/system-test/0-others/taosShell.py b/tests/system-test/0-others/taosShell.py index 549231fc6c..b046785903 100644 --- a/tests/system-test/0-others/taosShell.py +++ b/tests/system-test/0-others/taosShell.py @@ -140,6 +140,7 @@ class TDTestCase: # print(f"value: {value}") self.taos_output.append(value) count += 1 + def taos_thread_repeat_k(self, run_command, commands, threads_num=10, output=[]): threads = [] taos_output = self.taos_output From 0ffef4947bb11912a98aaefde7505083f222f7f9 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Mon, 5 Aug 2024 11:08:51 +0800 Subject: [PATCH 05/12] test:add concurrency test cases for executing the taos -k command line. --- tests/system-test/0-others/taosShell.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/0-others/taosShell.py b/tests/system-test/0-others/taosShell.py index b046785903..91e9f2fb89 100644 --- a/tests/system-test/0-others/taosShell.py +++ b/tests/system-test/0-others/taosShell.py @@ -470,7 +470,7 @@ class TDTestCase: tdSql.query('drop database %s'%newDbName) - commands = f"taos -k -c {cfgPath}" + commands = f"{buildPath}/taos -k -c {cfgPath}" output = self.run_command(commands) os.sys self.taos_thread_repeat_k(self.run_command, commands, 100, output) From 1b5cf65ab9156dc7d79f3afddef52eb9cdd1f71a Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 5 Aug 2024 11:17:49 +0800 Subject: [PATCH 06/12] fix issue for function res --- source/libs/executor/src/aggregateoperator.c | 18 +++++++++-- source/libs/executor/src/cachescanoperator.c | 1 + .../libs/executor/src/countwindowoperator.c | 1 + source/libs/executor/src/executil.c | 5 ++++ source/libs/executor/src/executorInt.c | 10 +++++-- source/libs/executor/src/filloperator.c | 4 ++- source/libs/executor/src/groupoperator.c | 1 + source/libs/executor/src/scanoperator.c | 26 ++++++++++++++++ .../executor/src/streamcountwindowoperator.c | 1 + .../executor/src/streameventwindowoperator.c | 1 + source/libs/executor/src/streamfilloperator.c | 2 ++ .../executor/src/streamtimewindowoperator.c | 4 +++ source/libs/executor/src/sysscanoperator.c | 30 +++++++++++-------- source/libs/executor/src/timesliceoperator.c | 1 + source/libs/executor/src/timewindowoperator.c | 5 ++++ 15 files changed, 92 insertions(+), 18 deletions(-) diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 7e105d2260..4f6840b918 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -73,7 +73,8 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -84,6 +85,7 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA pOperator->exprSupp.hasWindowOrGroup = false; SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -91,6 +93,7 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { @@ -140,6 +143,9 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA return code; _error: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } if (pInfo != NULL) { destroyAggOperatorInfo(pInfo); } @@ -480,6 +486,10 @@ int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, ui if (taosArrayGetSize(list) == 0) { pData = getNewBufPage(pResultBuf, &pageId); + if (pData == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return terrno; + } pData->num = sizeof(SFilePage); } else { SPageInfo* pi = getLastPageInfo(list); @@ -496,9 +506,11 @@ int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, ui releaseBufPageInfo(pResultBuf, pi); pData = getNewBufPage(pResultBuf, &pageId); - if (pData != NULL) { - pData->num = sizeof(SFilePage); + if (pData == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return terrno; } + pData->num = sizeof(SFilePage); } } diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 9d49c8e9ca..cb6223f389 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -107,6 +107,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc; pInfo->pRes = createDataBlockFromDescNode(pDescNode); + QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); code = extractColMatchInfo(pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index b7aa57e4b1..95b001d089 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -271,6 +271,7 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pCountWindowNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 5957d08a18..8e79320907 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1835,6 +1835,9 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* } SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo)); + if (!pExprs) { + return NULL; + } for (int32_t i = 0; i < (*numOfExprs); ++i) { STargetNode* pTargetNode = NULL; @@ -1937,6 +1940,8 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, } else { char* udfName = pExpr->pExpr->_function.pFunctNode->functionName; pCtx->udfName = taosStrdup(udfName); + QUERY_CHECK_NULL(pCtx->udfName, code, lino, _end, terrno); + code = fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet); QUERY_CHECK_CODE(code, lino, _end); } diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index bceefd2c0d..e0c180b149 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -89,6 +89,10 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, i int32_t pageId = -1; if (*currentPageId == -1) { pData = getNewBufPage(pResultBuf, &pageId); + if (pData == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return NULL; + } pData->num = sizeof(SFilePage); } else { pData = getBufPage(pResultBuf, *currentPageId); @@ -104,9 +108,11 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, i releaseBufPage(pResultBuf, pData); pData = getNewBufPage(pResultBuf, &pageId); - if (pData != NULL) { - pData->num = sizeof(SFilePage); + if (pData == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return NULL; } + pData->num = sizeof(SFilePage); } } diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index c4ef74608a..d1f92c1245 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -454,7 +454,8 @@ static int32_t createPrimaryTsExprIfNeeded(SFillOperatorInfo* pInfo, SFillPhysiN int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -464,6 +465,7 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi } pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr); pOperator->exprSupp.pExprInfo = pExprInfo; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index d88aef8fb7..ee94f435e3 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1168,6 +1168,7 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo uint32_t defaultBufsz = 0; pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno); code = getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz); if (code != TSDB_CODE_SUCCESS) { terrno = code; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index acc3de3447..4b154af0d6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -476,17 +476,27 @@ static void freeTableCachedVal(void* param) { } static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal)); + QUERY_CHECK_NULL(pVal, code, lino, _end, terrno); pVal->pName = taosStrdup(pMetaReader->me.name); + QUERY_CHECK_NULL(pVal->pName, code, lino, _end, terrno); pVal->pTags = NULL; // only child table has tag value if (pMetaReader->me.type == TSDB_CHILD_TABLE) { STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags; pVal->pTags = taosMemoryMalloc(pTag->len); + QUERY_CHECK_NULL(pVal->pTags, code, lino, _end, terrno); memcpy(pVal->pTags, pTag, pTag->len); } +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + return NULL; + } return pVal; } @@ -581,6 +591,9 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int pHandle->api.metaReaderFn.readerReleaseLock(&mr); STableCachedVal* pVal = createTableCacheVal(&mr); + if(!pVal) { + return terrno; + } val = *pVal; freeReader = true; @@ -1356,6 +1369,8 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader; initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pResBlock = createDataBlockFromDescNode(pDescNode); + QUERY_CHECK_NULL(pInfo->pResBlock, code, lino, _error, terrno); + code = prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo); QUERY_CHECK_CODE(code, lino, _error); @@ -2485,6 +2500,7 @@ static int32_t doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal* of void* data = colDataGetData(pColPk, i); if (IS_VAR_DATA_TYPE(pColPk->info.type)) { void* tmq = taosMemoryMalloc(offset->primaryKey.nData + VARSTR_HEADER_SIZE); + QUERY_CHECK_NULL(tmq, code, lino, _end, terrno); memcpy(varDataVal(tmq), offset->primaryKey.pData, offset->primaryKey.nData); varDataLen(tmq) = offset->primaryKey.nData; p[i] = (*ts > offset->ts) || (func(data, tmq) > 0); @@ -2709,6 +2725,7 @@ _end: static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffsetVal* offset) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SValue val = {0}; if (hasPrimaryKey) { code = doBlockDataPrimaryKeyFilter(pBlock, offset); @@ -2725,6 +2742,7 @@ static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOff val.type = pColPk->info.type; if (IS_VAR_DATA_TYPE(pColPk->info.type)) { val.pData = taosMemoryMalloc(varDataLen(tmp)); + QUERY_CHECK_NULL(val.pData, code, lino, _end, terrno); val.nData = varDataLen(tmp); memcpy(val.pData, varDataVal(tmp), varDataLen(tmp)); } else { @@ -2732,6 +2750,11 @@ static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOff } } tqOffsetResetToData(offset, pBlock->info.id.uid, pBlock->info.window.ekey, val); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } return code; } @@ -3990,6 +4013,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* } pInfo->pRes = createDataBlockFromDescNode(pDescNode); + QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes); QUERY_CHECK_CODE(code, lino, _error); @@ -4384,6 +4408,8 @@ static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** p } STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal}; info.pTagVal = taosMemoryMalloc(pCur->vLen); + QUERY_CHECK_NULL(info.pTagVal, code, lino, _end, terrno); + memcpy(info.pTagVal, pCur->pVal, pCur->vLen); void* tmp = taosArrayPush(aUidTags, &info); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 6adc60b79e..c4ca4733b2 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -831,6 +831,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SExprInfo* pExprInfo = createExprInfo(pCountNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 17ef2fe41f..d1610c47e9 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -886,6 +886,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pEventNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 480814f6a0..7138a8bf21 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1352,7 +1352,9 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno); code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 7462d71a8a..49930ad19b 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1888,6 +1888,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); @@ -3700,6 +3701,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -4851,6 +4853,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -5129,6 +5132,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); pInfo->interval = (SInterval){ .interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 5f4bbd66ce..e5fe457fe4 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2083,6 +2083,7 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; + lino = __LINE__; goto _error; } @@ -2091,9 +2092,7 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo int32_t num = 0; code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); extractTbnameSlotId(pInfo, pScanNode); @@ -2101,9 +2100,11 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo pInfo->accountId = pScanPhyNode->accountId; pInfo->pUser = taosStrdup((void*)pUser); + QUERY_CHECK_NULL(pInfo->pUser, code, lino, _error, terrno); pInfo->sysInfo = pScanPhyNode->sysInfo; pInfo->showRewrite = pScanPhyNode->showRewrite; pInfo->pRes = createDataBlockFromDescNode(pDescNode); + QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); pInfo->pCondition = pScanNode->node.pConditions; code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); QUERY_CHECK_CODE(code, lino, _error); @@ -2143,7 +2144,7 @@ _error: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } @@ -2173,15 +2174,19 @@ void destroySysScanOperator(void* param) { (void)tsem_destroy(&pInfo->ready); blockDataDestroy(pInfo->pRes); - const char* name = tNameGetTableName(&pInfo->name); - if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || - strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || - strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) { - if (pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) { - pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); - } + if (pInfo->name.type == TSDB_TABLE_NAME_T) { + const char* name = tNameGetTableName(&pInfo->name); + if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || + strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || + strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) { + if (pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) { + pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); + } - pInfo->pCur = NULL; + pInfo->pCur = NULL; + } + } else { + qError("pInfo->name is not initialized"); } if (pInfo->pIdx) { @@ -2694,6 +2699,7 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP } pInfo->pResBlock = createDataBlockFromDescNode(pBlockScanNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->pResBlock, code, lino, _error, terrno); code = blockDataEnsureCapacity(pInfo->pResBlock, 1); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 6eaef50491..968432b385 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1150,6 +1150,7 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues); pInfo->pLinearInfo = NULL; pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); pInfo->win = pInterpPhyNode->timeRange; pInfo->interval.interval = pInterpPhyNode->interval; pInfo->current = pInfo->win.skey; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a1ec923352..a173611595 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1285,6 +1285,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode } SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); SExprSupp* pSup = &pOperator->exprSupp; @@ -1613,6 +1614,7 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy } SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); @@ -1684,6 +1686,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, @@ -2019,6 +2022,7 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&iaInfo->binfo, pResBlock); code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); QUERY_CHECK_CODE(code, lino, _error); @@ -2344,6 +2348,7 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva } SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pIntervalInfo->binfo, pResBlock); code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win); QUERY_CHECK_CODE(code, lino, _error); From f0e4d2f085e59663684d7abfc1f85619892cd109 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 5 Aug 2024 12:22:25 +0800 Subject: [PATCH 07/12] fix malloc issue --- source/libs/executor/src/cachescanoperator.c | 2 + .../libs/executor/src/eventwindowoperator.c | 3 ++ source/libs/executor/src/exchangeoperator.c | 5 +++ source/libs/executor/src/executil.c | 28 ++++++++++-- source/libs/executor/src/executor.c | 1 + source/libs/executor/src/executorInt.c | 3 ++ source/libs/executor/src/filloperator.c | 3 ++ source/libs/executor/src/groupoperator.c | 10 +++++ source/libs/executor/src/scanoperator.c | 17 ++++++++ source/libs/executor/src/sortoperator.c | 3 ++ .../executor/src/streamcountwindowoperator.c | 4 ++ .../executor/src/streameventwindowoperator.c | 8 ++++ source/libs/executor/src/streamfilloperator.c | 19 ++++++-- .../executor/src/streamtimewindowoperator.c | 43 +++++++++++++++++++ source/libs/executor/src/sysscanoperator.c | 11 +++++ source/libs/executor/src/timesliceoperator.c | 4 ++ source/libs/executor/test/executorTests.cpp | 2 + 17 files changed, 160 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 9d49c8e9ca..137f05c356 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -165,6 +165,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl int32_t capacity = 0; pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); + QUERY_CHECK_NULL(pInfo->pUidList, code, lino, _error, terrno); // partition by tbname if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) { @@ -203,6 +204,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl p->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->numOfExprs); p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_NULL(p->pCtx, code, lino, _error, terrno); } setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED, diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 629afbbb8e..90a0c7927e 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -238,6 +238,9 @@ static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWi SExprSupp* pExprSup, SAggSupporter* pAggSup) { if (*pResult == NULL) { SResultRow* p = taosMemoryCalloc(1, pAggSup->resultRowSize); + if (!p) { + return terrno; + } pResultRowInfo->cur = (SResultRowPosition){.pageId = p->pageId, .offset = p->offset}; *pResult = p; } diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 5af8df8f06..2f018d7a69 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -307,6 +307,9 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const int32_t len = strlen(id) + 1; pInfo->pTaskId = taosMemoryCalloc(1, len); + if (!pInfo->pTaskId) { + return terrno; + } strncpy(pInfo->pTaskId, id, len); for (int32_t i = 0; i < numOfSources; ++i) { SSourceDataInfo dataInfo = {0}; @@ -389,7 +392,9 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc); pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES); + QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno); pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES); + QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno); SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self}; code = qAppendTaskStopInfo(pTaskInfo, &stopInfo); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 5957d08a18..d0f0fce8fc 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -144,6 +144,7 @@ int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, void* pData = NULL; pGroupResInfo->pRows = taosArrayInit(size, POINTER_BYTES); + QUERY_CHECK_NULL(pGroupResInfo->pRows, code, lino, _end, terrno); size_t keyLen = 0; int32_t iter = 0; @@ -353,9 +354,15 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) { } else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) { int32_t len = ((const STag*)p)->len; res->datum.p = taosMemoryCalloc(len + 1, 1); + if (NULL == res->datum.p) { + return DEAL_RES_ERROR; + } memcpy(res->datum.p, p, len); } else if (IS_VAR_DATA_TYPE(pSColumnNode->node.resType.type)) { res->datum.p = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1); + if (NULL == res->datum.p) { + return DEAL_RES_ERROR; + } memcpy(varDataVal(res->datum.p), tagVal.pData, tagVal.nData); varDataSetLen(res->datum.p, tagVal.nData); } else { @@ -378,6 +385,9 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) { int32_t len = strlen(mr->me.name); res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1); + if (NULL == res->datum.p) { + return DEAL_RES_ERROR; + } memcpy(varDataVal(res->datum.p), mr->me.name, len); varDataSetLen(res->datum.p, len); nodesDestroyNode(*pNode); @@ -856,6 +866,7 @@ static SArray* getTableNameList(const SNodeListNode* pList) { // remove the duplicates SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*)); + QUERY_CHECK_NULL(pNewList, code, lino, _end, terrno); void* tmp = taosArrayPush(pNewList, taosArrayGet(pTbList, 0)); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); @@ -1739,6 +1750,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList); pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam)); + QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno); pExp->base.numOfParams = numOfParam; for (int32_t j = 0; j < numOfParam; ++j) { @@ -1760,6 +1772,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { SOperatorNode* pOpNode = (SOperatorNode*)pNode; pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); + QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno); pExp->base.numOfParams = 1; SDataType* pType = &pOpNode->node.resType; @@ -1771,6 +1784,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { SCaseWhenNode* pCaseNode = (SCaseWhenNode*)pNode; pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); + QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno); pExp->base.numOfParams = 1; SDataType* pType = &pCaseNode->node.resType; @@ -1781,9 +1795,8 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { pExp->pExpr->nodeType = QUERY_NODE_OPERATOR; SLogicConditionNode* pCond = (SLogicConditionNode*)pNode; pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); - if (!pExp->base.pParam) { - code = terrno; - } + QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno); + if (TSDB_CODE_SUCCESS == code) { pExp->base.numOfParams = 1; SDataType* pType = &pCond->node.resType; @@ -1808,6 +1821,9 @@ int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) { SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) { *numOfExprs = LIST_LENGTH(pNodeList); SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo)); + if (!pExprs) { + return NULL; + } for (int32_t i = 0; i < (*numOfExprs); ++i) { SExprInfo* pExp = &pExprs[i]; @@ -2068,6 +2084,9 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols); pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo)); + if (!pCond->colList) { + return terrno; + } pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols); if (pCond->colList == NULL || pCond->pSlotList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -2448,6 +2467,9 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) { int32_t size = taosArrayGetSize(pTableListInfo->pTableList); SArray* pList = taosArrayInit(4, sizeof(int32_t)); + if (!pList) { + return terrno; + } STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0); uint64_t gid = pInfo->groupId; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 3ee08d8fb0..ef76f14aa9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -373,6 +373,7 @@ static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); + QUERY_CHECK_NULL(qa, code, lino, _end, terrno); int32_t numOfUids = taosArrayGetSize(tableIdList); if (numOfUids == 0) { (*ppArrayRes) = qa; diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index bceefd2c0d..acd500810c 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -409,6 +409,9 @@ static int32_t doCreateConstantValColumnSMAInfo(SInputColumnInfoData* pInput, SF SColumnDataAgg* da = NULL; if (pInput->pColumnDataAgg[paramIndex] == NULL) { da = taosMemoryCalloc(1, sizeof(SColumnDataAgg)); + if (!da) { + return terrno; + } pInput->pColumnDataAgg[paramIndex] = da; if (da == NULL) { return TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index c4ef74608a..964b331598 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -397,6 +397,9 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t pInfo->win.ekey = win.skey; } pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); + if (pInfo->p) { + return terrno; + } if (pInfo->pFillInfo == NULL || pInfo->p == NULL) { taosMemoryFree(pInfo->pFillInfo); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index d88aef8fb7..4371879cdc 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -854,6 +854,9 @@ _end: int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) { size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t)); + if (!offset) { + return NULL; + } offset[0] = sizeof(int32_t) + sizeof(uint64_t); // the number of rows in current page, ref to SSDataBlock paged serialization format @@ -1193,6 +1196,8 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo blockDataGetCapacityInRow(pInfo->binfo.pRes, getBufPageSize(pInfo->pBuf), blockDataGetSerialMetaSize(taosArrayGetSize(pInfo->binfo.pRes->pDataBlock))); pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity); + QUERY_CHECK_NULL(pInfo->columnOffset, code, lino, _error, terrno); + code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols); if (code != TSDB_CODE_SUCCESS) { terrno = code; @@ -1431,6 +1436,7 @@ static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDat SPartitionDataInfo newParData = {0}; newParData.groupId = calcGroupId(pInfo->partitionSup.keyBuf, keyLen); newParData.rowIds = taosArrayInit(64, sizeof(int32_t)); + QUERY_CHECK_NULL(newParData.rowIds, code, lino, _end, terrno); void* tmp = taosArrayPush(newParData.rowIds, &i); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); @@ -1594,6 +1600,9 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (!pBlock) { + return NULL; + } pBlock->info.hasVarCol = false; pBlock->info.id.groupId = 0; pBlock->info.rows = 0; @@ -1601,6 +1610,7 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) { pBlock->info.watermark = INT64_MIN; pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData)); + QUERY_CHECK_NULL(pBlock->pDataBlock, code, lino, _end, terrno); SColumnInfoData infoData = {0}; infoData.info.type = TSDB_DATA_TYPE_VARCHAR; if (tbName->numOfExprs > 0) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index acc3de3447..2daf9e1fc7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1280,6 +1280,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder)); + if (!pRecorder) { + return terrno; + } STableScanInfo* pTableScanInfo = pOptr->info; *pRecorder = pTableScanInfo->base.readRecorder; *pOptrExplain = pRecorder; @@ -1341,6 +1344,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs); pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_NULL(pSup->pCtx, code, lino, _error, terrno); } pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; @@ -3007,6 +3011,9 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) } void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); + if (!pUpInfo) { + return; + } int32_t code = pInfo->stateStore.updateInfoDeserialize(buf, tlen, pUpInfo); if (code == TSDB_CODE_SUCCESS) { pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); @@ -3466,6 +3473,7 @@ static int32_t extractTableIdList(const STableListInfo* pTableListInfo, SArray** int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t)); + QUERY_CHECK_NULL(tableIdList, code, lino, _end, terrno); // Transfer the Array of STableKeyInfo into uid list. size_t size = tableListGetSize(pTableListInfo); @@ -3861,6 +3869,8 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pInfo->primaryKeyIndex = -1; int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList); pColIds = taosArrayInit(numOfOutput, sizeof(int16_t)); + QUERY_CHECK_NULL(pColIds, code, lino, _error, terrno); + for (int32_t i = 0; i < numOfOutput; ++i) { SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i); @@ -5286,6 +5296,7 @@ int32_t generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order, SArray** ppS int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SArray* pSortInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); + QUERY_CHECK_NULL(pSortInfo, code, lino, _end, terrno); SBlockOrderInfo biTs = {0}; SBlockOrderInfo biPk = {0}; @@ -5374,9 +5385,11 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { QUERY_CHECK_CODE(code, lino, _end); STableMergeScanSortSourceParam* param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam)); + QUERY_CHECK_NULL(param, code, lino, _end, terrno); param->pOperator = pOperator; SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + QUERY_CHECK_NULL(ps, code, lino, _end, terrno); ps->param = param; ps->onlyRef = false; code = tsortAddSource(pInfo->pSortHandle, ps); @@ -5653,6 +5666,9 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla ASSERT(pOptr != NULL); // TODO: merge these two info into one struct STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo)); + if (!execInfo) { + return terrno; + } STableMergeScanInfo* pInfo = pOptr->info; execInfo->blockRecorder = pInfo->base.readRecorder; execInfo->sortExecInfo = pInfo->sortExecInfo; @@ -5697,6 +5713,7 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs); pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_NULL(pSup->pCtx, code, lino, _error, terrno); } pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index a0c56df49c..a4e1e0b648 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -60,6 +60,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN QRY_OPTR_CHECK(pOptrInfo); int32_t code = 0; + int32_t lino = 0; SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -89,6 +90,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN pOperator->exprSupp.pCtx = createSqlFunctionCtx(pOperator->exprSupp.pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno); initResultSizeInfo(&pOperator->resultInfo, 1024); code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); if (code != TSDB_CODE_SUCCESS) { @@ -778,6 +780,7 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo initResultSizeInfo(&pOperator->resultInfo, 1024); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 6adc60b79e..27809f8a69 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -651,10 +651,12 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo)); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pStUpdated) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno); } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -857,6 +859,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->binfo.pRes = pResBlock; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pStDeleted, code, lino, _error, terrno); pInfo->pDelIterator = NULL; code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); @@ -878,6 +881,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->recvGetAll = false; pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno); pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimayKey; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 17ef2fe41f..30032b37ff 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -619,10 +619,12 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(16, sizeof(SEventWindowInfo)); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pSeUpdated) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno); } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -738,6 +740,9 @@ void streamEventReleaseState(SOperatorInfo* pOperator) { int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); int32_t resSize = winSize + sizeof(TSKEY); char* pBuff = taosMemoryCalloc(1, resSize); + if (!pBuff) { + return ; + } memcpy(pBuff, pInfo->historyWins->pData, winSize); memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY)); qDebug("===stream=== event window operator relase state. save result count:%d", @@ -780,10 +785,12 @@ void streamEventReloadState(SOperatorInfo* pOperator) { if (!pInfo->pSeUpdated && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno); } if (!pInfo->pSeDeleted && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno); } for (int32_t i = 0; i < num; i++) { SEventWindowInfo curInfo = {0}; @@ -897,6 +904,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _error, terrno); pInfo->pDelIterator = NULL; code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 480814f6a0..c0949ec012 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -604,6 +604,7 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* SPoint cur = {0}; cur.key = pFillInfo->current; cur.val = taosMemoryCalloc(1, pCell->bytes); + QUERY_CHECK_NULL(cur.val, code, lino, _end, terrno); taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type); code = colDataSetVal(pColData, index, (const char*)cur.val, false); QUERY_CHECK_CODE(code, lino, _end); @@ -683,15 +684,24 @@ _end: } } -void keepBlockRowInDiscBuf(SOperatorInfo* pOperator, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol, +int32_t keepBlockRowInDiscBuf(SOperatorInfo* pOperator, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol, int32_t rowId, uint64_t groupId, int32_t rowSize) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; TSKEY ts = tsCol[rowId]; pFillInfo->nextRowKey = ts; SResultRowData tmpNextRow = {.key = ts}; tmpNextRow.pRowVal = taosMemoryCalloc(1, rowSize); + QUERY_CHECK_NULL(tmpNextRow.pRowVal, code, lino, _end, terrno); transBlockToResultRow(pBlock, rowId, ts, &tmpNextRow); keepResultInDiscBuf(pOperator, groupId, &tmpNextRow, rowSize); taosMemoryFreeClear(tmpNextRow.pRowVal); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } static void doFillResults(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, @@ -721,13 +731,15 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) { pInfo->srcRowIndex++; if (pInfo->srcRowIndex == 0) { - keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize); + code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize); + QUERY_CHECK_CODE(code, lino, _end); pInfo->srcRowIndex++; } while (pInfo->srcRowIndex < pBlock->info.rows) { TSKEY ts = tsCol[pInfo->srcRowIndex]; - keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize); + code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize); + QUERY_CHECK_CODE(code, lino, _end); doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes); if (pInfo->pRes->info.rows == pInfo->pRes->info.capacity) { code = blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol); @@ -1207,6 +1219,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pFillSup->pResMap = tSimpleHashInit(16, hashFn); + QUERY_CHECK_NULL(pFillSup->pResMap, code, lino, _end, terrno); pFillSup->hasDelete = false; _end: diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 7462d71a8a..24490f3e1a 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -647,6 +647,7 @@ int32_t addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SArray* childIds = taosArrayInit(8, sizeof(int32_t)); + QUERY_CHECK_NULL(childIds, code, lino, _end, terrno); for (int32_t i = 0; i < size; i++) { void* tmp = taosArrayPush(childIds, &i); if (!tmp) { @@ -1579,10 +1580,12 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pUpdatedMap) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn); + QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno); } while (1) { @@ -1609,6 +1612,7 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); + QUERY_CHECK_NULL(delWins, code, lino, _end, terrno); SHashObj* finalMap = IS_FINAL_INTERVAL_OP(pOperator) ? pInfo->pFinalPullDataMap : NULL; code = doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, finalMap); QUERY_CHECK_CODE(code, lino, _end); @@ -1891,6 +1895,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN initBasicInfo(&pInfo->binfo, pResBlock); pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + QUERY_CHECK_NULL(pInfo->pState, code, lino, _error, terrno); qInfo("open state %p", pInfo->pState); pAPI->stateStore.streamStateCopyBackend(pTaskInfo->streamInfo.pState, pInfo->pState); //*(pInfo->pState) = *(pTaskInfo->streamInfo.pState); @@ -1914,6 +1919,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN } pInfo->pPullWins = taosArrayInit(8, sizeof(SPullWindowInfo)); + QUERY_CHECK_NULL(pInfo->pPullWins, code, lino, _error, terrno); pInfo->pullIndex = 0; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pPullDataMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK); @@ -1929,6 +1935,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN pInfo->delIndex = 0; pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); + QUERY_CHECK_NULL(pInfo->pDelWins, code, lino, _error, terrno); pInfo->delKey.ts = INT64_MAX; pInfo->delKey.groupId = 0; pInfo->numOfDatapack = 0; @@ -1953,7 +1960,9 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN pInfo->clearState = false; pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey)); + QUERY_CHECK_NULL(pInfo->pMidPullDatas, code, lino, _error, terrno); pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); + QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno); pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimayKey; pOperator->operatorType = pPhyNode->type; @@ -2129,6 +2138,9 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput); pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + if (!pSup->pState) { + return terrno; + } *(pSup->pState) = *pState; pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex); int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput); @@ -2138,6 +2150,9 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pSup->pResultRows = tSimpleHashInit(32, hashFn); + if (!pSup->pResultRows) { + return terrno; + } for (int32_t i = 0; i < numOfOutput; ++i) { pExpSup->pCtx[i].saveHandle.pState = pSup->pState; @@ -3340,10 +3355,12 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo)); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pStUpdated) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno); } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -3356,6 +3373,7 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SSessionKey)); + QUERY_CHECK_NULL(pWins, code, lino, _end, terrno); // gap must be 0 code = doDeleteTimeWindows(pAggSup, pBlock, pWins); QUERY_CHECK_CODE(code, lino, _end); @@ -3496,6 +3514,9 @@ void streamSessionReleaseState(SOperatorInfo* pOperator) { int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); int32_t resSize = winSize + sizeof(TSKEY); char* pBuff = taosMemoryCalloc(1, resSize); + if (!pBuff) { + return; + } memcpy(pBuff, pInfo->historyWins->pData, winSize); memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY)); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, @@ -3623,6 +3644,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { if (!pInfo->pStUpdated && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno); } for (int32_t i = 0; i < num; i++) { SResultWindowInfo winInfo = {0}; @@ -3731,6 +3753,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->order = TSDB_ORDER_ASC; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _error, terrno); pInfo->pDelIterator = NULL; code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); QUERY_CHECK_CODE(code, lino, _error); @@ -3757,6 +3780,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->recvGetAll = false; pInfo->destHasPrimaryKey = pSessionNode->window.destHasPrimayKey; pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno); pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, @@ -3886,10 +3910,12 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock* SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo)); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pStUpdated) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno); } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -4008,6 +4034,7 @@ int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhys if (numOfChild > 0) { pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*)); + QUERY_CHECK_NULL(pInfo->pChildren, code, lino, _error, terrno); for (int32_t i = 0; i < numOfChild; i++) { SOperatorInfo* pChildOp = NULL; code = createStreamFinalSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo, 0, pHandle, &pChildOp); @@ -4598,10 +4625,12 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo)); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pSeUpdated) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno); } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -4702,6 +4731,9 @@ void streamStateReleaseState(SOperatorInfo* pOperator) { int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); int32_t resSize = winSize + sizeof(TSKEY); char* pBuff = taosMemoryCalloc(1, resSize); + if (!pBuff) { + return ; + } memcpy(pBuff, pInfo->historyWins->pData, winSize); memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY)); qDebug("===stream=== relase state. save result count:%d", (int32_t)taosArrayGetSize(pInfo->historyWins)); @@ -4752,10 +4784,12 @@ void streamStateReloadState(SOperatorInfo* pOperator) { if (!pInfo->pSeUpdated && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno); } if (!pInfo->pSeDeleted && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeDeleted, code, lino, _end, terrno); } for (int32_t i = 0; i < num; i++) { SStateWindowInfo curInfo = {0}; @@ -4865,6 +4899,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeDeleted, code, lino, _error, terrno); pInfo->pDelIterator = NULL; code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); @@ -4891,6 +4926,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->recvGetAll = false; pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno); pInfo->destHasPrimaryKey = pStateNode->window.destHasPrimayKey; setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, @@ -4985,11 +5021,13 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pUpdatedMap) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn); + QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno); } while (1) { @@ -5164,6 +5202,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + QUERY_CHECK_NULL(pInfo->pState, code, lino, _error, terrno); *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); @@ -5181,6 +5220,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->invertible = false; pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); + QUERY_CHECK_NULL(pInfo->pDelWins, code, lino, _error, terrno); pInfo->delIndex = 0; code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); @@ -5221,6 +5261,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); + QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno); pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimayKey; // for stream @@ -5453,10 +5494,12 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock* if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pUpdatedMap) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn); + QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno); } while (1) { diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 5f4bbd66ce..80b4b646bf 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1772,8 +1772,10 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { .pMeta = pInfo->readHandle.vnode, .pVnode = pInfo->readHandle.vnode, .pAPI = &pTaskInfo->storageAPI}; SSysTableIndex* idx = taosMemoryMalloc(sizeof(SSysTableIndex)); + QUERY_CHECK_NULL(idx, code, lino, _end, terrno); idx->init = 0; idx->uids = taosArrayInit(128, sizeof(int64_t)); + QUERY_CHECK_NULL(idx->uids, code, lino, _end, terrno); idx->lastIdx = 0; pInfo->pIdx = idx; // set idx arg @@ -1992,6 +1994,9 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); char* buf1 = taosMemoryCalloc(1, contLen); + if (!buf1) { + return NULL; + } int32_t tempRes = tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req); if (tempRes < 0) { code = terrno; @@ -2470,12 +2475,18 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) { bool hasIdx = false; bool hasRslt = true; SArray* mRslt = taosArrayInit(len, POINTER_BYTES); + if (!mRslt) { + return terrno; + } SListCell* cell = pList->pHead; for (int i = 0; i < len; i++) { if (cell == NULL) break; SArray* aRslt = taosArrayInit(16, sizeof(int64_t)); + if (!aRslt) { + return terrno; + } ret = optSysTabFilteImpl(arg, cell->pNode, aRslt); if (ret == 0) { diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 6eaef50491..00f88934d9 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -666,6 +666,9 @@ static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExp pInfo->pPrevGroupKey->type = pExprInfo->base.resSchema.type; pInfo->pPrevGroupKey->isNull = false; pInfo->pPrevGroupKey->pData = taosMemoryCalloc(1, pInfo->pPrevGroupKey->bytes); + if (!pInfo->pPrevGroupKey->pData) { + return terrno; + } } } @@ -1168,6 +1171,7 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN if (IS_VAR_DATA_TYPE(pInfo->pkCol.type)) { pInfo->prevKey.pks[0].pData = taosMemoryCalloc(1, pInfo->pkCol.bytes); + QUERY_CHECK_NULL(pInfo->prevKey.pks[0].pData, code, lino, _error, terrno); } } diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 5b34075ecf..ff33732b23 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -198,6 +198,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) { SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_t rowsPerPage, int32_t type, int32_t numOfCols) { SOperatorInfo* pOperator = static_cast(taosMemoryCalloc(1, sizeof(SOperatorInfo))); + ASSERT(!pOperator); pOperator->name = "dummyInputOpertor4Test"; if (numOfCols == 1) { @@ -207,6 +208,7 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_ } SDummyInputInfo* pInfo = (SDummyInputInfo*)taosMemoryCalloc(1, sizeof(SDummyInputInfo)); + ASSERT(!pInfo); pInfo->totalPages = numOfBlocks; pInfo->startVal = startVal; pInfo->numOfRowsPerPage = rowsPerPage; From 9d64202f02de45b2a926f0d6dc45944566327c32 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 5 Aug 2024 13:18:54 +0800 Subject: [PATCH 08/12] adj res --- source/libs/executor/src/scanoperator.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4b154af0d6..d408883769 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1303,7 +1303,9 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr static void destroyTableScanBase(STableScanBase* pBase, TsdReader* pAPI) { cleanupQueryTableDataCond(&pBase->cond); - pAPI->tsdReaderClose(pBase->dataReader); + if (pAPI->tsdReaderClose) { + pAPI->tsdReaderClose(pBase->dataReader); + } pBase->dataReader = NULL; if (pBase->matchInfo.pList != NULL) { From 573ab1131452146e84aec22d27913ad943081c00 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 5 Aug 2024 13:28:27 +0800 Subject: [PATCH 09/12] fix malloc issue --- source/libs/executor/src/streamtimewindowoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 24490f3e1a..90d00b369a 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -3753,7 +3753,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->order = TSDB_ORDER_ASC; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); - QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _error, terrno); + QUERY_CHECK_NULL(pInfo->pStDeleted, code, lino, _error, terrno); pInfo->pDelIterator = NULL; code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); QUERY_CHECK_CODE(code, lino, _error); From bfe3662d03577858c8486a51a9642cf14a51e343 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 5 Aug 2024 13:37:07 +0800 Subject: [PATCH 10/12] fix issue --- source/libs/executor/src/filloperator.c | 2 +- source/libs/executor/src/streameventwindowoperator.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 964b331598..41036dfd94 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -397,7 +397,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t pInfo->win.ekey = win.skey; } pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); - if (pInfo->p) { + if (!pInfo->p) { return terrno; } diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 30032b37ff..72f8172bc4 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -790,7 +790,7 @@ void streamEventReloadState(SOperatorInfo* pOperator) { if (!pInfo->pSeDeleted && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); - QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno); + QUERY_CHECK_NULL(pInfo->pSeDeleted, code, lino, _end, terrno); } for (int32_t i = 0; i < num; i++) { SEventWindowInfo curInfo = {0}; @@ -904,7 +904,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); - QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _error, terrno); + QUERY_CHECK_NULL(pInfo->pSeDeleted, code, lino, _error, terrno); pInfo->pDelIterator = NULL; code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); QUERY_CHECK_CODE(code, lino, _error); From 816c08fe2fc2c68481b175333c0143734a1f1480 Mon Sep 17 00:00:00 2001 From: sima Date: Mon, 5 Aug 2024 13:57:32 +0800 Subject: [PATCH 11/12] Revert "feat:[TS-5137] Support group/partition by position and alias" This reverts commit d9750319b848da6062b84354c68374f33376ea9a. --- docs/en/12-taos-sql/06-select.md | 18 +++--------------- docs/zh/12-taos-sql/06-select.md | 18 +++--------------- 2 files changed, 6 insertions(+), 30 deletions(-) diff --git a/docs/en/12-taos-sql/06-select.md b/docs/en/12-taos-sql/06-select.md index 33236c0173..8bfdaeb9c8 100755 --- a/docs/en/12-taos-sql/06-select.md +++ b/docs/en/12-taos-sql/06-select.md @@ -65,16 +65,10 @@ interp_clause: RANGE(ts_val [, ts_val]) EVERY(every_val) FILL(fill_mod_and_val) partition_by_clause: - PARTITION BY partition_by_expr [, partition_by_expr] ... - -partition_by_expr: - {expr | position | c_alias} + PARTITION BY expr [, expr] ... group_by_clause: - GROUP BY group_by_expr [, group_by_expr] ... HAVING condition - -group_by_expr: - {expr | position | c_alias} + GROUP BY expr [, expr] ... HAVING condition order_by_clasue: ORDER BY order_expr [, order_expr] ... @@ -280,13 +274,7 @@ If you use a GROUP BY clause, the SELECT list can only include the following ite The GROUP BY clause groups each row of data by the value of the expression following the clause and returns a combined result for each group. -In the GROUP BY clause, columns from a table or view can be grouped by specifying the column name. These columns do not need to be included in the SELECT list. - -You can specify integers in GROUP BY expression to indicate the expressions in the select list used for grouping. For example, 1 indicates the first item in the select list. - -You can specify column names in result set to indicate the expressions in the select list used for grouping. - -When using position and result set column names for grouping in the GROUP BY clause, the corresponding expressions in the select list must not be aggregate functions. +The expressions in a GROUP BY clause can include any column in any table or view. It is not necessary that the expressions appear in the SELECT list. The GROUP BY clause does not guarantee that the results are ordered. If you want to ensure that grouped data is ordered, use the ORDER BY clause. diff --git a/docs/zh/12-taos-sql/06-select.md b/docs/zh/12-taos-sql/06-select.md index af19559c81..f10c5ebb69 100755 --- a/docs/zh/12-taos-sql/06-select.md +++ b/docs/zh/12-taos-sql/06-select.md @@ -65,16 +65,10 @@ interp_clause: RANGE(ts_val [, ts_val]) EVERY(every_val) FILL(fill_mod_and_val) partition_by_clause: - PARTITION BY partition_by_expr [, partition_by_expr] ... - -partition_by_expr: - {expr | position | c_alias} + PARTITION BY expr [, expr] ... group_by_clause: - GROUP BY group_by_expr [, group_by_expr] ... HAVING condition - -group_by_expr: - {expr | position | c_alias} + GROUP BY expr [, expr] ... HAVING condition order_by_clasue: ORDER BY order_expr [, order_expr] ... @@ -280,13 +274,7 @@ TDengine 支持基于时间戳主键的 INNER JOIN,规则如下: GROUP BY 子句对每行数据按 GROUP BY 后的表达式的值进行分组,并为每个组返回一行汇总信息。 -GROUP BY 子句中可以通过指定表或视图的列名来按照表或视图中的任何列分组,这些列不需要出现在 SELECT 列表中。 - -GROUP BY 子句中可以使用位置语法,位置标识为正整数,从 1 开始,表示使用 SELECT 列表的第几个表达式进行分组。 - -GROUP BY 子句中可以使用结果集列名,表示使用 SELECT 列表的指定表达式进行分组。 - -GROUP BY 子句中在使用位置语法和结果集列名进行分组时,其对应的 SELECT 列表中的表达式不能是聚集函数。 +GROUP BY 子句中的表达式可以包含表或视图中的任何列,这些列不需要出现在 SELECT 列表中。 该子句对行进行分组,但不保证结果集的顺序。若要对分组进行排序,请使用 ORDER BY 子句 From 28e3c70ad99f030210ae91b91998c23a3bb50b39 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Mon, 5 Aug 2024 14:00:46 +0800 Subject: [PATCH 12/12] revert fill doc change --- docs/en/12-taos-sql/12-distinguished.md | 1 - docs/zh/12-taos-sql/12-distinguished.md | 1 - 2 files changed, 2 deletions(-) diff --git a/docs/en/12-taos-sql/12-distinguished.md b/docs/en/12-taos-sql/12-distinguished.md index 8eecb706c0..bfc9ca32c0 100644 --- a/docs/en/12-taos-sql/12-distinguished.md +++ b/docs/en/12-taos-sql/12-distinguished.md @@ -102,7 +102,6 @@ The detailed beaviors of `NULL`, `NULL_F`, `VALUE`, and VALUE_F are described be 1. A huge volume of interpolation output may be returned using `FILL`, so it's recommended to specify the time range when using `FILL`. The maximum number of interpolation values that can be returned in a single query is 10,000,000. 2. The result set is in ascending order of timestamp when you aggregate by time window. 3. If aggregate by window is used on STable, the aggregate function is performed on all the rows matching the filter conditions. If `PARTITION BY` is not used in the query, the result set will be returned in strict ascending order of timestamp; otherwise the result set will be returned in the order of ascending timestamp in each group. -4. The output windows of Fill are related with time range of WHERE Clause. For asc fill, the first output window is the first window that conains the start time of WHERE clause. The last output window is the last window that contains the end time of WHERE clause. ::: diff --git a/docs/zh/12-taos-sql/12-distinguished.md b/docs/zh/12-taos-sql/12-distinguished.md index 50bf36d2e1..0eaeb0dfa7 100755 --- a/docs/zh/12-taos-sql/12-distinguished.md +++ b/docs/zh/12-taos-sql/12-distinguished.md @@ -97,7 +97,6 @@ NULL, NULL_F, VALUE, VALUE_F 这几种填充模式针对不同场景区别如下 1. 使用 FILL 语句的时候可能生成大量的填充输出,务必指定查询的时间区间。针对每次查询,系统可返回不超过 1 千万条具有插值的结果。 2. 在时间维度聚合中,返回的结果中时间序列严格单调递增。 3. 如果查询对象是超级表,则聚合函数会作用于该超级表下满足值过滤条件的所有表的数据。如果查询中没有使用 PARTITION BY 语句,则返回的结果按照时间序列严格单调递增;如果查询中使用了 PARTITION BY 语句分组,则返回结果中每个 PARTITION 内按照时间序列严格单调递增。 -4. Fill输出的起始和结束窗口与WHERE条件的时间范围有关, 如增序Fill时, 第一个窗口是包含WHERE条件开始时间的第一个窗口, 最后一个窗口是包含WHERE条件结束时间的最后一个窗口。 :::