From a0bf82878d8fe2e5720f2cfbbe3e71a6180c58bf Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 3 Apr 2024 09:11:49 +0000 Subject: [PATCH] free handle --- source/libs/executor/src/executil.c | 49 +++++++++++++++------------ source/libs/scheduler/src/schRemote.c | 2 ++ 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 9a480359ed..add2955a2b 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -45,8 +45,8 @@ static FilterCondType checkTagCond(SNode* cond); static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI); static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI); -static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, - STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI); +static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, + STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI); static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; } static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; } @@ -642,7 +642,8 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf info->groupId = calcGroupId(keyBuf, len); if (initRemainGroups) { // groupId ~ table uid - taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid), sizeof(info->uid)); + taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid), + sizeof(info->uid)); } } @@ -858,7 +859,7 @@ static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, S } SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode, - SStorageAPI* pStorageAPI) { + SStorageAPI* pStorageAPI) { SSDataBlock* pResBlock = createDataBlock(); if (pResBlock == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -939,11 +940,12 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S return pResBlock; } -static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList, bool* pResultList, bool addUid) { +static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList, + bool* pResultList, bool addUid) { taosArrayClear(pUidList); STableKeyInfo info = {.uid = 0, .groupId = 0}; - int32_t numOfTables = taosArrayGetSize(pUidTagList); + int32_t numOfTables = taosArrayGetSize(pUidTagList); for (int32_t i = 0; i < numOfTables; ++i) { if (pResultList[i]) { uint64_t uid = ((STUidTagInfo*)taosArrayGet(pUidTagList, i))->uid; @@ -1143,7 +1145,7 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S if (code != 0 || status == SFLT_NOT_INDEX) { // temporarily disable it for performance sake qDebug("failed to get tableIds from index, suid:%" PRIu64, pScanNode->uid); } else { - qInfo("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList)); + qDebug("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList)); } } } @@ -1165,7 +1167,8 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S memcpy(pPayload + sizeof(int32_t), taosArrayGet(pUidList, 0), numOfTables * sizeof(uint64_t)); } - pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest), pPayload, size, 1); + pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest), + pPayload, size, 1); digest[0] = 1; memcpy(digest + 1, context.digest, tListLen(context.digest)); } @@ -1725,7 +1728,8 @@ SColumn extractColumnFromColumnNode(SColumnNode* pColNode) { return c; } -int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode, const SReadHandle* readHandle) { +int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode, + const SReadHandle* readHandle) { pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols); @@ -1748,8 +1752,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi // allowed read stt file optimization mode pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) && - (pTableScanNode->scan.node.pConditions == NULL) && - (pTableScanNode->interval == 0); + (pTableScanNode->scan.node.pConditions == NULL) && (pTableScanNode->interval == 0); int32_t j = 0; for (int32_t i = 0; i < pCond->numOfCols; ++i) { @@ -1891,7 +1894,8 @@ void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t orde int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order); slidingStart = taosTimeAdd(slidingStart, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision); tw->skey = taosTimeAdd(slidingStart, pInterval->offset, pInterval->offsetUnit, pInterval->precision); - int64_t slidingEnd = taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; + int64_t slidingEnd = + taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; tw->ekey = taosTimeAdd(slidingEnd, pInterval->offset, pInterval->offsetUnit, pInterval->precision); } @@ -2136,7 +2140,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* if (groupSort && groupByTbname) { taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); pTableListInfo->numOfOuputGroups = numOfTables; - } else if (groupByTbname && pScanNode->groupOrderScan){ + } else if (groupByTbname && pScanNode->groupOrderScan) { pTableListInfo->numOfOuputGroups = numOfTables; } else if (groupByTbname && tsCountAlwaysReturnValue && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) { pTableListInfo->numOfOuputGroups = numOfTables; @@ -2147,7 +2151,8 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* bool initRemainGroups = false; if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode; - if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable && !(groupSort || pScanNode->groupOrderScan)) { + if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable && + !(groupSort || pScanNode->groupOrderScan)) { initRemainGroups = true; } } @@ -2271,7 +2276,7 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr } if (qDebugFlag & DEBUG_DEBUG) { char* pBuf = NULL; - char flagBuf[64]; + char flagBuf[64]; snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr); qDebug("%s", dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr)); taosMemoryFree(pBuf); @@ -2280,7 +2285,7 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; } -void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) { +void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) { int64_t* ts = (int64_t*)pColData->pData; int64_t duration = pWin->ekey - pWin->skey + delta; @@ -2289,13 +2294,14 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t ts[4] = pWin->ekey + delta; // window end key } -int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock, int32_t rowIndex) { +int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock, + int32_t rowIndex) { SColumnDataAgg* pColAgg = NULL; const char* isNull = oldkeyBuf; const char* p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size; for (int32_t i = 0; i < pSortGroupCols->size; ++i) { - const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i); + const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i); const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId); if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId]; @@ -2321,8 +2327,7 @@ int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t ol return 0; } -int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, - int32_t rowIndex) { +int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex) { uint32_t colNum = pSortGroupCols->size; SColumnDataAgg* pColAgg = NULL; char* isNull = keyBuf; @@ -2370,7 +2375,7 @@ uint64_t calcGroupId(char* pData, int32_t len) { } SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) { - SNode* node; + SNode* node; SNodeList* ret = NULL; FOREACH(node, pSortKeys) { SOrderByExprNode* pSortKey = (SOrderByExprNode*)node; @@ -2386,6 +2391,6 @@ int32_t extractKeysLen(const SArray* keys) { SColumn* pCol = (SColumn*)taosArrayGet(keys, i); len += pCol->bytes; } - len += sizeof(int8_t) * keyNum; //null flag + len += sizeof(int8_t) * keyNum; // null flag return len; } diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 980b9aa6bf..079fd7d29d 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -486,6 +486,7 @@ int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) { SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", pParam->queryId, pParam->taskId, code); + rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); if (pMsg) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); @@ -526,6 +527,7 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) { if (code) { qError("hb rsp error:%s", tstrerror(code)); + rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); SCH_ERR_JRET(code); }