free handle

This commit is contained in:
yihaoDeng 2024-04-03 09:11:49 +00:00
parent 588f05c7dd
commit a0bf82878d
2 changed files with 29 additions and 22 deletions

View File

@ -45,8 +45,8 @@ static FilterCondType checkTagCond(SNode* cond);
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI); static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI);
static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI); static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI);
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI); STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI);
static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; } static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; } static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
@ -642,7 +642,8 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
info->groupId = calcGroupId(keyBuf, len); info->groupId = calcGroupId(keyBuf, len);
if (initRemainGroups) { if (initRemainGroups) {
// groupId ~ table uid // groupId ~ table uid
taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid), sizeof(info->uid)); taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid),
sizeof(info->uid));
} }
} }
@ -858,7 +859,7 @@ static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, S
} }
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode, SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
SStorageAPI* pStorageAPI) { SStorageAPI* pStorageAPI) {
SSDataBlock* pResBlock = createDataBlock(); SSDataBlock* pResBlock = createDataBlock();
if (pResBlock == NULL) { if (pResBlock == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -939,11 +940,12 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S
return pResBlock; return pResBlock;
} }
static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList, bool* pResultList, bool addUid) { static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList,
bool* pResultList, bool addUid) {
taosArrayClear(pUidList); taosArrayClear(pUidList);
STableKeyInfo info = {.uid = 0, .groupId = 0}; STableKeyInfo info = {.uid = 0, .groupId = 0};
int32_t numOfTables = taosArrayGetSize(pUidTagList); int32_t numOfTables = taosArrayGetSize(pUidTagList);
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < numOfTables; ++i) {
if (pResultList[i]) { if (pResultList[i]) {
uint64_t uid = ((STUidTagInfo*)taosArrayGet(pUidTagList, i))->uid; uint64_t uid = ((STUidTagInfo*)taosArrayGet(pUidTagList, i))->uid;
@ -1143,7 +1145,7 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
if (code != 0 || status == SFLT_NOT_INDEX) { // temporarily disable it for performance sake if (code != 0 || status == SFLT_NOT_INDEX) { // temporarily disable it for performance sake
qDebug("failed to get tableIds from index, suid:%" PRIu64, pScanNode->uid); qDebug("failed to get tableIds from index, suid:%" PRIu64, pScanNode->uid);
} else { } else {
qInfo("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList)); qDebug("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
} }
} }
} }
@ -1165,7 +1167,8 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
memcpy(pPayload + sizeof(int32_t), taosArrayGet(pUidList, 0), numOfTables * sizeof(uint64_t)); memcpy(pPayload + sizeof(int32_t), taosArrayGet(pUidList, 0), numOfTables * sizeof(uint64_t));
} }
pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest), pPayload, size, 1); pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest),
pPayload, size, 1);
digest[0] = 1; digest[0] = 1;
memcpy(digest + 1, context.digest, tListLen(context.digest)); memcpy(digest + 1, context.digest, tListLen(context.digest));
} }
@ -1725,7 +1728,8 @@ SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
return c; return c;
} }
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode, const SReadHandle* readHandle) { int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode,
const SReadHandle* readHandle) {
pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols); pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
@ -1748,8 +1752,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
// allowed read stt file optimization mode // allowed read stt file optimization mode
pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) && pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) &&
(pTableScanNode->scan.node.pConditions == NULL) && (pTableScanNode->scan.node.pConditions == NULL) && (pTableScanNode->interval == 0);
(pTableScanNode->interval == 0);
int32_t j = 0; int32_t j = 0;
for (int32_t i = 0; i < pCond->numOfCols; ++i) { for (int32_t i = 0; i < pCond->numOfCols; ++i) {
@ -1891,7 +1894,8 @@ void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t orde
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order); int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
slidingStart = taosTimeAdd(slidingStart, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision); slidingStart = taosTimeAdd(slidingStart, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
tw->skey = taosTimeAdd(slidingStart, pInterval->offset, pInterval->offsetUnit, pInterval->precision); tw->skey = taosTimeAdd(slidingStart, pInterval->offset, pInterval->offsetUnit, pInterval->precision);
int64_t slidingEnd = taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; int64_t slidingEnd =
taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
tw->ekey = taosTimeAdd(slidingEnd, pInterval->offset, pInterval->offsetUnit, pInterval->precision); tw->ekey = taosTimeAdd(slidingEnd, pInterval->offset, pInterval->offsetUnit, pInterval->precision);
} }
@ -2136,7 +2140,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
if (groupSort && groupByTbname) { if (groupSort && groupByTbname) {
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
pTableListInfo->numOfOuputGroups = numOfTables; pTableListInfo->numOfOuputGroups = numOfTables;
} else if (groupByTbname && pScanNode->groupOrderScan){ } else if (groupByTbname && pScanNode->groupOrderScan) {
pTableListInfo->numOfOuputGroups = numOfTables; pTableListInfo->numOfOuputGroups = numOfTables;
} else if (groupByTbname && tsCountAlwaysReturnValue && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) { } else if (groupByTbname && tsCountAlwaysReturnValue && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) {
pTableListInfo->numOfOuputGroups = numOfTables; pTableListInfo->numOfOuputGroups = numOfTables;
@ -2147,7 +2151,8 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
bool initRemainGroups = false; bool initRemainGroups = false;
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode;
if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable && !(groupSort || pScanNode->groupOrderScan)) { if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable &&
!(groupSort || pScanNode->groupOrderScan)) {
initRemainGroups = true; initRemainGroups = true;
} }
} }
@ -2271,7 +2276,7 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr
} }
if (qDebugFlag & DEBUG_DEBUG) { if (qDebugFlag & DEBUG_DEBUG) {
char* pBuf = NULL; char* pBuf = NULL;
char flagBuf[64]; char flagBuf[64];
snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr); snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr);
qDebug("%s", dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr)); qDebug("%s", dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr));
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
@ -2280,7 +2285,7 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; } TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) { void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) {
int64_t* ts = (int64_t*)pColData->pData; int64_t* ts = (int64_t*)pColData->pData;
int64_t duration = pWin->ekey - pWin->skey + delta; int64_t duration = pWin->ekey - pWin->skey + delta;
@ -2289,13 +2294,14 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t
ts[4] = pWin->ekey + delta; // window end key ts[4] = pWin->ekey + delta; // window end key
} }
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock, int32_t rowIndex) { int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock,
int32_t rowIndex) {
SColumnDataAgg* pColAgg = NULL; SColumnDataAgg* pColAgg = NULL;
const char* isNull = oldkeyBuf; const char* isNull = oldkeyBuf;
const char* p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size; const char* p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size;
for (int32_t i = 0; i < pSortGroupCols->size; ++i) { for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i); const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId); const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId]; if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId];
@ -2321,8 +2327,7 @@ int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t ol
return 0; return 0;
} }
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex) {
int32_t rowIndex) {
uint32_t colNum = pSortGroupCols->size; uint32_t colNum = pSortGroupCols->size;
SColumnDataAgg* pColAgg = NULL; SColumnDataAgg* pColAgg = NULL;
char* isNull = keyBuf; char* isNull = keyBuf;
@ -2370,7 +2375,7 @@ uint64_t calcGroupId(char* pData, int32_t len) {
} }
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) { SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
SNode* node; SNode* node;
SNodeList* ret = NULL; SNodeList* ret = NULL;
FOREACH(node, pSortKeys) { FOREACH(node, pSortKeys) {
SOrderByExprNode* pSortKey = (SOrderByExprNode*)node; SOrderByExprNode* pSortKey = (SOrderByExprNode*)node;
@ -2386,6 +2391,6 @@ int32_t extractKeysLen(const SArray* keys) {
SColumn* pCol = (SColumn*)taosArrayGet(keys, i); SColumn* pCol = (SColumn*)taosArrayGet(keys, i);
len += pCol->bytes; len += pCol->bytes;
} }
len += sizeof(int8_t) * keyNum; //null flag len += sizeof(int8_t) * keyNum; // null flag
return len; return len;
} }

View File

@ -486,6 +486,7 @@ int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", pParam->queryId, pParam->taskId, qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", pParam->queryId, pParam->taskId,
code); code);
rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);
if (pMsg) { if (pMsg) {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
@ -526,6 +527,7 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
if (code) { if (code) {
qError("hb rsp error:%s", tstrerror(code)); qError("hb rsp error:%s", tstrerror(code));
rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);
SCH_ERR_JRET(code); SCH_ERR_JRET(code);
} }