diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 24fa2898ea..a71a2a6b7f 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -36,6 +36,7 @@ typedef struct SFuncExecEnv { } SFuncExecEnv; typedef bool (*FExecGetEnv)(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv); +typedef void (*FExecCleanUp)(struct SqlFunctionCtx* pCtx); typedef int32_t (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo); typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx); typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); @@ -54,6 +55,7 @@ typedef struct SFuncExecFuncs { FExecProcess process; FExecFinalize finalize; FExecCombine combine; + FExecCleanUp cleanup; processFuncByRow processFuncByRow; } SFuncExecFuncs; diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 9d6f106336..5d3892d5e0 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -16,10 +16,10 @@ #include "catalog.h" #include "clientInt.h" #include "clientLog.h" -#include "scheduler.h" -#include "trpc.h" -#include "tglobal.h" #include "clientMonitor.h" +#include "scheduler.h" +#include "tglobal.h" +#include "trpc.h" typedef struct { union { @@ -244,11 +244,9 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog goto _return; } - TSC_ERR_JRET(catalogUpdateDBVgInfo(pCatalog, - (rsp->useDbRsp->db[0] == 'i') ? - TSDB_PERFORMANCE_SCHEMA_DB : - TSDB_INFORMATION_SCHEMA_DB, - rsp->useDbRsp->uid, vgInfo)); + TSC_ERR_JRET(catalogUpdateDBVgInfo( + pCatalog, (rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB, + rsp->useDbRsp->uid, vgInfo)); } } } @@ -556,7 +554,6 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { } } - taosHashRelease(pAppHbMgr->activeInfo, pReq); return TSDB_CODE_SUCCESS; @@ -609,8 +606,8 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { } pInst->monitorParas = pRsp.monitorParas; - tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", - pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope); + tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", pInst->clusterId, + pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope); if (rspNum) { tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, @@ -1108,7 +1105,8 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req if (clientHbMgr.appHbHash) { code = taosHashPut(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(uint64_t), NULL, 0); if (TSDB_CODE_SUCCESS != code) { - tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); + tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, + tstrerror(code)); return code; } } @@ -1261,7 +1259,7 @@ int32_t hbGatherAppInfo(void) { SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); if (pAppHbMgr == NULL) continue; - int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; + int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId)); if (NULL == pApp) { (void)memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary)); @@ -1303,8 +1301,7 @@ static void *hbThreadFunc(void *param) { return NULL; } if (sz > 1 && !clientHbMgr.appHbHash) { - clientHbMgr.appHbHash = - taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK); + clientHbMgr.appHbHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK); if (NULL == clientHbMgr.appHbHash) { tscError("taosHashInit failed"); return NULL; @@ -1324,13 +1321,13 @@ static void *hbThreadFunc(void *param) { continue; } SClientHbBatchReq *pReq = NULL; - int32_t code = hbGatherAllInfo(pAppHbMgr, &pReq); + int32_t code = hbGatherAllInfo(pAppHbMgr, &pReq); if (TSDB_CODE_SUCCESS != code || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) { terrno = code ? code : TSDB_CODE_OUT_OF_RANGE; tFreeClientHbBatchReq(pReq); continue; } - int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq); + int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq); if (tlen == -1) { tFreeClientHbBatchReq(pReq); break; @@ -1368,9 +1365,8 @@ static void *hbThreadFunc(void *param) { pInfo->requestObjRefId = 0; SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo; - int64_t transporterId = 0; SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); - if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo)) { + if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, NULL, pInfo)) { tscWarn("failed to async send msg to server"); } tFreeClientHbBatchReq(pReq); @@ -1389,7 +1385,7 @@ static void *hbThreadFunc(void *param) { } static int32_t hbCreateThread() { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; TdThreadAttr thAttr; TSC_ERR_JRET(taosThreadAttrInit(&thAttr)); TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE)); @@ -1467,9 +1463,9 @@ int32_t appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key, SAppHbMgr **pAppHbMg TSC_ERR_JRET(taosThreadMutexLock(&clientHbMgr.lock)); if (taosArrayPush(clientHbMgr.appHbMgrs, &(*pAppHbMgr)) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - (void)taosThreadMutexUnlock(&clientHbMgr.lock); - goto _return; + code = TSDB_CODE_OUT_OF_MEMORY; + (void)taosThreadMutexUnlock(&clientHbMgr.lock); + goto _return; } (*pAppHbMgr)->idx = taosArrayGetSize(clientHbMgr.appHbMgrs) - 1; TSC_ERR_JRET(taosThreadMutexUnlock(&clientHbMgr.lock)); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index d1ee26423c..78ff40bb4f 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1567,9 +1567,8 @@ int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __ta return code; } - int64_t transporterId = 0; - code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, &transporterId, - body); + // int64_t transporterId = 0; + code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, NULL, body); if (TSDB_CODE_SUCCESS != code) { destroyTscObj(*pTscObj); tscError("failed to send connect msg to server, code:%s", tstrerror(code)); diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 4d96283e52..00b1d7ad79 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -530,8 +530,7 @@ int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob, pMsgSendInfo->msgInfo.handle = NULL; pMsgSendInfo->msgType = msgType; - int64_t transporterId = 0; - code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, &transporterId, pMsgSendInfo); + code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, NULL, pMsgSendInfo); pMsgSendInfo = NULL; if (code) { ctgError("asyncSendMsgToSever failed, error: %s", tstrerror(code)); diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 20fbf9d32b..61f1339c82 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -326,6 +326,9 @@ int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { } if (code != TSDB_CODE_SUCCESS) { + if (pCtx[k].fpSet.cleanup != NULL) { + pCtx[k].fpSet.cleanup(&pCtx[k]); + } qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); return code; } @@ -640,6 +643,9 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC } if (code != TSDB_CODE_SUCCESS) { + if (pCtx[k].fpSet.cleanup != NULL) { + pCtx[k].fpSet.cleanup(&pCtx[k]); + } qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code)); taskInfo->code = code; T_LONG_JMP(taskInfo->env, code); diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 14f3a08e17..30cc596a44 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1919,10 +1919,10 @@ _return: if (pInfo != NULL) { destroyMergeJoinOperator(pInfo); } + destroyOperatorAndDownstreams(pOperator, pDownstream, oldNum); if (newDownstreams) { taosMemoryFree(pDownstream); } - destroyOperatorAndDownstreams(pOperator, pDownstream, oldNum); pTaskInfo->code = code; return code; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index a3772c958c..633eda6bf9 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -1057,6 +1057,9 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc code = pfCtx->fpSet.process(pfCtx); if (code != TSDB_CODE_SUCCESS) { + if (pCtx[k].fpSet.cleanup != NULL) { + pCtx[k].fpSet.cleanup(&pCtx[k]); + } goto _exit; } diff --git a/source/libs/function/inc/builtins.h b/source/libs/function/inc/builtins.h index 343f5b8367..5707ee76f4 100644 --- a/source/libs/function/inc/builtins.h +++ b/source/libs/function/inc/builtins.h @@ -40,6 +40,7 @@ typedef struct SBuiltinFuncDefinition { FExecProcess processFunc; FScalarExecProcess sprocessFunc; FExecFinalize finalizeFunc; + FExecCleanUp cleanupFunc; #ifdef BUILD_NO_CALL FExecProcess invertFunc; #endif diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 7403a4ce31..77d6bda35b 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -239,6 +239,7 @@ bool getModeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t modeFunction(SqlFunctionCtx* pCtx); int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +void modeFunctionCleanupExt(SqlFunctionCtx* pCtx); bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 427270797c..4103977d05 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -3658,6 +3658,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = modeFunction, .sprocessFunc = modeScalarFunction, .finalizeFunc = modeFinalize, + .cleanupFunc = modeFunctionCleanupExt }, { .name = "abs", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index fa8cf243c4..84ab103456 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -6019,6 +6019,12 @@ static void modeFunctionCleanup(SModeInfo * pInfo) { taosMemoryFreeClear(pInfo->buf); } +void modeFunctionCleanupExt(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + modeFunctionCleanup(pInfo); +} + static int32_t saveModeTupleData(SqlFunctionCtx* pCtx, char* data, SModeInfo *pInfo, STuplePos* pPos) { if (IS_VAR_DATA_TYPE(pInfo->colType)) { (void)memcpy(pInfo->buf, data, varDataTLen(data)); diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 254a06426c..2f71ab8e24 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -142,6 +142,7 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) { pFpSet->finalize = funcMgtBuiltins[funcId].finalizeFunc; pFpSet->combine = funcMgtBuiltins[funcId].combineFunc; pFpSet->processFuncByRow = funcMgtBuiltins[funcId].processFuncByRow; + pFpSet->cleanup = funcMgtBuiltins[funcId].cleanupFunc; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index d47a183121..34821d05cd 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -586,7 +586,7 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) { } memcpy(*pDst, pSrc, sizeof(*pSrc)); (*pDst)->vgArray = NULL; - + if (pSrc->vgHash) { (*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index ad720e15f5..4e34a47902 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -982,6 +982,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery SCH_ERR_JRET(code); } trans->pHandle = (void *)refId; + pMsgSendInfo->msgInfo.handle =trans->pHandle; } if (pJob && pTask) { @@ -996,8 +997,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery pTask->lastMsgType = msgType; } - int64_t transporterId = 0; - code = asyncSendMsgToServerExt(trans->pTrans, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx); + code = asyncSendMsgToServerExt(trans->pTrans, epSet, NULL, pMsgSendInfo, persistHandle, ctx); pMsgSendInfo = NULL; if (code) { SCH_ERR_JRET(code); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index dfa9595eb0..78320c450c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1496,9 +1496,10 @@ static void cliHandleFastFail(SCliConn* pConn, int status) { } } else { SFailFastItem item = {.count = 1, .timestamp = cTimestamp}; - int32_t code = taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr), &item, sizeof(SFailFastItem)); + int32_t code = + taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr), &item, sizeof(SFailFastItem)); if (code != 0) { - tError("failed to put fail-fast item to cache, reason:%s", tstrerror(code)); + tError("failed to put fail-fast item to cache, reason:%s", tstrerror(code)); } } } @@ -2980,7 +2981,9 @@ int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, S QUEUE_PUSH(&exh->q, &pCliMsg->seqq); taosWUnLockLatch(&exh->latch); + tDebug("msg refId: %" PRId64 "", handle); + (void)transReleaseExHandle(transGetRefMgt(), handle); (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return 0; } else {