From a673f500b8de4c56216b41060973849328c2c519 Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Mon, 2 Sep 2024 15:12:47 +0800 Subject: [PATCH 1/5] fix: free invalid mem --- source/libs/executor/src/executorInt.c | 1 + source/libs/executor/src/mergejoinoperator.c | 2 +- source/libs/function/src/builtinsimpl.c | 4 ++++ 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index d09a83315a..92a7ecab83 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -371,6 +371,7 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int } if (hasPk && (j == pkParamIdx)) { pInput->pPrimaryKey = pInput->pData[j]; + QUERY_CHECK_CONDITION((pInput->pData[j]->pData != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); } QUERY_CHECK_CONDITION((pInput->pData[j] != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 14f3a08e17..30cc596a44 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1919,10 +1919,10 @@ _return: if (pInfo != NULL) { destroyMergeJoinOperator(pInfo); } + destroyOperatorAndDownstreams(pOperator, pDownstream, oldNum); if (newDownstreams) { taosMemoryFree(pDownstream); } - destroyOperatorAndDownstreams(pOperator, pDownstream, oldNum); pTaskInfo->code = code; return code; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 6397b92191..e04ffaaeda 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3163,6 +3163,10 @@ static int32_t doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex } if (pCtx->hasPrimaryKey) { + if (colDataIsNull_s(pkCol, rowIndex)) { + qError("primary key is null, rowIndex:%d", rowIndex); + return TSDB_CODE_FUNC_FUNTION_ERROR; + } char* pkData = colDataGetData(pkCol, rowIndex); if (IS_VAR_DATA_TYPE(pInfo->pkType)) { pInfo->pkBytes = varDataTLen(pkData); From 198486977ede5902722644555164b4601a596bde Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 2 Sep 2024 17:35:10 +0800 Subject: [PATCH 2/5] fix mem leak --- source/client/src/clientHb.c | 42 ++++++++++++--------------- source/client/src/clientImpl.c | 5 ++-- source/libs/catalog/src/ctgRemote.c | 3 +- source/libs/qcom/src/queryUtil.c | 2 +- source/libs/scheduler/src/schRemote.c | 3 +- source/libs/transport/src/transCli.c | 7 +++-- 6 files changed, 29 insertions(+), 33 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 9d6f106336..5d3892d5e0 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -16,10 +16,10 @@ #include "catalog.h" #include "clientInt.h" #include "clientLog.h" -#include "scheduler.h" -#include "trpc.h" -#include "tglobal.h" #include "clientMonitor.h" +#include "scheduler.h" +#include "tglobal.h" +#include "trpc.h" typedef struct { union { @@ -244,11 +244,9 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog goto _return; } - TSC_ERR_JRET(catalogUpdateDBVgInfo(pCatalog, - (rsp->useDbRsp->db[0] == 'i') ? - TSDB_PERFORMANCE_SCHEMA_DB : - TSDB_INFORMATION_SCHEMA_DB, - rsp->useDbRsp->uid, vgInfo)); + TSC_ERR_JRET(catalogUpdateDBVgInfo( + pCatalog, (rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB, + rsp->useDbRsp->uid, vgInfo)); } } } @@ -556,7 +554,6 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { } } - taosHashRelease(pAppHbMgr->activeInfo, pReq); return TSDB_CODE_SUCCESS; @@ -609,8 +606,8 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { } pInst->monitorParas = pRsp.monitorParas; - tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", - pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope); + tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", pInst->clusterId, + pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope); if (rspNum) { tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, @@ -1108,7 +1105,8 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req if (clientHbMgr.appHbHash) { code = taosHashPut(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(uint64_t), NULL, 0); if (TSDB_CODE_SUCCESS != code) { - tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); + tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, + tstrerror(code)); return code; } } @@ -1261,7 +1259,7 @@ int32_t hbGatherAppInfo(void) { SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); if (pAppHbMgr == NULL) continue; - int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; + int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId)); if (NULL == pApp) { (void)memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary)); @@ -1303,8 +1301,7 @@ static void *hbThreadFunc(void *param) { return NULL; } if (sz > 1 && !clientHbMgr.appHbHash) { - clientHbMgr.appHbHash = - taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK); + clientHbMgr.appHbHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK); if (NULL == clientHbMgr.appHbHash) { tscError("taosHashInit failed"); return NULL; @@ -1324,13 +1321,13 @@ static void *hbThreadFunc(void *param) { continue; } SClientHbBatchReq *pReq = NULL; - int32_t code = hbGatherAllInfo(pAppHbMgr, &pReq); + int32_t code = hbGatherAllInfo(pAppHbMgr, &pReq); if (TSDB_CODE_SUCCESS != code || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) { terrno = code ? code : TSDB_CODE_OUT_OF_RANGE; tFreeClientHbBatchReq(pReq); continue; } - int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq); + int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq); if (tlen == -1) { tFreeClientHbBatchReq(pReq); break; @@ -1368,9 +1365,8 @@ static void *hbThreadFunc(void *param) { pInfo->requestObjRefId = 0; SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo; - int64_t transporterId = 0; SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); - if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo)) { + if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, NULL, pInfo)) { tscWarn("failed to async send msg to server"); } tFreeClientHbBatchReq(pReq); @@ -1389,7 +1385,7 @@ static void *hbThreadFunc(void *param) { } static int32_t hbCreateThread() { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; TdThreadAttr thAttr; TSC_ERR_JRET(taosThreadAttrInit(&thAttr)); TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE)); @@ -1467,9 +1463,9 @@ int32_t appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key, SAppHbMgr **pAppHbMg TSC_ERR_JRET(taosThreadMutexLock(&clientHbMgr.lock)); if (taosArrayPush(clientHbMgr.appHbMgrs, &(*pAppHbMgr)) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - (void)taosThreadMutexUnlock(&clientHbMgr.lock); - goto _return; + code = TSDB_CODE_OUT_OF_MEMORY; + (void)taosThreadMutexUnlock(&clientHbMgr.lock); + goto _return; } (*pAppHbMgr)->idx = taosArrayGetSize(clientHbMgr.appHbMgrs) - 1; TSC_ERR_JRET(taosThreadMutexUnlock(&clientHbMgr.lock)); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index d1ee26423c..78ff40bb4f 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1567,9 +1567,8 @@ int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __ta return code; } - int64_t transporterId = 0; - code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, &transporterId, - body); + // int64_t transporterId = 0; + code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, NULL, body); if (TSDB_CODE_SUCCESS != code) { destroyTscObj(*pTscObj); tscError("failed to send connect msg to server, code:%s", tstrerror(code)); diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index ef29907b96..db440f59df 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -530,8 +530,7 @@ int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob, pMsgSendInfo->msgInfo.handle = NULL; pMsgSendInfo->msgType = msgType; - int64_t transporterId = 0; - code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, &transporterId, pMsgSendInfo); + code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, NULL, pMsgSendInfo); pMsgSendInfo = NULL; if (code) { ctgError("asyncSendMsgToSever failed, error: %s", tstrerror(code)); diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index d47a183121..34821d05cd 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -586,7 +586,7 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) { } memcpy(*pDst, pSrc, sizeof(*pSrc)); (*pDst)->vgArray = NULL; - + if (pSrc->vgHash) { (*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index ad720e15f5..34dea4bb5d 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -996,8 +996,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery pTask->lastMsgType = msgType; } - int64_t transporterId = 0; - code = asyncSendMsgToServerExt(trans->pTrans, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx); + code = asyncSendMsgToServerExt(trans->pTrans, epSet, NULL, pMsgSendInfo, persistHandle, ctx); pMsgSendInfo = NULL; if (code) { SCH_ERR_JRET(code); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index dfa9595eb0..78320c450c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1496,9 +1496,10 @@ static void cliHandleFastFail(SCliConn* pConn, int status) { } } else { SFailFastItem item = {.count = 1, .timestamp = cTimestamp}; - int32_t code = taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr), &item, sizeof(SFailFastItem)); + int32_t code = + taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr), &item, sizeof(SFailFastItem)); if (code != 0) { - tError("failed to put fail-fast item to cache, reason:%s", tstrerror(code)); + tError("failed to put fail-fast item to cache, reason:%s", tstrerror(code)); } } } @@ -2980,7 +2981,9 @@ int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, S QUEUE_PUSH(&exh->q, &pCliMsg->seqq); taosWUnLockLatch(&exh->latch); + tDebug("msg refId: %" PRId64 "", handle); + (void)transReleaseExHandle(transGetRefMgt(), handle); (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return 0; } else { From 87818986e02da83d63727e2d21679687d036ee73 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 2 Sep 2024 19:02:56 +0800 Subject: [PATCH 3/5] fix mem leak --- source/libs/scheduler/src/schRemote.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 34dea4bb5d..4e34a47902 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -982,6 +982,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery SCH_ERR_JRET(code); } trans->pHandle = (void *)refId; + pMsgSendInfo->msgInfo.handle =trans->pHandle; } if (pJob && pTask) { From 02dd9c9160b49b318db20523a6adf09cae67a31b Mon Sep 17 00:00:00 2001 From: Jing Sima Date: Tue, 3 Sep 2024 17:34:23 +0800 Subject: [PATCH 4/5] fix:[TD-31818] fix memory leak allocated by mode function. --- include/libs/function/function.h | 2 ++ source/libs/executor/src/aggregateoperator.c | 6 ++++++ source/libs/executor/src/projectoperator.c | 3 +++ source/libs/function/inc/builtins.h | 1 + source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/src/builtins.c | 1 + source/libs/function/src/builtinsimpl.c | 6 ++++++ source/libs/function/src/functionMgt.c | 1 + 8 files changed, 21 insertions(+) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 24fa2898ea..a71a2a6b7f 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -36,6 +36,7 @@ typedef struct SFuncExecEnv { } SFuncExecEnv; typedef bool (*FExecGetEnv)(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv); +typedef void (*FExecCleanUp)(struct SqlFunctionCtx* pCtx); typedef int32_t (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo); typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx); typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); @@ -54,6 +55,7 @@ typedef struct SFuncExecFuncs { FExecProcess process; FExecFinalize finalize; FExecCombine combine; + FExecCleanUp cleanup; processFuncByRow processFuncByRow; } SFuncExecFuncs; diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index f0e0f81cf5..01a67a6a03 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -326,6 +326,9 @@ int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { } if (code != TSDB_CODE_SUCCESS) { + if (pCtx[k].fpSet.cleanup != NULL) { + pCtx[k].fpSet.cleanup(&pCtx[k]); + } qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); return code; } @@ -640,6 +643,9 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC } if (code != TSDB_CODE_SUCCESS) { + if (pCtx[k].fpSet.cleanup != NULL) { + pCtx[k].fpSet.cleanup(&pCtx[k]); + } qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code)); taskInfo->code = code; T_LONG_JMP(taskInfo->env, code); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index a9ba57e1d4..21b7c0880f 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -1057,6 +1057,9 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc code = pfCtx->fpSet.process(pfCtx); if (code != TSDB_CODE_SUCCESS) { + if (pCtx[k].fpSet.cleanup != NULL) { + pCtx[k].fpSet.cleanup(&pCtx[k]); + } goto _exit; } diff --git a/source/libs/function/inc/builtins.h b/source/libs/function/inc/builtins.h index 343f5b8367..5707ee76f4 100644 --- a/source/libs/function/inc/builtins.h +++ b/source/libs/function/inc/builtins.h @@ -40,6 +40,7 @@ typedef struct SBuiltinFuncDefinition { FExecProcess processFunc; FScalarExecProcess sprocessFunc; FExecFinalize finalizeFunc; + FExecCleanUp cleanupFunc; #ifdef BUILD_NO_CALL FExecProcess invertFunc; #endif diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 7403a4ce31..77d6bda35b 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -239,6 +239,7 @@ bool getModeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t modeFunction(SqlFunctionCtx* pCtx); int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +void modeFunctionCleanupExt(SqlFunctionCtx* pCtx); bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 4bc86eb0c6..17ba430150 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -3658,6 +3658,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = modeFunction, .sprocessFunc = modeScalarFunction, .finalizeFunc = modeFinalize, + .cleanupFunc = modeFunctionCleanupExt }, { .name = "abs", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index fa8cf243c4..84ab103456 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -6019,6 +6019,12 @@ static void modeFunctionCleanup(SModeInfo * pInfo) { taosMemoryFreeClear(pInfo->buf); } +void modeFunctionCleanupExt(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + modeFunctionCleanup(pInfo); +} + static int32_t saveModeTupleData(SqlFunctionCtx* pCtx, char* data, SModeInfo *pInfo, STuplePos* pPos) { if (IS_VAR_DATA_TYPE(pInfo->colType)) { (void)memcpy(pInfo->buf, data, varDataTLen(data)); diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 254a06426c..2f71ab8e24 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -142,6 +142,7 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) { pFpSet->finalize = funcMgtBuiltins[funcId].finalizeFunc; pFpSet->combine = funcMgtBuiltins[funcId].combineFunc; pFpSet->processFuncByRow = funcMgtBuiltins[funcId].processFuncByRow; + pFpSet->cleanup = funcMgtBuiltins[funcId].cleanupFunc; return TSDB_CODE_SUCCESS; } From 73fb43de2a7e929048e90001c14acbb351ec0722 Mon Sep 17 00:00:00 2001 From: facetosea <25808407@qq.com> Date: Wed, 4 Sep 2024 07:11:51 +0800 Subject: [PATCH 5/5] only for memLeak --- source/libs/executor/src/executorInt.c | 1 - source/libs/function/src/builtinsimpl.c | 4 ---- 2 files changed, 5 deletions(-) diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 92a7ecab83..d09a83315a 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -371,7 +371,6 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int } if (hasPk && (j == pkParamIdx)) { pInput->pPrimaryKey = pInput->pData[j]; - QUERY_CHECK_CONDITION((pInput->pData[j]->pData != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); } QUERY_CHECK_CONDITION((pInput->pData[j] != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index e04ffaaeda..6397b92191 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3163,10 +3163,6 @@ static int32_t doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex } if (pCtx->hasPrimaryKey) { - if (colDataIsNull_s(pkCol, rowIndex)) { - qError("primary key is null, rowIndex:%d", rowIndex); - return TSDB_CODE_FUNC_FUNTION_ERROR; - } char* pkData = colDataGetData(pkCol, rowIndex); if (IS_VAR_DATA_TYPE(pInfo->pkType)) { pInfo->pkBytes = varDataTLen(pkData);