From c490820b09305813fd866ce5bec10763ab57f85a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 14 Apr 2022 22:12:10 +0800 Subject: [PATCH 1/6] feat(query): add the merge join operator for child table inner join. --- source/libs/executor/inc/executorimpl.h | 25 +++- source/libs/executor/src/executorimpl.c | 184 +++++++++++++++++++++--- 2 files changed, 183 insertions(+), 26 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index cbd2dc66f5..78bc6947cd 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -549,8 +549,6 @@ typedef struct SStateWindowOperatorInfo { typedef struct SSortedMergeOperatorInfo { SOptrBasicInfo binfo; - bool hasVarCol; - SArray* pSortInfo; int32_t numOfSources; SSortHandle *pSortHandle; @@ -582,6 +580,24 @@ typedef struct SSortOperatorInfo { uint64_t totalElapsed; // total elapsed time } SSortOperatorInfo; +typedef struct SJoinOperatorInfo { + SSDataBlock *pRes; + int32_t joinType; + + SSDataBlock *pLeft; + int32_t leftPos; + SColumnInfo leftCol; + + SSDataBlock *pRight; + int32_t rightPos; + SColumnInfo rightCol; + + SNode *pOnCondition; +// SJoinStatus *status; +// int32_t numOfUpstream; +// SRspResultInfo resultInfo; +} SJoinOperatorInfo; + int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); void operatorDummyCloseFn(void* param, int32_t numOfCols); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); @@ -628,6 +644,8 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); + #if 0 SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, @@ -635,9 +653,6 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTagScanOperatorInfo(SReaderHandle* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput); - -SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, - int32_t numOfOutput); #endif void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 14acff3204..45674d31bc 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -6600,10 +6600,10 @@ bool validateExprColumnInfo(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision, const char* name) { SResSchema s = {0}; - s.scale = scale; - s.type = type; - s.bytes = bytes; - s.slotId = slotId; + s.scale = scale; + s.type = type; + s.bytes = bytes; + s.slotId = slotId; s.precision = precision; strncpy(s.name, name, tListLen(s.name)); @@ -6679,8 +6679,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; SDataType* pType = &pFuncNode->node.resType; - pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, - pType->precision, pFuncNode->node.aliasName); + pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pFuncNode->node.aliasName); pExp->pExpr->_function.functionId = pFuncNode->funcId; pExp->pExpr->_function.pFunctNode = pFuncNode; @@ -6756,7 +6755,7 @@ static SArray* createIndexMap(SNodeList* pNodeList); static SArray* extractPartitionColInfo(SNodeList* pNodeList); SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, - uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { + uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { int32_t type = nodeType(pPhyNode); if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { @@ -6764,6 +6763,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t numOfCols = 0; tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); + if (pDataReader == NULL) { + return NULL; + } + SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc); @@ -6802,13 +6805,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } } + int32_t num = 0; int32_t type = nodeType(pPhyNode); - size_t size = LIST_LENGTH(pPhyNode->pChildren); - ASSERT(size == 1); + size_t size = LIST_LENGTH(pPhyNode->pChildren); - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); - int32_t num = 0; + SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); + for(int32_t i = 0; i < size; ++i) { + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); + ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + } if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode; @@ -6817,7 +6822,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset}; SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset}; - return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo); + return createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) { SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); @@ -6831,9 +6836,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); } - return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pTaskInfo, NULL); + return createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pTaskInfo, NULL); } else { - return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); + return createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); } } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; @@ -6851,33 +6856,39 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo }; int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->window.pTspk)->slotId; - return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo); + return createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SArray* info = createSortInfo(pSortPhyNode->pSortKeys, pSortPhyNode->pTargets); SArray* slotMap = createIndexMap(pSortPhyNode->pTargets); - return createSortOperatorInfo(op, pResBlock, info, slotMap, pTaskInfo); + return createSortOperatorInfo(ops[0], pResBlock, info, slotMap, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - return createSessionAggOperatorInfo(op, pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); + return createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode; SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num); - return createPartitionOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); - } else if (QUERY_NODE_STATE_WINDOW == type) { + return createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); + } else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == type) { SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pPhyNode; SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - return createStatewindowOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo); + return createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) { + SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*) pPhyNode; + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + + SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num); + return createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo); } else { ASSERT(0); } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { @@ -7374,4 +7385,135 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo *operatorInfo, SExplainExecInfo return TSDB_CODE_SUCCESS; } +static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup) { + SJoinOperatorInfo* pJoinInfo = pOperator->info; +// SOptrBasicInfo* pInfo = &pJoinInfo->binfo; + SSDataBlock* pRes = pJoinInfo->pRes; + blockDataCleanup(pRes); + blockDataEnsureCapacity(pRes, 4096); + + int32_t nrows = 0; + + while (1) { + bool prevVal = *newgroup; + + if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { + SOperatorInfo* ds1 = pOperator->pDownstream[0]; + publishOperatorProfEvent(ds1, QUERY_PROF_BEFORE_OPERATOR_EXEC); + pJoinInfo->pLeft = ds1->getNextFn(ds1, newgroup); + publishOperatorProfEvent(ds1, QUERY_PROF_AFTER_OPERATOR_EXEC); + + pJoinInfo->leftPos = 0; + if (pJoinInfo->pLeft == NULL) { + setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + break; + } + } + + if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { + SOperatorInfo* ds2 = pOperator->pDownstream[1]; + publishOperatorProfEvent(ds2, QUERY_PROF_BEFORE_OPERATOR_EXEC); + pJoinInfo->pRight = ds2->getNextFn(ds2, newgroup); + publishOperatorProfEvent(ds2, QUERY_PROF_AFTER_OPERATOR_EXEC); + + pJoinInfo->rightPos = 0; + if (pJoinInfo->pRight == NULL) { + setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + break; + } + } + + SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId); + char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos); + + SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId); + char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos); + + // only the timestamp match support for ordinary table + ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); + if (*(int64_t*) pLeftVal == *(int64_t*) pRightVal) { + for(int32_t i = 0; i < pOperator->numOfOutput; ++i) { + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i); + + SExprInfo* pExprInfo = &pOperator->pExpr[i]; + + int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId; + int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId; + + SColumnInfoData* pSrc = NULL; + if (pJoinInfo->pLeft->info.blockId == blockId) { + pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId); + } else { + pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId); + } + + if (colDataIsNull_s(pSrc, pJoinInfo->leftPos)) { + colDataAppendNULL(pDst, nrows); + } else { + char* p = colDataGetData(pSrc, pJoinInfo->leftPos); + colDataAppend(pDst, nrows, p, false); + } + } + + pJoinInfo->leftPos += 1; + pJoinInfo->rightPos += 1; + + nrows += 1; + } else if (*(int64_t*) pLeftVal < *(int64_t*) pRightVal) { + pJoinInfo->leftPos += 1; + + if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { + continue; + } + } else if (*(int64_t*) pLeftVal > *(int64_t*) pRightVal) { + pJoinInfo->rightPos += 1; + if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { + continue; + } + } + + // the pDataBlock are always the same one, no need to call this again + pRes->info.rows = nrows; + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + break; + } + } + + return (pRes->info.rows > 0) ? pRes : NULL; +} + +SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo) { + SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL || pInfo == NULL) { + goto _error; + } + + pOperator->resultInfo.capacity = 4096; + pOperator->resultInfo.threshold = 4096 * 0.75; + +// initResultRowInf +// o(&pInfo->binfo.resultRowInfo, 8); + pInfo->pRes = pResBlock; + + pOperator->name = "JoinOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN; + pOperator->blockingOptr = true; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->numOfOutput = numOfCols; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->getNextFn = doMergeJoin; + pOperator->closeFn = destroyBasicOperatorInfo; + + int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream); + return pOperator; + + _error: + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + return NULL; +} \ No newline at end of file From d2761b41145e50bb4ca4d640021e0b91753365df Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 15 Apr 2022 12:09:27 +0800 Subject: [PATCH 2/6] ehn(query): refactor c driver codes. --- source/client/inc/clientInt.h | 35 +++------ source/client/src/clientEnv.c | 7 +- source/client/src/clientImpl.c | 14 ++++ source/client/src/clientMain.c | 4 +- source/client/src/clientMsgHandler.c | 97 +++---------------------- source/libs/executor/src/executorimpl.c | 23 +++--- source/libs/qworker/src/qworker.c | 2 +- 7 files changed, 55 insertions(+), 127 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 0f12880272..32668356ee 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -43,13 +43,23 @@ extern "C" { } \ } while (0) +#define ERROR_MSG_BUF_DEFAULT_SIZE 512 #define HEARTBEAT_INTERVAL 1500 // ms +// todo refactor enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, }; +enum { + RES_TYPE__QUERY = 1, + RES_TYPE__TMQ, +}; + +#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) +#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) + typedef struct SAppInstInfo SAppInstInfo; typedef struct { @@ -172,33 +182,15 @@ typedef struct SReqResultInfo { int32_t payloadLen; } SReqResultInfo; -typedef struct SShowReqInfo { - int64_t execId; // showId/queryId - int32_t vgId; - SArray* pArray; // SArray - int32_t currentIndex; // current accessed vgroup index. -} SShowReqInfo; - typedef struct SRequestSendRecvBody { tsem_t rspSem; // not used now void* fp; - SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed. SDataBuf requestMsg; int64_t queryJob; // query job, created according to sql query DAG. struct SQueryPlan* pDag; // the query dag, generated according to the sql statement. SReqResultInfo resInfo; } SRequestSendRecvBody; -#define ERROR_MSG_BUF_DEFAULT_SIZE 512 - -enum { - RES_TYPE__QUERY = 1, - RES_TYPE__TMQ, -}; - -#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) -#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) - typedef struct { int8_t resType; char* topic; @@ -217,7 +209,6 @@ typedef struct SRequestObj { int32_t sqlLen; int64_t self; char* msgBuf; - void* pInfo; // sql parse info, generated by parser module int32_t code; SArray* dbList; SArray* tableList; @@ -252,7 +243,7 @@ extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code); SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj); -int taos_init(); +int taos_init(); void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo); void destroyTscObj(void* pObj); @@ -266,7 +257,6 @@ char* getDbOfConnection(STscObj* pObj); void setConnectionDB(STscObj* pTscObj, const char* db); void resetConnectDB(STscObj* pTscObj); -void taos_init_imp(void); int taos_options_imp(TSDB_OPTION option, const char* str); void* openTransporter(const char* user, const char* auth, int32_t numOfThreads); @@ -286,8 +276,7 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj* void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); void doSetOneRowPtr(SReqResultInfo* pResultInfo); -int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, - bool convertUcs4); +int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, bool convertUcs4); void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index fec6c8e5db..a258fd305e 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -33,7 +33,7 @@ SAppInfo appInfo; int32_t clientReqRefPool = -1; int32_t clientConnRefPool = -1; -static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; +static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; volatile int32_t tscInitRes = 0; static void registerRequest(SRequestObj *pRequest) { @@ -188,7 +188,6 @@ static void doDestroyRequest(void *p) { taosMemoryFreeClear(pRequest->msgBuf); taosMemoryFreeClear(pRequest->sqlstr); - taosMemoryFreeClear(pRequest->pInfo); taosMemoryFreeClear(pRequest->pDb); doFreeReqResultInfo(&pRequest->body.resInfo); @@ -198,10 +197,6 @@ static void doDestroyRequest(void *p) { schedulerFreeJob(pRequest->body.queryJob); } - if (pRequest->body.showInfo.pArray != NULL) { - taosArrayDestroy(pRequest->body.showInfo.pArray); - } - taosArrayDestroy(pRequest->tableList); taosArrayDestroy(pRequest->dbList); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 96a7230ff3..b3615c208c 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ #include "clientInt.h" #include "clientLog.h" diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 42d204284e..96ef8d0d21 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -193,7 +193,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { } } else { - // assert to avoid uninitialization error + // assert to avoid un-initialization error ASSERT(0); } return NULL; @@ -355,6 +355,7 @@ int taos_result_precision(TAOS_RES *res) { if (res == NULL) { return TSDB_TIME_PRECISION_MILLI; } + if (TD_RES_QUERY(res)) { SRequestObj *pRequest = (SRequestObj *)res; return pRequest->body.resInfo.precision; @@ -467,6 +468,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { if (res == NULL) { return 0; } + if (TD_RES_TMQ(res)) { SReqResultInfo *pResultInfo = tmqGetNextResInfo(res); if (pResultInfo == NULL) { diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 67c5679cac..87a8e86415 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -117,10 +117,10 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { struct SCatalog *pCatalog = NULL; if (usedbRsp.vgVersion >= 0) { - int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); - if (code != TSDB_CODE_SUCCESS) { + int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + if (code1 != TSDB_CODE_SUCCESS) { tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId, - tstrerror(code)); + tstrerror(code1)); } else { catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid); } @@ -154,10 +154,10 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { } else { struct SCatalog* pCatalog = NULL; - int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); - if (code != TSDB_CODE_SUCCESS) { + int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + if (code1 != TSDB_CODE_SUCCESS) { tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId, - tstrerror(code)); + tstrerror(code1)); } else { catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup); } @@ -209,84 +209,9 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { } void initMsgHandleFp() { -#if 0 - tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg; - tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg; - tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg; - - tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg; - tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg; - tscBuildMsg[TSDB_SQL_CREATE_FUNCTION] = tscBuildCreateFuncMsg; - - tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg; - tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg; - - tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg; - tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserAcctMsg; - tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropUserAcctMsg; - tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg; - tscBuildMsg[TSDB_SQL_DROP_FUNCTION] = tscBuildDropFuncMsg; - tscBuildMsg[TSDB_SQL_SYNC_DB_REPLICA] = tscBuildSyncDbReplicaMsg; - tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg; - tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg; - tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg; - tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg; - tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg; - tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg; - tscBuildMsg[TSDB_SQL_UPDATE_TAG_VAL] = tscBuildUpdateTagMsg; - tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg; - tscBuildMsg[TSDB_SQL_COMPACT_VNODE] = tscBuildCompactMsg; - - - tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg; - tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg; - tscBuildMsg[TSDB_SQL_RETRIEVE_FUNC] = tscBuildRetrieveFuncMsg; - - tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg; - tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg; - tscBuildMsg[TSDB_SQL_RETRIEVE_MNODE] = tscBuildRetrieveFromMgmtMsg; - tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg; - tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg; - tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg; - - tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp; - tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode; - - tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp; - tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp; - - tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp; - tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp; - tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp; - tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiTableMetaRsp; - tscProcessMsgRsp[TSDB_SQL_RETRIEVE_FUNC] = tscProcessRetrieveFuncRsp; - - tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp; - tscProcessMsgRsp[TSDB_SQL_RETRIEVE_MNODE] = tscProcessRetrieveRspFromNode; // rsp handled by same function. - tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp; - - tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessLocalRetrieveRsp; - tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp; - tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp; - tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessLocalRetrieveRsp; - tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessLocalRetrieveRsp; - - tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp; - - tscProcessMsgRsp[TSDB_SQL_RETRIEVE_GLOBALMERGE] = tscProcessRetrieveGlobalMergeRsp; - - tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp; - tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp; - tscProcessMsgRsp[TSDB_SQL_COMPACT_VNODE] = tscProcessCompactRsp; - - tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp; - tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_STABLE] = tscProcessShowCreateRsp; - tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp; -#endif - - handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 45674d31bc..20b7169895 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -6815,6 +6815,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); } + SOperatorInfo* pOptr = NULL; if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode; SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num); @@ -6822,7 +6823,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset}; SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset}; - return createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo); + pOptr = createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) { SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); @@ -6836,9 +6837,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); } - return createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pTaskInfo, NULL); + pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pTaskInfo, NULL); } else { - return createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); + pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); } } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; @@ -6856,39 +6857,39 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo }; int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->window.pTspk)->slotId; - return createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo); + pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SArray* info = createSortInfo(pSortPhyNode->pSortKeys, pSortPhyNode->pTargets); SArray* slotMap = createIndexMap(pSortPhyNode->pTargets); - return createSortOperatorInfo(ops[0], pResBlock, info, slotMap, pTaskInfo); + pOptr = createSortOperatorInfo(ops[0], pResBlock, info, slotMap, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - return createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); + pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode; SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num); - return createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); + pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); } else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == type) { SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pPhyNode; SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - return createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo); + pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) { SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*) pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num); - return createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo); + pOptr = createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo); } else { ASSERT(0); } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { @@ -6901,7 +6902,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo); } }*/ - return NULL; + + taosMemoryFree(ops); + return pOptr; } static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STableGroupInfo* pGroupInfo, diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 67871dfe62..a9a360e03e 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1020,7 +1020,7 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } if (ctx->phase == QW_PHASE_PRE_QUERY) { - ctx->ctrlConnInfo.handle == qwMsg->connInfo.handle; + ctx->ctrlConnInfo.handle = qwMsg->connInfo.handle; ctx->ctrlConnInfo.ahandle = qwMsg->connInfo.ahandle; QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY); needRsp = false; From 367064591a7401249baf98b4087cf07af5d56c56 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 15 Apr 2022 13:40:50 +0800 Subject: [PATCH 3/6] refactor(driver): refactor c driver and remove redundant codes. --- include/libs/parser/parser.h | 1 + source/client/inc/clientInt.h | 6 +++--- source/client/src/client.c | 0 source/client/src/clientEnv.c | 10 +++++----- source/client/src/clientImpl.c | 2 +- 5 files changed, 10 insertions(+), 9 deletions(-) delete mode 100644 source/client/src/client.c diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index fef624288a..02cd073226 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -57,6 +57,7 @@ typedef struct SQuery { SNode* pRoot; int32_t numOfResCols; SSchema* pResSchema; + int8_t precision; SCmdMsgInfo* pCmdMsg; int32_t msgType; SArray* pDbList; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 32668356ee..6af1fd6016 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -204,11 +204,11 @@ typedef struct SRequestObj { uint64_t requestId; int32_t type; // request type STscObj* pTscObj; - char* pDb; + char* pDb; // current database string char* sqlstr; // sql string int32_t sqlLen; int64_t self; - char* msgBuf; + char* msgBuf; // error msg buffer int32_t code; SArray* dbList; SArray* tableList; @@ -276,8 +276,8 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj* void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); void doSetOneRowPtr(SReqResultInfo* pResultInfo); -int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, bool convertUcs4); void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); +void setResPrecision(SReqResultInfo* pResInfo, int32_t precision); int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4); // --- heartbeat diff --git a/source/client/src/client.c b/source/client/src/client.c deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index a258fd305e..8efee9bffb 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -27,10 +27,10 @@ #include "ttime.h" #define TSC_VAR_NOT_RELEASE 1 -#define TSC_VAR_RELEASED 0 +#define TSC_VAR_RELEASED 0 SAppInfo appInfo; -int32_t clientReqRefPool = -1; +int32_t clientReqRefPool = -1; int32_t clientConnRefPool = -1; static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; @@ -48,8 +48,8 @@ static void registerRequest(SRequestObj *pRequest) { if (pTscObj->pAppInfo) { SInstanceSummary *pSummary = &pTscObj->pAppInfo->summary; - int32_t total = atomic_add_fetch_64(&pSummary->totalRequests, 1); - int32_t currentInst = atomic_add_fetch_64(&pSummary->currentRequests, 1); + int32_t total = atomic_add_fetch_64((int64_t*)&pSummary->totalRequests, 1); + int32_t currentInst = atomic_add_fetch_64((int64_t*)&pSummary->currentRequests, 1); tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64, pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId); @@ -62,7 +62,7 @@ static void deregisterRequest(SRequestObj *pRequest) { STscObj * pTscObj = pRequest->pTscObj; SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary; - int32_t currentInst = atomic_sub_fetch_64(&pActivity->currentRequests, 1); + int32_t currentInst = atomic_sub_fetch_64((int64_t*)&pActivity->currentRequests, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); int64_t duration = taosGetTimestampUs() - pRequest->metric.start; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index b3615c208c..85e3979477 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -229,7 +229,7 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra } void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) { - assert(pSchema != NULL && numOfCols > 0); + ASSERT(pSchema != NULL && numOfCols > 0); pResInfo->numOfCols = numOfCols; pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD)); From eaa6cd56a68e91a28530a9e105553a2c82aff70c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 15 Apr 2022 13:42:03 +0800 Subject: [PATCH 4/6] enh(query): add an new internal api to extract query result timestamp precision before the query response received from dnode. --- source/client/src/clientImpl.c | 9 +++++++++ source/libs/parser/src/parTranslater.c | 14 ++++++++++---- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 85e3979477..a64fa13dd8 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -175,6 +175,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) { if (TSDB_CODE_SUCCESS == code) { if ((*pQuery)->haveResultSet) { setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols); + setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision); } TSWAP(pRequest->dbList, (*pQuery)->pDbList, SArray*); @@ -253,6 +254,14 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t } } +void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) { + if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO && precision != TSDB_TIME_PRECISION_NANO) { + return; + } + + pResInfo->precision = precision; +} + int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) { void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 250c6c2fd1..f89f5fbb22 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2198,6 +2198,10 @@ static int32_t extractSelectResultSchema(const SSelectStmt* pSelect, int32_t* nu return TSDB_CODE_SUCCESS; } +static int8_t extractResultTsPrecision(const SSelectStmt* pSelect) { + return pSelect->precision; +} + static int32_t extractExplainResultSchema(int32_t* numOfCols, SSchema** pSchema) { *numOfCols = 1; *pSchema = taosMemoryCalloc((*numOfCols), sizeof(SSchema)); @@ -2219,19 +2223,19 @@ static int32_t extractDescribeResultSchema(int32_t* numOfCols, SSchema** pSchema (*pSchema)[0].type = TSDB_DATA_TYPE_BINARY; (*pSchema)[0].bytes = DESCRIBE_RESULT_FIELD_LEN; - strcpy((*pSchema)[0].name, "Field"); + strcpy((*pSchema)[0].name, "field"); (*pSchema)[1].type = TSDB_DATA_TYPE_BINARY; (*pSchema)[1].bytes = DESCRIBE_RESULT_TYPE_LEN; - strcpy((*pSchema)[1].name, "Type"); + strcpy((*pSchema)[1].name, "type"); (*pSchema)[2].type = TSDB_DATA_TYPE_INT; (*pSchema)[2].bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes; - strcpy((*pSchema)[2].name, "Length"); + strcpy((*pSchema)[2].name, "length"); (*pSchema)[3].type = TSDB_DATA_TYPE_BINARY; (*pSchema)[3].bytes = DESCRIBE_RESULT_NOTE_LEN; - strcpy((*pSchema)[3].name, "Note"); + strcpy((*pSchema)[3].name, "note"); return TSDB_CODE_SUCCESS; } @@ -2912,6 +2916,8 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { if (TSDB_CODE_SUCCESS != extractResultSchema(pQuery->pRoot, &pQuery->numOfResCols, &pQuery->pResSchema)) { return TSDB_CODE_OUT_OF_MEMORY; } + + pQuery->precision = extractResultTsPrecision((SSelectStmt*) pQuery->pRoot); } if (NULL != pCxt->pDbs) { From 684db8eba818e8f735d7cf1b8ab0193a1982dbe6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 15 Apr 2022 14:01:43 +0800 Subject: [PATCH 5/6] refactor(query): refactor some variable name and macro definitions. --- include/libs/function/function.h | 9 ++--- source/client/inc/clientInt.h | 8 ++++ source/libs/executor/src/executorimpl.c | 53 ++++++++----------------- source/libs/executor/src/scanoperator.c | 6 +-- source/libs/function/src/taggfunction.c | 38 +++++++++--------- 5 files changed, 50 insertions(+), 64 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 9fa89f0415..1303a1fb6a 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -110,7 +110,6 @@ typedef struct SFileBlockInfo { #define FUNCTION_COV 38 typedef struct SResultRowEntryInfo { -// int8_t hasResult:6; // result generated, not NULL value bool initialized:1; // output buffer has been initialized bool complete:1; // query has completed uint8_t isNullRes:6; // the result is null @@ -119,10 +118,10 @@ typedef struct SResultRowEntryInfo { // determine the real data need to calculated the result enum { - BLK_DATA_NO_NEEDED = 0x0, - BLK_DATA_STATIS_NEEDED = 0x1, - BLK_DATA_ALL_NEEDED = 0x3, - BLK_DATA_DISCARD = 0x4, // discard current data block since it is not qualified for filter + BLK_DATA_NOT_LOAD = 0x0, + BLK_DATA_SMA_LOAD = 0x1, + BLK_DATA_DATA_LOAD = 0x3, + BLK_DATA_FILTEROUT = 0x4, // discard current data block since it is not qualified for filter }; enum { diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 5f99af5460..772ff5e69a 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -46,6 +46,14 @@ extern "C" { #define ERROR_MSG_BUF_DEFAULT_SIZE 512 #define HEARTBEAT_INTERVAL 1500 // ms +enum { + RES_TYPE__QUERY = 1, + RES_TYPE__TMQ, +}; + +#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) +#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) + typedef struct SAppInstInfo SAppInstInfo; typedef struct { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 20b7169895..c911aef45f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2004,7 +2004,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) { bool hasFirstLastFunc = false; bool hasOtherFunc = false; - if (status == BLK_DATA_ALL_NEEDED || status == BLK_DATA_DISCARD) { + if (status == BLK_DATA_DATA_LOAD || status == BLK_DATA_FILTEROUT) { return status; } @@ -2023,11 +2023,11 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) { } } - if (hasFirstLastFunc && status == BLK_DATA_NO_NEEDED) { + if (hasFirstLastFunc && status == BLK_DATA_NOT_LOAD) { if (!hasOtherFunc) { - return BLK_DATA_DISCARD; + return BLK_DATA_FILTEROUT; } else { - return BLK_DATA_ALL_NEEDED; + return BLK_DATA_DATA_LOAD; } } @@ -2360,7 +2360,7 @@ static void doSetTagValueInParam(void* pTable, int32_t tagColId, SVarian static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) { SqlFunctionCtx* pCtx = pTableScanInfo->pCtx; - uint32_t status = BLK_DATA_NO_NEEDED; + uint32_t status = BLK_DATA_NOT_LOAD; int32_t numOfOutput = pTableScanInfo->numOfOutput; for (int32_t i = 0; i < numOfOutput; ++i) { @@ -2369,11 +2369,11 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData // group by + first/last should not apply the first/last block filter if (functionId < 0) { - status |= BLK_DATA_ALL_NEEDED; + status |= BLK_DATA_DATA_LOAD; return status; } else { // status |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId); - // if ((status & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { + // if ((status & BLK_DATA_DATA_LOAD) == BLK_DATA_DATA_LOAD) { // return status; // } } @@ -2384,7 +2384,7 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { - *status = BLK_DATA_NO_NEEDED; + *status = BLK_DATA_NOT_LOAD; pBlock->pDataBlock = NULL; pBlock->pBlockAgg = NULL; @@ -2397,36 +2397,15 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc pCost->totalBlocks += 1; pCost->totalRows += pBlock->info.rows; #if 0 - if (pRuntimeEnv->pTsBuf != NULL) { - (*status) = BLK_DATA_ALL_NEEDED; - - if (pQueryAttr->stableQuery) { // todo refactor - SExprInfo* pExprInfo = &pTableScanInfo->pExpr[0]; - int16_t tagId = (int16_t)pExprInfo->base.param[0].i; - SColumnInfo* pColInfo = doGetTagColumnInfoById(pQueryAttr->tagColList, pQueryAttr->numOfTags, tagId); - - // compare tag first - SVariant t = {0}; - doSetTagValueInParam(pRuntimeEnv->current->pTable, tagId, &t, pColInfo->type, pColInfo->bytes); - setTimestampListJoinInfo(pRuntimeEnv, &t, pRuntimeEnv->current); - - STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf); - if (!tsBufIsValidElem(&elem) || (tsBufIsValidElem(&elem) && (taosVariantCompare(&t, elem.tag) != 0))) { - (*status) = BLK_DATA_DISCARD; - return TSDB_CODE_SUCCESS; - } - } - } - // Calculate all time windows that are overlapping or contain current data block. // If current data block is contained by all possible time window, do not load current data block. if (/*pQueryAttr->pFilters || */pQueryAttr->groupbyColumn || pQueryAttr->sw.gap > 0 || (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && overlapWithTimeWindow(pTaskInfo, &pBlock->info))) { - (*status) = BLK_DATA_ALL_NEEDED; + (*status) = BLK_DATA_DATA_LOAD; } // check if this data block is required to load - if ((*status) != BLK_DATA_ALL_NEEDED) { + if ((*status) != BLK_DATA_DATA_LOAD) { bool needFilter = true; // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet, @@ -2458,18 +2437,18 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc if (needFilter) { (*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock); } else { - (*status) = BLK_DATA_ALL_NEEDED; + (*status) = BLK_DATA_DATA_LOAD; } } SDataBlockInfo* pBlockInfo = &pBlock->info; // *status = updateBlockLoadStatus(pRuntimeEnv->pQueryAttr, *status); - if ((*status) == BLK_DATA_NO_NEEDED || (*status) == BLK_DATA_DISCARD) { + if ((*status) == BLK_DATA_NOT_LOAD || (*status) == BLK_DATA_FILTEROUT) { //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey, // pBlockInfo->window.ekey, pBlockInfo->rows); pCost->discardBlocks += 1; - } else if ((*status) == BLK_DATA_STATIS_NEEDED) { + } else if ((*status) == BLK_DATA_SMA_LOAD) { // this function never returns error? pCost->loadBlockStatis += 1; // tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg); @@ -2479,7 +2458,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc pCost->totalCheckedRows += pBlock->info.rows; } } else { - assert((*status) == BLK_DATA_ALL_NEEDED); + assert((*status) == BLK_DATA_DATA_LOAD); // load the data block statistics to perform further filter pCost->loadBlockStatis += 1; @@ -2511,7 +2490,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc pCost->discardBlocks += 1; //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, // pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - (*status) = BLK_DATA_DISCARD; + (*status) = BLK_DATA_FILTEROUT; return TSDB_CODE_SUCCESS; } } @@ -2523,7 +2502,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc // pCost->discardBlocks += 1; // qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey, // pBlockInfo->window.ekey, pBlockInfo->rows); -// (*status) = BLK_DATA_DISCARD; +// (*status) = BLK_DATA_FILTEROUT; // return TSDB_CODE_SUCCESS; // } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0286b6e6e9..901bfde6a4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -73,7 +73,7 @@ int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, pCost->totalCheckedRows += pBlock->info.rows; pCost->loadBlocks += 1; - *status = BLK_DATA_ALL_NEEDED; + *status = BLK_DATA_DATA_LOAD; SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL); if (pCols == NULL) { @@ -138,7 +138,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) { // } // this function never returns error? - uint32_t status = BLK_DATA_ALL_NEEDED; + uint32_t status = BLK_DATA_DATA_LOAD; int32_t code = loadDataBlock(pTaskInfo, pTableScanInfo, pBlock, &status); // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { @@ -146,7 +146,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) { } // current block is ignored according to filter result by block statistics data, continue load the next block - if (status == BLK_DATA_DISCARD || pBlock->info.rows == 0) { + if (status == BLK_DATA_FILTEROUT || pBlock->info.rows == 0) { continue; } diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index 60566d00d8..e001ef4071 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -557,14 +557,14 @@ static void count_func_merge(SqlFunctionCtx *pCtx) { */ int32_t countRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - return BLK_DATA_NO_NEEDED; + return BLK_DATA_NOT_LOAD; } else { - return BLK_DATA_STATIS_NEEDED; + return BLK_DATA_SMA_LOAD; } } int32_t noDataRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { - return BLK_DATA_NO_NEEDED; + return BLK_DATA_NOT_LOAD; } #define LIST_ADD_N_DOUBLE_FLOAT(x, ctx, p, t, numOfElem, tsdbType) \ do { \ @@ -743,76 +743,76 @@ static void sum_func_merge(SqlFunctionCtx *pCtx) { } static int32_t statisRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { - return BLK_DATA_STATIS_NEEDED; + return BLK_DATA_SMA_LOAD; } static int32_t dataBlockRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { - return BLK_DATA_ALL_NEEDED; + return BLK_DATA_DATA_LOAD; } // todo: if column in current data block are null, opt for this case static int32_t firstFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { if (pCtx->order == TSDB_ORDER_DESC) { - return BLK_DATA_NO_NEEDED; + return BLK_DATA_NOT_LOAD; } // no result for first query, data block is required if (GET_RES_INFO(pCtx) == NULL || GET_RES_INFO(pCtx)->numOfRes <= 0) { - return BLK_DATA_ALL_NEEDED; + return BLK_DATA_DATA_LOAD; } else { - return BLK_DATA_NO_NEEDED; + return BLK_DATA_NOT_LOAD; } } static int32_t lastFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { if (pCtx->order != pCtx->param[0].i) { - return BLK_DATA_NO_NEEDED; + return BLK_DATA_NOT_LOAD; } if (GET_RES_INFO(pCtx) == NULL || GET_RES_INFO(pCtx)->numOfRes <= 0) { - return BLK_DATA_ALL_NEEDED; + return BLK_DATA_DATA_LOAD; } else { - return BLK_DATA_NO_NEEDED; + return BLK_DATA_NOT_LOAD; } } static int32_t firstDistFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { if (pCtx->order == TSDB_ORDER_DESC) { - return BLK_DATA_NO_NEEDED; + return BLK_DATA_NOT_LOAD; } // not initialized yet, it is the first block, load it. if (pCtx->pOutput == NULL) { - return BLK_DATA_ALL_NEEDED; + return BLK_DATA_DATA_LOAD; } // the pCtx should be set to current Ctx and output buffer before call this function. Otherwise, pCtx->pOutput is // the previous windowRes output buffer, not current unloaded block. In this case, the following filter is invalid SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->pOutput + pCtx->inputBytes); if (pInfo->hasResult != DATA_SET_FLAG) { - return BLK_DATA_ALL_NEEDED; + return BLK_DATA_DATA_LOAD; } else { // data in current block is not earlier than current result - return (pInfo->ts <= w->skey) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED; + return (pInfo->ts <= w->skey) ? BLK_DATA_NOT_LOAD : BLK_DATA_DATA_LOAD; } } static int32_t lastDistFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { if (pCtx->order != pCtx->param[0].i) { - return BLK_DATA_NO_NEEDED; + return BLK_DATA_NOT_LOAD; } // not initialized yet, it is the first block, load it. if (pCtx->pOutput == NULL) { - return BLK_DATA_ALL_NEEDED; + return BLK_DATA_DATA_LOAD; } // the pCtx should be set to current Ctx and output buffer before call this function. Otherwise, pCtx->pOutput is // the previous windowRes output buffer, not current unloaded block. In this case, the following filter is invalid SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->pOutput + pCtx->inputBytes); if (pInfo->hasResult != DATA_SET_FLAG) { - return BLK_DATA_ALL_NEEDED; + return BLK_DATA_DATA_LOAD; } else { - return (pInfo->ts > w->ekey) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED; + return (pInfo->ts > w->ekey) ? BLK_DATA_NOT_LOAD : BLK_DATA_DATA_LOAD; } } From ad3675daed5c826e7ba77698f5c7be621b785fad Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 15 Apr 2022 14:13:29 +0800 Subject: [PATCH 6/6] test: fix an typo in session.sim script. --- tests/script/tsim/query/session.sim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/script/tsim/query/session.sim b/tests/script/tsim/query/session.sim index 3aee838625..d4920ea255 100644 --- a/tests/script/tsim/query/session.sim +++ b/tests/script/tsim/query/session.sim @@ -288,7 +288,7 @@ endi print ================> syntax error check not active ================> reactive sql_error select * from dev_001 session(ts,1w) -sql select count(*) from st session(ts,1w) +sql_error select count(*) from st session(ts,1w) sql_error select count(*) from dev_001 group by tagtype session(ts,1w) sql select count(*) from dev_001 session(ts,1n) sql select count(*) from dev_001 session(ts,1y)