From 409ad8c4c18fb33609dc031db996e01220d71bd1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 22 Mar 2021 15:26:24 +0800 Subject: [PATCH] [td-3318] --- src/client/inc/tscUtil.h | 20 +- src/client/inc/tsclient.h | 18 +- src/client/src/tscAsync.c | 39 +-- src/client/src/tscLocal.c | 34 +-- src/client/src/tscLocalMerge.c | 32 +- src/client/src/tscParseInsert.c | 8 +- src/client/src/tscPrepare.c | 2 +- src/client/src/tscSQLParser.c | 497 +++++++++++++++++++------------- src/client/src/tscServer.c | 64 ++-- src/client/src/tscSql.c | 20 +- src/client/src/tscStream.c | 12 +- src/client/src/tscSub.c | 6 +- src/client/src/tscSubquery.c | 139 +++++---- src/client/src/tscUtil.c | 118 +++++--- src/inc/taosdef.h | 7 +- src/query/inc/qSqlparser.h | 8 +- src/query/inc/sql.y | 11 +- src/query/src/qSqlParser.c | 16 +- src/query/src/sql.c | 27 +- 19 files changed, 607 insertions(+), 471 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index f69ee23222..3ec44875fd 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -88,9 +88,8 @@ typedef struct SVgroupTableInfo { SArray* itemList; //SArray } SVgroupTableInfo; -static FORCE_INLINE SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex) { +static FORCE_INLINE SQueryInfo* tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClauseIndex) { assert(pCmd != NULL && subClauseIndex >= 0); - if (pCmd->pQueryInfo == NULL || subClauseIndex >= pCmd->numOfClause) { return NULL; } @@ -98,6 +97,8 @@ static FORCE_INLINE SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t sub return pCmd->pQueryInfo[subClauseIndex]; } +SQueryInfo* tscGetActiveQueryInfo(SSqlCmd* pCmd); + int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks); void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta); void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf); @@ -208,8 +209,11 @@ bool tscShouldBeFreed(SSqlObj* pSql); STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex); STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex); -SQueryInfo *tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex); -SQueryInfo *tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex); +void tscInitQueryInfo(SQueryInfo* pQueryInfo); +void tscClearSubqueryInfo(SSqlCmd* pCmd); +int32_t tscAddQueryInfo(SSqlCmd *pCmd); +SQueryInfo *tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClauseIndex); +SQueryInfo *tscGetQueryInfoS(SSqlCmd *pCmd, int32_t subClauseIndex); void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo); @@ -217,11 +221,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, SName* name, STableM SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables); STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo); -int32_t tscAddSubqueryInfo(SSqlCmd *pCmd); -void tscInitQueryInfo(SQueryInfo* pQueryInfo); - -void tscClearSubqueryInfo(SSqlCmd* pCmd); void tscFreeVgroupTableInfo(SArray* pVgroupTables); SArray* tscVgroupTableInfoDup(SArray* pVgroupTables); void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index); @@ -233,6 +233,8 @@ int tscGetTableMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool creat void tscResetForNextRetrieve(SSqlRes* pRes); void tscDoQuery(SSqlObj* pSql); +void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo); +void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo); SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *pInfo); void* tscVgroupInfoClear(SVgroupsInfo *pInfo); @@ -266,7 +268,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex); int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid); int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId); -void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex); +void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex); bool hasMoreVnodesToTry(SSqlObj *pSql); bool hasMoreClauseToTry(SSqlObj* pSql); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 1740266518..9b5cd8fa0f 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -228,6 +228,9 @@ typedef struct SQueryInfo { int32_t round; // 0/1/.... int32_t bufLen; char* buf; + + struct SQueryInfo *sibling; // sibling + SArray *pUpstream; //SArray } SQueryInfo; typedef struct { @@ -242,8 +245,6 @@ typedef struct { }; uint32_t insertType; // TODO remove it - int32_t clauseIndex; // index of multiple subclause query - char * curSql; // current sql, resume position of sql after parsing paused int8_t parseFinished; char reserve2[3]; // fix bus error on arm32 @@ -253,22 +254,26 @@ typedef struct { uint32_t allocSize; char * payload; int32_t payloadLen; + SQueryInfo **pQueryInfo; int32_t numOfClause; + int32_t clauseIndex; // index of multiple subclause query + SQueryInfo *active; // current active query info + int32_t batchSize; // for parameter ('?') binding and batch processing int32_t numOfParams; int8_t dataSourceType; // load data from file or not - char reserve4[3]; // fix bus error on arm32 + char reserve4[3]; // fix bus error on arm32 int8_t submitSchema; // submit block is built with table schema - char reserve5[3]; // fix bus error on arm32 + char reserve5[3]; // fix bus error on arm32 STagData tagData; // NOTE: pTagData->data is used as a variant length array SName **pTableNameList; // all involved tableMeta list of current insert sql statement. int32_t numOfTables; SHashObj *pTableBlockHashList; // data block for each table - SArray *pDataBlocks; // SArray. Merged submit block for each vgroup + SArray *pDataBlocks; // SArray. Merged submit block for each vgroup } SSqlCmd; typedef struct SResRec { @@ -410,7 +415,7 @@ void tscInitMsgsFp(); int tsParseSql(SSqlObj *pSql, bool initial); void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet); -int tscProcessSql(SSqlObj *pSql); +int tscProcessSql(SSqlObj *pSql, SQueryInfo* pQueryInfo); int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex); void tscAsyncResultOnError(SSqlObj *pSql); @@ -425,6 +430,7 @@ void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo); int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo); +void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo); void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 5cba897b30..0eeab22a0c 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -69,7 +69,8 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para return; } - tscDoQuery(pSql); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); + executeQuery(pSql, pQueryInfo); } // TODO return the correct error code to client in tscQueueAsyncError @@ -179,7 +180,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { tscFetchDatablockForSubquery(pSql); } else { - tscProcessSql(pSql); + tscProcessSql(pSql, NULL); } } @@ -193,8 +194,8 @@ static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOf tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchRowsProxy); } -void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) { - SSqlObj *pSql = (SSqlObj *)taosa; +void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) { + SSqlObj *pSql = (SSqlObj *)tres; if (pSql == NULL || pSql->signature != pSql) { tscError("sql object is NULL"); tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED); @@ -206,18 +207,17 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) { // user-defined callback function is stored in fetchFp pSql->fetchFp = fp; - pSql->fp = tscAsyncFetchRowsProxy; + pSql->fp = tscAsyncFetchRowsProxy; + pSql->param = param; if (pRes->qhandle == 0) { tscError("qhandle is NULL"); pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE; - pSql->param = param; tscAsyncResultOnError(pSql); return; } - pSql->param = param; tscResetForNextRetrieve(pRes); // handle the sub queries of join query @@ -255,8 +255,9 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) { if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; } - - tscProcessSql(pSql); + + SQueryInfo* pQueryInfo1 = tscGetActiveQueryInfo(&pSql->cmd); + tscProcessSql(pSql, pQueryInfo1); } } @@ -330,7 +331,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { tscDebug("%p get %s successfully", pSql, msg); if (pSql->pStream == NULL) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); // check if it is a sub-query of super table query first, if true, enter another routine if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY|TSDB_QUERY_TYPE_SUBQUERY|TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) { @@ -348,7 +349,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0)); // tscProcessSql can add error into async res - tscProcessSql(pSql); + tscProcessSql(pSql, NULL); taosReleaseRef(tscObjRef, pSql->self); return; } else { // continue to process normal async query @@ -379,9 +380,9 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { goto _error; } - tscProcessSql(pSql); + tscProcessSql(pSql, NULL); } else { // in all other cases, simple retry - tscProcessSql(pSql); + tscProcessSql(pSql, NULL); } taosReleaseRef(tscObjRef, pSql->self); @@ -408,11 +409,15 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { } (*pSql->fp)(pSql->param, pSql, code); - taosReleaseRef(tscObjRef, pSql->self); - return; + } else if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) { + tscHandleMultivnodeInsert(pSql); + } else { + SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pCmd, pCmd->clauseIndex); + executeQuery(pSql, pQueryInfo1); } - // proceed to invoke the tscDoQuery(); + taosReleaseRef(tscObjRef, pSql->self); + return; } } @@ -449,7 +454,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { return; } - tscDoQuery(pSql); +// tscDoQuery(pSql); taosReleaseRef(tscObjRef, pSql->self); diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 820572859e..5e96db69c8 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -53,7 +53,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) { SSqlRes *pRes = &pSql->res; // one column for each row - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMeta * pMeta = pTableMetaInfo->pTableMeta; @@ -154,7 +154,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, pSql->cmd.numOfCols = numOfCols; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); pQueryInfo->order.order = TSDB_ORDER_ASC; TAOS_FIELD f = {.type = TSDB_DATA_TYPE_BINARY, .bytes = (TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE}; @@ -199,7 +199,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, } static int32_t tscProcessDescribeTable(SSqlObj *pSql) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); assert(tscGetMetaInfo(pQueryInfo, 0)->pTableMeta != NULL); @@ -389,7 +389,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const SColumnIndex index = {0}; pSql->cmd.numOfCols = 2; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); pQueryInfo->order.order = TSDB_ORDER_ASC; TAOS_FIELD f; @@ -427,7 +427,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const static int32_t tscSCreateSetValueToResObj(SSqlObj *pSql, int32_t rowLen, const char *tableName, const char *ddl) { SSqlRes *pRes = &pSql->res; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); int32_t numOfRows = 1; if (strlen(ddl) == 0) { @@ -444,7 +444,7 @@ static int32_t tscSCreateSetValueToResObj(SSqlObj *pSql, int32_t rowLen, const c return 0; } static int32_t tscSCreateBuildResult(SSqlObj *pSql, BuildType type, const char *str, const char *result) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); int32_t rowLen = tscSCreateBuildResultFields(pSql, type, result); tscFieldInfoUpdateOffset(pQueryInfo); @@ -552,7 +552,7 @@ static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) { return TSDB_CODE_SUCCESS; } static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, char *ddl) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMeta * pMeta = pTableMetaInfo->pTableMeta; @@ -606,7 +606,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch } static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName, char *ddl) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMeta * pMeta = pTableMetaInfo->pTableMeta; @@ -633,7 +633,7 @@ static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName, } static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName, char *ddl) { char *result = ddl; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMeta * pMeta = pTableMetaInfo->pTableMeta; @@ -674,7 +674,7 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName, } static int32_t tscProcessShowCreateTable(SSqlObj *pSql) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); assert(pTableMetaInfo->pTableMeta != NULL); @@ -700,7 +700,7 @@ static int32_t tscProcessShowCreateTable(SSqlObj *pSql) { } static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -727,7 +727,7 @@ static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) { return TSDB_CODE_TSC_ACTION_IN_PROGRESS; } static int32_t tscProcessCurrentUser(SSqlObj *pSql) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); pExpr->resBytes = TSDB_USER_LEN + TSDB_DATA_TYPE_BINARY; @@ -754,7 +754,7 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) { extractDBName(pSql->pTscObj->db, db); pthread_mutex_unlock(&pSql->pTscObj->mutex); - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); pExpr->resType = TSDB_DATA_TYPE_BINARY; @@ -781,7 +781,7 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) { static int32_t tscProcessServerVer(SSqlObj *pSql) { const char* v = pSql->pTscObj->sversion; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); pExpr->resType = TSDB_DATA_TYPE_BINARY; @@ -804,7 +804,7 @@ static int32_t tscProcessServerVer(SSqlObj *pSql) { } static int32_t tscProcessClientVer(SSqlObj *pSql) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); pExpr->resType = TSDB_DATA_TYPE_BINARY; @@ -856,7 +856,7 @@ static int32_t tscProcessServStatus(SSqlObj *pSql) { return pSql->res.code; } - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); int32_t val = 1; @@ -870,7 +870,7 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa pCmd->numOfCols = 1; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); pQueryInfo->order.order = TSDB_ORDER_ASC; tscFieldInfoClear(&pQueryInfo->fieldsInfo); diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index a44b0c46ba..66a1051a44 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -61,7 +61,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescr * the fields and offset attributes in pCmd and pModel may be different due to * merge requirement. So, the final result in pRes structure is formatted in accordance with the pCmd object. */ - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); size_t size = tscSqlExprNumOfExprs(pQueryInfo); for (int32_t i = 0; i < size; ++i) { @@ -262,7 +262,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde #ifdef _DEBUG_VIEW printf("load data page into mem for build loser tree: %" PRIu64 " rows\n", ds->filePage.num); SSrcColumnInfo colInfo[256] = {0}; - SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo * pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); tscGetSrcColumnInfo(colInfo, pQueryInfo); @@ -297,7 +297,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde param->pLocalData = pReducer->pLocalDataSrc; param->pDesc = pReducer->pDesc; param->num = pReducer->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); param->groupOrderType = pQueryInfo->groupbyExpr.orderType; pReducer->orderPrjOnSTable = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0); @@ -491,7 +491,7 @@ void tscDestroyLocalMerger(SSqlObj *pSql) { } SSqlCmd * pCmd = &pSql->cmd; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); // there is no more result, so we release all allocated resource SLocalMerger *pLocalMerge = (SLocalMerger *)atomic_exchange_ptr(&pRes->pLocalMerger, NULL); @@ -545,7 +545,7 @@ void tscDestroyLocalMerger(SSqlObj *pSql) { static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCmd, SColumnModel *pModel) { int32_t numOfGroupByCols = 0; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) { numOfGroupByCols = pQueryInfo->groupbyExpr.numOfGroupCols; @@ -608,7 +608,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm } bool isSameGroup(SSqlCmd *pCmd, SLocalMerger *pReducer, char *pPrev, tFilePage *tmpBuffer) { - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); // disable merge procedure for column projection query int16_t functionId = pReducer->pCtx[0].functionId; @@ -659,7 +659,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr SColumnModel *pModel = NULL; *pFinalModel = NULL; - SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo * pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pSql->subState.numOfSub); @@ -949,7 +949,7 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp SSqlRes *pRes = &pSql->res; tFilePage *pBeforeFillData = pLocalMerge->pResultBuf; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); SFillInfo *pFillInfo = pLocalMerge->pFillInfo; // todo extract function @@ -1048,7 +1048,7 @@ static void savePreviousRow(SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) { static void doExecuteFinalMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool needInit) { // the tag columns need to be set before all functions execution - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); size_t size = tscSqlExprNumOfExprs(pQueryInfo); for (int32_t j = 0; j < size; ++j) { @@ -1215,7 +1215,7 @@ static bool saveGroupResultInfo(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); if (pRes->numOfRowsGroup > 0) { pRes->numOfGroups += 1; @@ -1244,7 +1244,7 @@ bool genFinalResults(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool noMoreCurren SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo * pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); tFilePage * pResBuf = pLocalMerge->pResultBuf; SColumnModel *pModel = pLocalMerge->resColModel; @@ -1310,7 +1310,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalMerger * pRes->numOfRows = 0; pRes->numOfRowsGroup = 0; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); pQueryInfo->limit.offset = pLocalMerge->offset; @@ -1333,7 +1333,7 @@ static bool doBuildFilledResultForGroup(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); SLocalMerger *pLocalMerge = pRes->pLocalMerger; SFillInfo *pFillInfo = pLocalMerge->pFillInfo; @@ -1364,7 +1364,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { bool prevGroupCompleted = (!pLocalMerge->discard) && pLocalMerge->hasUnprocessedRow; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); if ((isAllSourcesCompleted(pLocalMerge) && !pLocalMerge->hasPrevRow) || pLocalMerge->pLocalDataSrc[0] == NULL || prevGroupCompleted) { @@ -1405,7 +1405,7 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) { SSqlRes *pRes = &pSql->res; SLocalMerger *pLocalMerge = pRes->pLocalMerger; - SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo * pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); size_t size = tscSqlExprNumOfExprs(pQueryInfo); for (int32_t k = 0; k < size; ++k) { @@ -1437,7 +1437,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { } SLocalMerger *pLocalMerge = pRes->pLocalMerger; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); tFilePage *tmpBuffer = pLocalMerge->pTempBuffer; if (doHandleLastRemainData(pSql)) { diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 2b962333d5..6c31f2e33a 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -759,7 +759,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { const int32_t STABLE_INDEX = 1; SSqlCmd * pCmd = &pSql->cmd; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0); char *sql = *sqlstr; @@ -1055,7 +1055,7 @@ int tsParseInsertSql(SSqlObj *pSql) { int32_t totalNum = 0; int32_t code = TSDB_CODE_SUCCESS; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0); assert(pQueryInfo != NULL); STableMetaInfo *pTableMetaInfo = (pQueryInfo->numOfTables == 0)? tscAddEmptyMetaInfo(pQueryInfo):tscGetMetaInfo(pQueryInfo, 0); @@ -1313,7 +1313,7 @@ int tsInsertInitialCheck(SSqlObj *pSql) { pCmd->count = 0; pCmd->command = TSDB_SQL_INSERT; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd, pCmd->clauseIndex); TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | pCmd->insertType); @@ -1403,7 +1403,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock return code; } - return tscProcessSql(pSql); + return tscProcessSql(pSql, NULL); } typedef struct SImportFileSupport { diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index c5f06a52f3..0142c25f51 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -815,7 +815,7 @@ static int insertStmtExecute(STscStmt* stmt) { pRes->numOfRows = 0; pRes->numOfTotal = 0; - tscProcessSql(pSql); + tscProcessSql(pSql, NULL); // wait for the callback function to post the semaphore tsem_wait(&pSql->rspSem); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index b724ee5750..2e918c09f2 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -77,23 +77,23 @@ static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SC static uint8_t convertOptr(SStrToken *pToken); -static int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, SArray* pSelectList, bool isSTable, bool joinQuery, bool timeWindowQuery); +static int32_t validateSelectNodeList(SSqlCmd* pCmd, int32_t clauseIndex, SArray* pSelNodeList, bool isSTable, bool joinQuery, bool timeWindowQuery); static bool validateIpAddress(const char* ip, size_t size); static bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo); static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool twQuery); -static int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd); +static int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd); -static int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode); +static int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode); static int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* offsetToken); static int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSliding); static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem); -static int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql); -static int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySQL); -static int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode, SSchema* pSchema); +static int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql); +static int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode); +static int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode, SSchema* pSchema); static int32_t tsRewriteFieldNameIfNecessary(SSqlCmd* pCmd, SQueryInfo* pQueryInfo); static int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo); @@ -110,7 +110,7 @@ static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField); static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo); static bool hasNormalColumnFilter(SQueryInfo* pQueryInfo); -static int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t index, SQuerySqlNode* pQuerySqlNode, SSqlObj* pSql); +static int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t index, SQuerySqlNode* pQuerySqlNode, SSqlObj* pSql); static int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDbInfo* pCreateDbSql); static int32_t getColumnIndexByName(SSqlCmd* pCmd, const SStrToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); static int32_t getTableIndexByName(SStrToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); @@ -125,9 +125,10 @@ static SColumnList getColumnList(int32_t num, int16_t tableIndex, int32_t column static int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* pInfo); static int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo); -static int32_t doValidateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t index); +static int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t index); static int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryInfo* pQueryInfo, SArray* pCols, int64_t *uid); static bool validateDebugFlag(int32_t v); +static int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryInfo* pQueryInfo); static bool isTimeWindowQuery(SQueryInfo* pQueryInfo) { return pQueryInfo->interval.interval > 0 || pQueryInfo->sessionWindow.gap > 0; @@ -257,7 +258,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { return tscSQLSyntaxErrMsg(tscGetErrorMsgPayload(pCmd), NULL, pInfo->msg); } - SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfoS(pCmd, pCmd->clauseIndex); if (pQueryInfo == NULL) { pRes->code = terrno; return pRes->code; @@ -617,8 +618,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { const char* msg1 = "columns in select clause not identical"; for (int32_t i = pCmd->numOfClause; i < pInfo->subclauseInfo.numOfClause; ++i) { - SQueryInfo* pqi = tscGetQueryInfoDetailSafely(pCmd, i); - if (pqi == NULL) { + SQueryInfo* p = tscGetQueryInfoS(pCmd, i); + if (p == NULL) { pRes->code = terrno; return pRes->code; } @@ -628,23 +629,25 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { for (int32_t i = pCmd->clauseIndex; i < pInfo->subclauseInfo.numOfClause; ++i) { SQuerySqlNode* pQuerySqlNode = pInfo->subclauseInfo.pClause[i]; tscTrace("%p start to parse %dth subclause, total:%d", pSql, i, pInfo->subclauseInfo.numOfClause); - if ((code = doValidateSqlNode(pSql, pQuerySqlNode, i)) != TSDB_CODE_SUCCESS) { + if ((code = validateSqlNode(pSql, pQuerySqlNode, i)) != TSDB_CODE_SUCCESS) { return code; } - tscPrintSelectClause(pSql, i); + tscPrintSelNodeList(pSql, i); pCmd->clauseIndex += 1; } // restore the clause index pCmd->clauseIndex = 0; + // set the command/global limit parameters from the first subclause to the sqlcmd object - SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pCmd, 0); pCmd->command = pQueryInfo1->command; // if there is only one element, the limit of clause is the limit of global result. + // validate the select node for "UNION ALL" subclause for (int32_t i = 1; i < pCmd->numOfClause; ++i) { - SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pCmd, i); + SQueryInfo* pQueryInfo2 = tscGetQueryInfo(pCmd, i); int32_t ret = tscFieldInfoCompare(&pQueryInfo1->fieldsInfo, &pQueryInfo2->fieldsInfo); if (ret != 0) { @@ -770,7 +773,7 @@ static int32_t checkInvalidExprForTimeWindow(SSqlCmd* pCmd, SQueryInfo* pQueryIn return addPrimaryTsColumnForTimeWindowQuery(pQueryInfo); } -int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode) { +int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode) { const char* msg2 = "interval cannot be less than 10 ms"; const char* msg3 = "sliding cannot be used without interval"; @@ -822,7 +825,7 @@ int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNode return checkInvalidExprForTimeWindow(pCmd, pQueryInfo); } -int32_t parseSessionClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode * pQuerySqlNode) { +int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode * pQuerySqlNode) { const char* msg1 = "gap should be fixed time window"; const char* msg2 = "only one type time window allowed"; const char* msg3 = "invalid column name"; @@ -1391,7 +1394,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t const char* msg4 = "columns from different table mixed up in arithmetic expression"; // arithmetic function in select clause - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, clauseIndex); SColumnList columnList = {0}; int32_t arithmeticType = NON_ARITHMEIC_EXPR; @@ -1587,25 +1590,32 @@ bool isValidDistinctSql(SQueryInfo* pQueryInfo) { return false; } -int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, SArray* pSelectList, bool isSTable, bool joinQuery, bool timeWindowQuery) { - assert(pSelectList != NULL && pCmd != NULL); +int32_t validateSelectNodeList(SSqlCmd* pCmd, int32_t clauseIndex, SArray* pSelNodeList, bool isSTable, bool joinQuery, + bool timeWindowQuery) { + assert(pSelNodeList != NULL && pCmd != NULL); + const char* msg1 = "too many items in selection clause"; const char* msg2 = "functions or others can not be mixed up"; const char* msg3 = "not support query expression"; + const char* msg4 = "only support distinct one tag"; const char* msg5 = "invalid function name"; - const char* msg6 = "only support distinct one tag"; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, clauseIndex); + + // too many result columns not support order by in query + if (taosArrayGetSize(pSelNodeList) > TSDB_MAX_COLUMNS) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); + } if (pQueryInfo->colList == NULL) { pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); } bool hasDistinct = false; - size_t numOfExpr = taosArrayGetSize(pSelectList); + size_t numOfExpr = taosArrayGetSize(pSelNodeList); for (int32_t i = 0; i < numOfExpr; ++i) { int32_t outputIndex = (int32_t)tscSqlExprNumOfExprs(pQueryInfo); - tSqlExprItem* pItem = taosArrayGet(pSelectList, i); + tSqlExprItem* pItem = taosArrayGet(pSelNodeList, i); if (hasDistinct == false) { hasDistinct = (pItem->distinct == true); @@ -1644,7 +1654,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, SArray* pSelectLis if (hasDistinct == true) { if (!isValidDistinctSql(pQueryInfo)) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4); } pQueryInfo->distinctTag = true; } @@ -1800,7 +1810,9 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t } // add the primary timestamp column even though it is not required by user - tscInsertPrimaryTsSourceColumn(pQueryInfo, &index); + if (pQueryInfo->pTableMetaInfo[index.tableIndex]->pTableMeta->tableType != TSDB_TEMP_TABLE) { + tscInsertPrimaryTsSourceColumn(pQueryInfo, &index); + } } else if (optr == TK_STRING || optr == TK_INTEGER || optr == TK_FLOAT) { // simple column projection query SColumnIndex index = COLUMN_INDEX_INITIALIZER; @@ -2939,7 +2951,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool return true; } -int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd) { +int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd) { const char* msg1 = "too many columns in group by clause"; const char* msg2 = "invalid column name in group by clause"; const char* msg3 = "columns from one table allowed as group by columns"; @@ -4239,7 +4251,7 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE return ret; } -int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql) { +int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql) { if (pExpr == NULL) { return TSDB_CODE_SUCCESS; } @@ -4433,16 +4445,34 @@ int32_t tsRewriteFieldNameIfNecessary(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { return TSDB_CODE_SUCCESS; } -int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySQL) { - SArray* pFillToken = pQuerySQL->fillType; +int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode) { + SArray* pFillToken = pQuerySqlNode->fillType; + if (pQuerySqlNode->fillType == NULL) { + return TSDB_CODE_SUCCESS; + } + tVariantListItem* pItem = taosArrayGet(pFillToken, 0); const int32_t START_INTERPO_COL_IDX = 1; - const char* msg = "illegal value or data overflow"; const char* msg1 = "value is expected"; const char* msg2 = "invalid fill option"; const char* msg3 = "top/bottom not support fill"; + const char* msg4 = "illegal value or data overflow"; + const char* msg5 = "fill only available for interval query"; + + if ((!isTimeWindowQuery(pQueryInfo)) && (!tscIsPointInterpQuery(pQueryInfo))) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5); + } + + /* + * fill options are set at the end position, when all columns are set properly + * the columns may be increased due to group by operation + */ + if (checkQueryRangeForFill(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_TSC_INVALID_SQL; + } + if (pItem->pVar.nType != TSDB_DATA_TYPE_BINARY) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); @@ -4506,7 +4536,7 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQ tVariant* p = taosArrayGet(pFillToken, j); int32_t ret = tVariantDump(p, (char*)&pQueryInfo->fillVal[i], pField->type, true); if (ret != TSDB_CODE_SUCCESS) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4); } } @@ -4555,7 +4585,7 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) { } } -int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode, SSchema* pSchema) { +int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode, SSchema* pSchema) { const char* msg0 = "only support order by primary timestamp"; const char* msg1 = "invalid column name"; const char* msg2 = "order by primary timestamp or first tag in groupby clause allowed"; @@ -4759,7 +4789,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { SSqlCmd* pCmd = &pSql->cmd; SAlterTableInfo* pAlterSQL = pInfo->pAlterInfo; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, DEFAULT_TABLE_INDEX); @@ -5250,7 +5280,7 @@ bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo) { return (pQueryInfo->window.skey == pQueryInfo->window.ekey) && (pQueryInfo->window.skey != 0); } -int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySqlNode* pQuerySqlNode, SSqlObj* pSql) { +int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySqlNode* pQuerySqlNode, SSqlObj* pSql) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); const char* msg0 = "soffset/offset can not be less than 0"; @@ -5445,10 +5475,10 @@ int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDbInfo* pCreateDbSql) { } void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex) { - SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentObj->cmd, subClauseIndex); + SQueryInfo* pParentQueryInfo = tscGetQueryInfo(&pParentObj->cmd, subClauseIndex); if (pParentQueryInfo->groupbyExpr.numOfGroupCols > 0) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, subClauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, subClauseIndex); SSqlExpr* pExpr = NULL; size_t size = taosArrayGetSize(pQueryInfo->exprList); @@ -5924,7 +5954,7 @@ int32_t doLocalQueryProcess(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode const char* msg2 = "invalid expression in select clause"; const char* msg3 = "invalid function"; - SArray* pExprList = pQuerySqlNode->pSelectList; + SArray* pExprList = pQuerySqlNode->pSelNodeList; size_t size = taosArrayGetSize(pExprList); if (size != 1) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); @@ -6070,8 +6100,8 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg* pCreate) { } // for debug purpose -void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, subClauseIndex); +void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex) { + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, subClauseIndex); int32_t size = (int32_t)tscSqlExprNumOfExprs(pQueryInfo); if (size == 0) { @@ -6112,7 +6142,7 @@ int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* p const char* msg1 = "invalid table name"; SSqlCmd* pCmd = &pSql->cmd; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, subClauseIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo; @@ -6171,7 +6201,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { SSqlCmd* pCmd = &pSql->cmd; SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0); // two table: the first one is for current table, and the secondary is for the super table. if (pQueryInfo->numOfTables < 2) { @@ -6376,7 +6406,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { const char* msg7 = "time interval is required"; SSqlCmd* pCmd = &pSql->cmd; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0); assert(pQueryInfo->numOfTables == 1); SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo; @@ -6412,18 +6442,18 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { } bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); - if (parseSelectClause(&pSql->cmd, 0, pQuerySqlNode->pSelectList, isSTable, false, false) != TSDB_CODE_SUCCESS) { + if (validateSelectNodeList(&pSql->cmd, 0, pQuerySqlNode->pSelNodeList, isSTable, false, false) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } if (pQuerySqlNode->pWhere != NULL) { // query condition in stream computing - if (parseWhereClause(pQueryInfo, &pQuerySqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) { + if (validateWhereNode(pQueryInfo, &pQuerySqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } } // set interval value - if (parseIntervalClause(pSql, pQueryInfo, pQuerySqlNode) != TSDB_CODE_SUCCESS) { + if (validateIntervalNode(pSql, pQueryInfo, pQuerySqlNode) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } @@ -6478,7 +6508,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { return TSDB_CODE_SUCCESS; } -static int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { +int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { const char* msg3 = "start(end) time of query range required or time range too large"; if (pQueryInfo->interval.interval == 0) { @@ -6513,24 +6543,107 @@ static int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { return TSDB_CODE_SUCCESS; } -int32_t doValidateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t index) { +static int32_t doLoadAllTableMeta(SSqlObj* pSql, int32_t index, SQuerySqlNode* pQuerySqlNode, int32_t numOfTables) { + const char* msg1 = "invalid table name"; + const char* msg2 = "invalid table alias name"; + const char* msg3 = "alias name too long"; + + int32_t code = TSDB_CODE_SUCCESS; + + SSqlCmd* pCmd = &pSql->cmd; + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, index); + + for (int32_t i = 0; i < numOfTables; ++i) { + if (pQueryInfo->numOfTables <= i) { // more than one table + tscAddEmptyMetaInfo(pQueryInfo); + } + + STableNamePair *item = taosArrayGet(pQuerySqlNode->from->tableList, i); + SStrToken *oriName = &item->name; + + if (oriName->type == TK_INTEGER || oriName->type == TK_FLOAT) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); + } + + tscDequoteAndTrimToken(oriName); + if (tscValidateName(oriName) != TSDB_CODE_SUCCESS) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); + } + + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); + code = tscSetTableFullName(pTableMetaInfo, oriName, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + SStrToken* aliasName = &item->aliasName; + if (TPARSER_HAS_TOKEN(*aliasName)) { + if (aliasName->type == TK_INTEGER || aliasName->type == TK_FLOAT) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + } + + tscDequoteAndTrimToken(aliasName); + if (tscValidateName(aliasName) != TSDB_CODE_SUCCESS) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); + } + + if (aliasName->n >= TSDB_TABLE_NAME_LEN) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); + } + + strncpy(pTableMetaInfo->aliasName, aliasName->z, aliasName->n); + } else { + strncpy(pTableMetaInfo->aliasName, tNameGetTableName(&pTableMetaInfo->name), + tListLen(pTableMetaInfo->aliasName)); + } + + code = tscGetTableMeta(pSql, pTableMetaInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + + return TSDB_CODE_SUCCESS; +} + +static STableMeta* extractTempTableMetaFromNestQuery(SQueryInfo* pUpstream) { + int32_t numOfColumns = pUpstream->fieldsInfo.numOfOutput; + + STableMeta* meta = calloc(1, sizeof(STableMeta) + sizeof(SSchema) * numOfColumns); + meta->tableType = TSDB_TEMP_TABLE; + + STableComInfo *info = &meta->tableInfo; + info->numOfColumns = numOfColumns; + info->numOfTags = 0; + + int32_t n = 0; + for(int32_t i = 0; i < numOfColumns; ++i) { + SInternalField* pField = tscFieldInfoGetInternalField(&pUpstream->fieldsInfo, i); + if (pField->visible) { + meta->schema[n].bytes = pField->field.bytes; + meta->schema[n].type = pField->field.type; + meta->schema[n].colId = pField->pSqlExpr->resColId; + tstrncpy(meta->schema[n].name, pField->pSqlExpr->aliasName, TSDB_COL_NAME_LEN); + n += 1; + } + } + + return meta; +} + +int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t index) { assert(pQuerySqlNode != NULL && (pQuerySqlNode->from == NULL || taosArrayGetSize(pQuerySqlNode->from->tableList) > 0)); - const char* msg0 = "invalid table name"; const char* msg1 = "point interpolation query needs timestamp"; - const char* msg2 = "fill only available for interval query"; + const char* msg2 = "too many tables in from clause"; const char* msg3 = "start(end) time of query range required or time range too large"; const char* msg4 = "illegal number of tables in from clause"; - const char* msg5 = "too many columns in selection clause"; - const char* msg6 = "too many tables in from clause"; - const char* msg7 = "invalid table alias name"; - const char* msg8 = "alias name too long"; int32_t code = TSDB_CODE_SUCCESS; SSqlCmd* pCmd = &pSql->cmd; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, index); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, index); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); if (pTableMetaInfo == NULL) { pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo); @@ -6538,11 +6651,6 @@ int32_t doValidateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t i assert(pCmd->clauseIndex == index); - // too many result columns not support order by in query - if (taosArrayGetSize(pQuerySqlNode->pSelectList) > TSDB_MAX_COLUMNS) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5); - } - /* * handle the sql expression without from subclause * select current_database(); @@ -6556,179 +6664,164 @@ int32_t doValidateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t i return doLocalQueryProcess(pCmd, pQueryInfo, pQuerySqlNode); } - size_t fromSize = taosArrayGetSize(pQuerySqlNode->from->tableList); - if (fromSize > TSDB_MAX_JOIN_TABLE_NUM) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4); + //validate the subquery recursively + if (pQuerySqlNode->from->type == SQL_NODE_FROM_SUBQUERY) { + for(int32_t j = 0; j < pQuerySqlNode->from->pNode.numOfClause; ++j) { + int32_t ret = validateSqlNode(pSql, pQuerySqlNode->from->pNode.pClause[j], 0); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } + } + + SQueryInfo* pQueryInfo1 = calloc(1, sizeof(SQueryInfo)); + tscInitQueryInfo(pQueryInfo1); + + pQueryInfo1->pUpstream = taosArrayInit(4, POINTER_BYTES); + for(int32_t x = 0; x < pCmd->numOfClause; ++x) { + taosArrayPush(pQueryInfo1->pUpstream, &pCmd->pQueryInfo[x]); + } + + pQueryInfo1->numOfTables = 1; + pCmd->numOfClause = 1; + pQueryInfo1->command = TSDB_SQL_SELECT; + + pCmd->pQueryInfo[0] = pQueryInfo1; + } + + size_t numOfTables = 0; + if (pQuerySqlNode->from->type == SQL_NODE_FROM_SUBQUERY) { + numOfTables = 1; + } else { + numOfTables = taosArrayGetSize(pQuerySqlNode->from->tableList); + if (numOfTables > TSDB_MAX_JOIN_TABLE_NUM) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4); + } } pQueryInfo->command = TSDB_SQL_SELECT; - if (fromSize > 2) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); + if (numOfTables > 2) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } - // set all query tables, which are maybe more than one. - for (int32_t i = 0; i < fromSize; ++i) { - STableNamePair* item = taosArrayGet(pQuerySqlNode->from->tableList, i); - SStrToken* pTableItem = &item->name; + if (pQuerySqlNode->from->type == SQL_NODE_FROM_SUBQUERY) { + // support only one nestquery + pQueryInfo = pCmd->pQueryInfo[0]; - if (pTableItem->type != TSDB_DATA_TYPE_BINARY) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0); + STableMetaInfo* pTableMetaInfo1 = calloc(1, sizeof(STableMetaInfo)); + STableMeta* pTableMeta = extractTempTableMetaFromNestQuery(taosArrayGetP(pQueryInfo->pUpstream, 0)); + pTableMetaInfo1->pTableMeta = pTableMeta; + + pQueryInfo->pTableMetaInfo = calloc(1, POINTER_BYTES); + pQueryInfo->pTableMetaInfo[0] = pTableMetaInfo1; + + // parse the group by clause in the first place + if (validateSelectNodeList(pCmd, index, pQuerySqlNode->pSelNodeList, false, false, false) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_TSC_INVALID_SQL; } - tscDequoteAndTrimToken(pTableItem); - - SStrToken tableName = {.z = pTableItem->z, .n = pTableItem->n, .type = TK_STRING}; - if (tscValidateName(&tableName) != TSDB_CODE_SUCCESS) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0); - } - - if (pQueryInfo->numOfTables <= i) { // more than one table - tscAddEmptyMetaInfo(pQueryInfo); - } - - STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo, i); - code = tscSetTableFullName(pTableMetaInfo1, pTableItem, pSql); + } else { + // set all query tables, which are maybe more than one. + code = doLoadAllTableMeta(pSql, index, pQuerySqlNode, numOfTables); if (code != TSDB_CODE_SUCCESS) { return code; } - SStrToken* aliasName = &item->aliasName; - if (TPARSER_HAS_TOKEN(*aliasName)) { - if (aliasName->type != TSDB_DATA_TYPE_BINARY) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7); + bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); + if (isSTable) { + code = tscGetSTableVgroupInfo(pSql, index); // TODO refactor: getTablemeta along with vgroupInfo + if (code != TSDB_CODE_SUCCESS) { + return code; } - tscDequoteAndTrimToken(aliasName); - - SStrToken aliasName1 = {.z = aliasName->z, .n = aliasName->n, .type = TK_STRING}; - if (tscValidateName(&aliasName1) != TSDB_CODE_SUCCESS) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7); - } - - if (aliasName1.n >= TSDB_TABLE_NAME_LEN) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg8); - } - - strncpy(pTableMetaInfo1->aliasName, aliasName1.z, aliasName1.n); + TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_QUERY); } else { - strncpy(pTableMetaInfo1->aliasName, tNameGetTableName(&pTableMetaInfo1->name), tListLen(pTableMetaInfo1->aliasName)); + TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TABLE_QUERY); } - code = tscGetTableMeta(pSql, pTableMetaInfo1); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - - assert(pQueryInfo->numOfTables == taosArrayGetSize(pQuerySqlNode->from->tableList)); - bool isSTable = false; - - if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { - isSTable = true; - code = tscGetSTableVgroupInfo(pSql, index); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_QUERY); - } else { - TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TABLE_QUERY); - } - - // parse the group by clause in the first place - if (parseGroupbyClause(pQueryInfo, pQuerySqlNode->pGroupby, pCmd) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; - } - - // set where info - STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - - if (pQuerySqlNode->pWhere != NULL) { - if (parseWhereClause(pQueryInfo, &pQuerySqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) { + // parse the group by clause in the first place + if (validateGroupbyNode(pQueryInfo, pQuerySqlNode->pGroupby, pCmd) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } - pQuerySqlNode->pWhere = NULL; - if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) { - pQueryInfo->window.skey = pQueryInfo->window.skey / 1000; - pQueryInfo->window.ekey = pQueryInfo->window.ekey / 1000; + // set where info + STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); + + if (pQuerySqlNode->pWhere != NULL) { + if (validateWhereNode(pQueryInfo, &pQuerySqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_TSC_INVALID_SQL; + } + + pQuerySqlNode->pWhere = NULL; + if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) { + pQueryInfo->window.skey = pQueryInfo->window.skey / 1000; + pQueryInfo->window.ekey = pQueryInfo->window.ekey / 1000; + } + } else { // set the time rang + if (taosArrayGetSize(pQuerySqlNode->from->tableList) > 1) { + // If it is a join query, no where clause is not allowed. + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "condition missing for join query "); + } } - } else { // set the time rang - if (taosArrayGetSize(pQuerySqlNode->from->tableList) > 1) { // it is a join query, no where clause is not allowed. - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "condition missing for join query "); - } - } - int32_t joinQuery = (pQuerySqlNode->from != NULL && taosArrayGetSize(pQuerySqlNode->from->tableList) > 1); - int32_t timeWindowQuery = - (TPARSER_HAS_TOKEN(pQuerySqlNode->interval.interval) || TPARSER_HAS_TOKEN(pQuerySqlNode->sessionVal.gap)); + int32_t joinQuery = (pQuerySqlNode->from != NULL && taosArrayGetSize(pQuerySqlNode->from->tableList) > 1); + int32_t timeWindowQuery = + (TPARSER_HAS_TOKEN(pQuerySqlNode->interval.interval) || TPARSER_HAS_TOKEN(pQuerySqlNode->sessionVal.gap)); - if (parseSelectClause(pCmd, index, pQuerySqlNode->pSelectList, isSTable, joinQuery, timeWindowQuery) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; - } - - // set order by info - if (parseOrderbyClause(pCmd, pQueryInfo, pQuerySqlNode, tscGetTableSchema(pTableMetaInfo->pTableMeta)) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; - } - - // set interval value - if (parseIntervalClause(pSql, pQueryInfo, pQuerySqlNode) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; - } else { - if (isTimeWindowQuery(pQueryInfo) && - (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) { + if (validateSelectNodeList(pCmd, index, pQuerySqlNode->pSelNodeList, isSTable, joinQuery, timeWindowQuery) != + TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } - } - if (parseSessionClause(pCmd, pQueryInfo, pQuerySqlNode) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; - } - - // no result due to invalid query time range - if (pQueryInfo->window.skey > pQueryInfo->window.ekey) { - pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; - return TSDB_CODE_SUCCESS; - } - - if (!hasTimestampForPointInterpQuery(pQueryInfo)) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); - } - - // in case of join query, time range is required. - if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { - int64_t timeRange = ABS(pQueryInfo->window.skey - pQueryInfo->window.ekey); - if (timeRange == 0 && pQueryInfo->window.skey == 0) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); - } - } - - if ((code = parseLimitClause(pCmd, pQueryInfo, index, pQuerySqlNode, pSql)) != TSDB_CODE_SUCCESS) { - return code; - } - - if ((code = doFunctionsCompatibleCheck(pCmd, pQueryInfo)) != TSDB_CODE_SUCCESS) { - return code; - } - - updateLastScanOrderIfNeeded(pQueryInfo); - tscFieldInfoUpdateOffset(pQueryInfo); - - if (pQuerySqlNode->fillType != NULL) { - if (pQueryInfo->interval.interval == 0 && (!tscIsPointInterpQuery(pQueryInfo))) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + // set order by info + if (validateOrderbyNode(pCmd, pQueryInfo, pQuerySqlNode, tscGetTableSchema(pTableMetaInfo->pTableMeta)) != + TSDB_CODE_SUCCESS) { + return TSDB_CODE_TSC_INVALID_SQL; } - /* - * fill options are set at the end position, when all columns are set properly - * the columns may be increased due to group by operation - */ - if ((code = checkQueryRangeForFill(pCmd, pQueryInfo)) != TSDB_CODE_SUCCESS) { + // set interval value + if (validateIntervalNode(pSql, pQueryInfo, pQuerySqlNode) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_TSC_INVALID_SQL; + } else { + if (isTimeWindowQuery(pQueryInfo) && + (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) { + return TSDB_CODE_TSC_INVALID_SQL; + } + } + + if (validateSessionNode(pCmd, pQueryInfo, pQuerySqlNode) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_TSC_INVALID_SQL; + } + + // no result due to invalid query time range + if (pQueryInfo->window.skey > pQueryInfo->window.ekey) { + pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; + return TSDB_CODE_SUCCESS; + } + + if (!hasTimestampForPointInterpQuery(pQueryInfo)) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); + } + + // in case of join query, time range is required. + if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { + int64_t timeRange = ABS(pQueryInfo->window.skey - pQueryInfo->window.ekey); + if (timeRange == 0 && pQueryInfo->window.skey == 0) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); + } + } + + if ((code = validateLimitNode(pCmd, pQueryInfo, index, pQuerySqlNode, pSql)) != TSDB_CODE_SUCCESS) { return code; } - if ((code = parseFillClause(pCmd, pQueryInfo, pQuerySqlNode)) != TSDB_CODE_SUCCESS) { + if ((code = doFunctionsCompatibleCheck(pCmd, pQueryInfo)) != TSDB_CODE_SUCCESS) { + return code; + } + + updateLastScanOrderIfNeeded(pQueryInfo); + tscFieldInfoUpdateOffset(pQueryInfo); + + if ((code = validateFillNode(pCmd, pQueryInfo, pQuerySqlNode)) != TSDB_CODE_SUCCESS) { return code; } } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 73a324f521..ae58d67628 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -237,7 +237,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { assert(pHB->self == pObj->hbrid); pHB->retry = 0; - int32_t code = tscProcessSql(pHB); + int32_t code = tscProcessSql(pHB, NULL); taosReleaseRef(tscObjRef, pObj->hbrid); if (code != TSDB_CODE_SUCCESS) { @@ -302,7 +302,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { return; } - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0); if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) { tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature); @@ -469,13 +469,16 @@ int doProcessSql(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } -int tscProcessSql(SSqlObj *pSql) { +int tscProcessSql(SSqlObj *pSql, SQueryInfo* pQueryInfo) { char name[TSDB_TABLE_FNAME_LEN] = {0}; SSqlCmd *pCmd = &pSql->cmd; uint32_t type = 0; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + if (pQueryInfo == NULL) { + pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); + } + STableMetaInfo *pTableMetaInfo = NULL; if (pQueryInfo != NULL) { @@ -506,7 +509,7 @@ int tscProcessSql(SSqlObj *pSql) { int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); + SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(&pSql->cmd); pRetrieveMsg->free = htons(pQueryInfo->type); pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle); @@ -545,7 +548,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; char* pMsg = pSql->cmd.payload; @@ -584,7 +587,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) { const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5; SSqlCmd* pCmd = &pSql->cmd; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, clauseIndex); int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo)); @@ -614,7 +617,9 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) { } static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) { - STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0); + SQueryInfo* pQueryInfo = tscGetActiveQueryInfo(&pSql->cmd); + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + TSKEY dfltKey = htobe64(pQueryMsg->window.skey); STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; @@ -698,7 +703,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_TSC_INVALID_SQL; // todo add test for this } - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; @@ -1371,7 +1376,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSchema *pSchema; SSqlCmd *pCmd = &pSql->cmd; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); // Reallocate the payload size @@ -1460,7 +1465,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) { - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0); return minMsgSize() + sizeof(SAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) + TSDB_EXTRA_PAYLOAD_SIZE; } @@ -1469,7 +1474,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int msgLen = 0; SSqlCmd *pCmd = &pSql->cmd; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -1518,7 +1523,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload; pCmd->payloadLen = htonl(pUpdateMsg->head.contLen); - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0); STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; SNewVgroupInfo vgroupInfo = {.vgId = -1}; @@ -1554,7 +1559,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0); SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload; pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle); pRetrieveMsg->free = htons(pQueryInfo->type); @@ -1578,7 +1583,7 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) { pRes->row = 0; pRes->rspType = 1; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) { return pRes->code; } @@ -1629,7 +1634,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { pRes->code = tscDoLocalMerge(pSql); if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) { - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd); tscCreateResPointerInfo(pRes, pQueryInfo); tscSetResRawPtr(pRes, pQueryInfo); } @@ -1683,7 +1688,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableInfoMsg *pInfoMsg = (STableInfoMsg *)pCmd->payload; @@ -1753,7 +1758,7 @@ int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; char* pMsg = pCmd->payload; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0); SSTableVgroupMsg *pStableVgroupMsg = (SSTableVgroupMsg *)pMsg; pStableVgroupMsg->numOfTables = htonl(pQueryInfo->numOfTables); @@ -2096,7 +2101,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -2155,7 +2160,7 @@ static void createHbObj(STscObj* pObj) { pSql->fp = tscProcessHeartBeatRsp; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfoS(&pSql->cmd, 0); if (pQueryInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; tfree(pSql); @@ -2311,7 +2316,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { pRes->completed = (pRetrieve->completed == 1); pRes->data = pRetrieve->data; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo* pQueryInfo = tscGetActiveQueryInfo(pCmd); if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) { return pRes->code; } @@ -2325,6 +2330,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { tscSetResRawPtr(pRes, pQueryInfo); } + prepareInputDataFromUpstream(pRes, pQueryInfo); if (pSql->pSubscription != NULL) { int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput; @@ -2364,9 +2370,9 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn pNew->signature = pNew; pNew->cmd.command = TSDB_SQL_META; - tscAddSubqueryInfo(&pNew->cmd); + tscAddQueryInfo(&pNew->cmd); - SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0); + SQueryInfo *pNewQueryInfo = tscGetQueryInfoS(&pNew->cmd, 0); pNew->cmd.autoCreated = pSql->cmd.autoCreated; // create table if not exists if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) { @@ -2400,7 +2406,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn pSql->metaRid = pNew->self; - int32_t code = tscProcessSql(pNew); + int32_t code = tscProcessSql(pNew, NULL); if (code == TSDB_CODE_SUCCESS) { code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify application that current process needs to be terminated } @@ -2455,7 +2461,7 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { SSqlCmd *pCmd = &pSql->cmd; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex); char name[TSDB_TABLE_FNAME_LEN] = {0}; @@ -2479,7 +2485,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { } static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) { - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, clauseIndex); for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); if (pTableMetaInfo->vgroupList == NULL) { @@ -2506,13 +2512,13 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { pNew->cmd.command = TSDB_SQL_STABLEVGROUP; // TODO TEST IT - SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0); + SQueryInfo *pNewQueryInfo = tscGetQueryInfoS(&pNew->cmd, 0); if (pNewQueryInfo == NULL) { tscFreeSqlObj(pNew); return code; } - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, clauseIndex); for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i); STableMeta* pTableMeta = tscTableMetaDup(pMInfo->pTableMeta); @@ -2536,7 +2542,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { pNew->fp = tscTableMetaCallBack; pNew->param = (void *)pSql->self; - code = tscProcessSql(pNew); + code = tscProcessSql(pNew, NULL); if (code == TSDB_CODE_SUCCESS) { code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 13539a9b19..d845bb1b1e 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -191,7 +191,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, pSql->fp = syncConnCallback; pSql->param = pSql; - tscProcessSql(pSql); + tscProcessSql(pSql, NULL); tsem_wait(&pSql->rspSem); if (pSql->res.code != TSDB_CODE_SUCCESS) { @@ -265,7 +265,7 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, if (taos) *taos = pObj; pSql->fetchFp = fp; - pSql->res.code = tscProcessSql(pSql); + pSql->res.code = tscProcessSql(pSql, NULL); tscDebug("%p DB async connection is opening", taos); return pObj; } @@ -373,7 +373,7 @@ int taos_num_fields(TAOS_RES *res) { if (pSql == NULL || pSql->signature != pSql) return 0; int32_t num = 0; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); if (pQueryInfo == NULL) { return num; } @@ -407,7 +407,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { SSqlObj *pSql = (SSqlObj *)res; if (pSql == NULL || pSql->signature != pSql) return 0; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); if (pQueryInfo == NULL) { return NULL; } @@ -558,7 +558,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) { return true; } - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0); if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) { return true; @@ -577,7 +577,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; tscDebug("%p send msg to dnode to free qhandle ASAP before free sqlObj, command:%s", pSql, sqlCmd[pCmd->command]); - tscProcessSql(pSql); + tscProcessSql(pSql, NULL); return false; } @@ -671,7 +671,7 @@ char *taos_get_client_info() { return version; } static void tscKillSTableQuery(SSqlObj *pSql) { SSqlCmd* pCmd = &pSql->cmd; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) { return; @@ -722,7 +722,7 @@ void taos_stop_query(TAOS_RES *res) { // set the error code for master pSqlObj firstly pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { assert(pSql->rpcRid <= 0); @@ -752,7 +752,7 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { return true; } - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); if (pQueryInfo == NULL) { return true; } @@ -932,7 +932,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t int code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; char *str = (char *)tblNameList; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd, pCmd->clauseIndex); if (pQueryInfo == NULL) { pSql->res.code = terrno; return terrno; diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index a9cd1965e8..acaccccab4 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -89,7 +89,7 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) { return; } - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); code = tscGetTableMeta(pSql, pTableMetaInfo); @@ -130,7 +130,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { pStream->numOfRes = 0; // reset the numOfRes. SSqlObj *pSql = pStream->pSql; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); tscDebug("%p add into timer", pSql); if (pStream->isProject) { @@ -208,7 +208,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf static void tscStreamFillTimeGap(SSqlStream* pStream, TSKEY ts) { #if 0 SSqlObj * pSql = pStream->pSql; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); if (pQueryInfo->fillType != TSDB_FILL_SET_VALUE && pQueryInfo->fillType != TSDB_FILL_NULL) { return; @@ -421,7 +421,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { int64_t minIntervalTime = (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinIntervalTime * 1000L : tsMinIntervalTime; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); if (!pStream->isProject && pQueryInfo->interval.interval == 0) { sprintf(pSql->cmd.payload, "the interval value is 0"); @@ -471,7 +471,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { } static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); if (pStream->isProject) { // no data in table, flush all data till now to destination meter, 10sec delay @@ -530,7 +530,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { return; } - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 32257f5a7c..7f16aca260 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -284,7 +284,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { } size_t numOfTables = taosArrayGetSize(tables); - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0); SArray* progress = taosArrayInit(numOfTables, sizeof(SSubscriptionProgress)); for( size_t i = 0; i < numOfTables; i++ ) { STidTags* tt = taosArrayGet( tables, i ); @@ -304,7 +304,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { } taosArrayDestroy(tables); - TSDB_QUERY_SET_TYPE(tscGetQueryInfoDetail(pCmd, 0)->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY); + TSDB_QUERY_SET_TYPE(tscGetQueryInfo(pCmd, 0)->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY); return 1; } @@ -502,7 +502,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0); if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single table subscription size_t size = taosArrayGetSize(pSub->progress); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 22cb580951..11d98ea6f9 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -119,7 +119,7 @@ static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex); STSBuf* output1 = tsBufCreate(true, pQueryInfo->order.order); STSBuf* output2 = tsBufCreate(true, pQueryInfo->order.order); @@ -130,8 +130,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ SLimitVal* pLimit = &pQueryInfo->limit; int32_t order = pQueryInfo->order.order; - SQueryInfo* pSubQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[0]->cmd, 0); - SQueryInfo* pSubQueryInfo2 = tscGetQueryInfoDetail(&pSql->pSubs[1]->cmd, 0); + SQueryInfo* pSubQueryInfo1 = tscGetQueryInfo(&pSql->pSubs[0]->cmd, 0); + SQueryInfo* pSubQueryInfo2 = tscGetQueryInfo(&pSql->pSubs[1]->cmd, 0); pSubQueryInfo1->tsBuf = output1; pSubQueryInfo2->tsBuf = output2; @@ -274,7 +274,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index) { pSupporter->pObj = pSql; pSupporter->subqueryIndex = index; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex); memcpy(&pSupporter->interval, &pQueryInfo->interval, sizeof(pSupporter->interval)); pSupporter->limit = pQueryInfo->limit; @@ -448,7 +448,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { continue; } - SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(&pPrevSub->cmd, 0); + SQueryInfo *pSubQueryInfo = tscGetQueryInfo(&pPrevSub->cmd, 0); STSBuf *pTsBuf = pSubQueryInfo->tsBuf; pSubQueryInfo->tsBuf = NULL; @@ -467,7 +467,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { tscClearSubqueryInfo(&pNew->cmd); pSql->pSubs[i] = pNew; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(&pNew->cmd, 0); pQueryInfo->tsBuf = pTsBuf; // transfer the ownership of timestamp comp-z data to the new created object // set the second stage sub query for join process @@ -514,7 +514,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { int16_t functionId = tscIsProjectionQuery(pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS; tscAddFuncInSelectClause(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL); - tscPrintSelectClause(pNew, 0); + tscPrintSelNodeList(pNew, 0); tscFieldInfoUpdateOffset(pQueryInfo); pExpr = tscSqlExprGet(pQueryInfo, 0); @@ -564,7 +564,9 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { continue; } - tscDoQuery(pSql->pSubs[i]); +// tscDoQuery(pSql->pSubs[i]); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->pSubs[i]->cmd, 0); + executeQuery(pSql->pSubs[i], pQueryInfo); } return TSDB_CODE_SUCCESS; @@ -702,7 +704,7 @@ static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* tscClearSubqueryInfo(pCmd); tscFreeSqlResult(pSql); - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0); assert(pQueryInfo->numOfTables == 1); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -749,7 +751,7 @@ static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent, pSql, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pQueryInfo->type, tscSqlExprNumOfExprs(pQueryInfo), numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name)); - tscProcessSql(pSql); + tscProcessSql(pSql, NULL); } static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSqlObj* pPSqlObj) { @@ -847,7 +849,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); // todo, the type may not include TSDB_QUERY_TYPE_TAG_FILTER_QUERY assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)); @@ -928,7 +930,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // set the callback function pSql->fp = tscJoinQueryCallback; - tscProcessSql(pSql); + tscProcessSql(pSql, NULL); return; } @@ -967,11 +969,11 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd; SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd; - SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0); + SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pSubCmd1, 0); STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0); tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo1, s1); - SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0); + SQueryInfo* pQueryInfo2 = tscGetQueryInfo(pSubCmd2, 0); STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0); tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2); @@ -1005,7 +1007,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); assert(!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)); if (pParentSql->res.code != TSDB_CODE_SUCCESS) { @@ -1114,7 +1116,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // set the callback function pSql->fp = tscJoinQueryCallback; - tscProcessSql(pSql); + tscProcessSql(pSql, NULL); return; } @@ -1141,7 +1143,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow } // launch the query the retrieve actual results from vnode along with the filtered timestamp - SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex); + SQueryInfo* pPQueryInfo = tscGetQueryInfo(&pParentSql->cmd, pParentSql->cmd.clauseIndex); updateQueryTimeRange(pPQueryInfo, &win); //update the vgroup that involved in real data query @@ -1157,7 +1159,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); if (pParentSql->res.code != TSDB_CODE_SUCCESS) { tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code); @@ -1203,7 +1205,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR pSql->cmd.command = TSDB_SQL_SELECT; pSql->fp = tscJoinQueryCallback; - tscProcessSql(pSql); + tscProcessSql(pSql, NULL); return; } else { tscDebug("%p no result in current subquery anymore", pSql); @@ -1266,8 +1268,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { SSqlRes *pRes = &pSub->res; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0); - + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd, 0); if (!tscHasReachLimitation(pQueryInfo, pRes)) { if (pRes->row >= pRes->numOfRows) { // no data left in current result buffer @@ -1319,7 +1320,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { continue; } - SQueryInfo* p = tscGetQueryInfoDetail(&pSub->cmd, 0); + SQueryInfo* p = tscGetQueryInfo(&pSub->cmd, 0); orderedPrjQuery = tscNonOrderedProjectionQueryOnSTable(p, 0); if (orderedPrjQuery) { break; @@ -1343,7 +1344,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { continue; } - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd, 0); if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { @@ -1364,7 +1365,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { pSub->cmd.command = TSDB_SQL_SELECT; pSub->fp = tscJoinQueryCallback; - tscProcessSql(pSub); + tscProcessSql(pSub, NULL); tryNextVnode = true; } else { tscDebug("%p no result in current subquery anymore", pSub); @@ -1418,7 +1419,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { pSupporter = (SJoinSupporter*)pSql1->param; // wait for all subqueries completed - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd1, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd1, 0); assert(pRes1->numOfRows >= 0 && pQueryInfo->numOfTables == 1); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -1434,7 +1435,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; } - tscProcessSql(pSql1); + tscProcessSql(pSql1, NULL); } } } @@ -1444,13 +1445,12 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; - // the column transfer support struct has been built if (pRes->pColumnIndex != NULL) { return; } - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); int32_t numOfExprs = (int32_t)tscSqlExprNumOfExprs(pQueryInfo); pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * numOfExprs); @@ -1474,7 +1474,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { assert(tableIndexOfSub >= 0 && tableIndexOfSub < pQueryInfo->numOfTables); SSqlCmd* pSubCmd = &pSql->pSubs[tableIndexOfSub]->cmd; - SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(pSubCmd, 0); + SQueryInfo* pSubQueryInfo = tscGetQueryInfo(pSubCmd, 0); size_t numOfSubExpr = taosArrayGetSize(pSubQueryInfo->exprList); for (int32_t k = 0; k < numOfSubExpr; ++k) { @@ -1498,7 +1498,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { SSqlObj* pParentSql = pSupporter->pObj; // There is only one subquery and table for each subquery. - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); assert(pQueryInfo->numOfTables == 1 && pSql->cmd.numOfClause == 1); @@ -1535,7 +1535,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) { pSql->fp = tidTagRetrieveCallback; pSql->cmd.command = TSDB_SQL_FETCH; - tscProcessSql(pSql); + tscProcessSql(pSql, NULL); return; } @@ -1543,7 +1543,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { pSql->fp = tsCompRetrieveCallback; pSql->cmd.command = TSDB_SQL_FETCH; - tscProcessSql(pSql); + tscProcessSql(pSql, NULL); return; } @@ -1564,7 +1564,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data pSql->cmd.command = TSDB_SQL_FETCH; - tscProcessSql(pSql); + tscProcessSql(pSql, NULL); } else { // first retrieve from vnode during the secondary stage sub-query // set the command flag must be after the semaphore been correctly set. if (pParentSql->res.code == TSDB_CODE_SUCCESS) { @@ -1582,7 +1582,7 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) { SSqlCmd * pCmd = &pSql->cmd; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); pSql->res.qhandle = 0x1; assert(pSql->res.numOfRows == 0); @@ -1605,7 +1605,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter addGroupInfoForSubquery(pSql, pNew, 0, tableIndex); // refactor as one method - SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd, 0); assert(pNewQueryInfo != NULL); // update the table index @@ -1714,7 +1714,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter } } else { assert(0); - SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd, 0); pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; } @@ -1725,7 +1725,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0); int32_t code = TSDB_CODE_SUCCESS; @@ -1744,13 +1744,10 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); tscDebug("%p reset all sub states to 0", pSql); - bool hasEmptySub = false; - tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables); - for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { + for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i); - if (pSupporter == NULL) { // failed to create support struct, abort current query tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i); code = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -1766,14 +1763,13 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { SSqlObj* pSub = pSql->pSubs[i]; STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSub->cmd, 0, 0); if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) && (pTableMetaInfo->vgroupList->numOfVgroups == 0)) { - hasEmptySub = true; + pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; break; } } - if (hasEmptySub) { // at least one subquery is empty, do nothing and return + if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { // at least one subquery is empty, do nothing and return freeJoinSubqueryObj(pSql); - pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; (*pSql->fp)(pSql->param, pSql, 0); } else { int fail = 0; @@ -1784,7 +1780,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { continue; } - if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) { + if ((code = tscProcessSql(pSub, NULL)) != TSDB_CODE_SUCCESS) { pRes->code = code; (*pSub->fp)(pSub->param, pSub, 0); fail = 1; @@ -1914,7 +1910,7 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { SFirstRoundQuerySup* pSup = param; SSqlObj* pParent = pSup->pParent; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); int32_t code = taos_errno(pSql); if (code != TSDB_CODE_SUCCESS) { @@ -1997,7 +1993,7 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { // set the parameters for the second round query process SSqlCmd *pPCmd = &pParent->cmd; - SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(pPCmd, 0); + SQueryInfo *pQueryInfo1 = tscGetQueryInfo(pPCmd, 0); int32_t resRows = pSup->numOfRows; if (pSup->numOfRows > 0) { @@ -2024,7 +2020,8 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { } pQueryInfo1->round = 1; - tscDoQuery(pParent); +// tscDoQuery(pParent); + executeQuery(pParent, pQueryInfo1); } void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) { @@ -2047,7 +2044,7 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) { } int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); STableMetaInfo* pTableMetaInfo1 = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); SFirstRoundQuerySup *pSup = calloc(1, sizeof(SFirstRoundQuerySup)); @@ -2063,7 +2060,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { tscClearSubqueryInfo(pCmd); tscFreeSqlResult(pSql); - SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd, 0); assert(pQueryInfo->numOfTables == 1); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0); @@ -2186,7 +2183,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { const uint32_t nBufferSize = (1u << 16u); // 64KB - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); SSubqueryState *pState = &pSql->subState; @@ -2268,7 +2265,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { // todo handle multi-vnode situation if (pQueryInfo->tsBuf) { - SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd, 0); pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf); assert(pNewQueryInfo->tsBuf != NULL); } @@ -2296,7 +2293,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SRetrieveSupport* pSupport = pSub->param; tscDebug("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex); - tscProcessSql(pSub); + tscProcessSql(pSub, NULL); } return TSDB_CODE_SUCCESS; @@ -2376,7 +2373,7 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32 return pParentSql->res.code; } - int32_t ret = tscProcessSql(pNew); + int32_t ret = tscProcessSql(pNew, NULL); *sent = 1; @@ -2456,7 +2453,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO tscFreeRetrieveSup(pSql); // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd, 0); if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code); @@ -2473,7 +2470,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p tOrderDescriptor *pDesc = trsupport->pOrderDescriptor; SSubqueryState* pState = &pParentSql->subState; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; @@ -2521,7 +2518,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p tscDebug("%p retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree", pParentSql, pState->numOfSub, pState->numOfRetrievedRows); - SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0); + SQueryInfo *pPQueryInfo = tscGetQueryInfo(&pParentSql->cmd, 0); tscClearInterpInfo(pPQueryInfo); tscCreateLocalMerger(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, trsupport->pFinalColModel, trsupport->pFFColModel, pParentSql); @@ -2597,7 +2594,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR } SSqlRes * pRes = &pSql->res; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); if (numOfRows > 0) { assert(pRes->numOfRows == numOfRows); @@ -2649,7 +2646,7 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, TSDB_SQL_SELECT, prevSqlObj); if (pNew != NULL) { // the sub query of two-stage super table query - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfo(&pNew->cmd, 0); pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY; @@ -2683,7 +2680,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { SSqlObj* pParentSql = trsupport->pParentSql; SSqlObj* pSql = (SSqlObj *) tres; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); @@ -2821,7 +2818,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) // clean up tableMeta in cache tscFreeQueryInfo(&pSql->cmd, false); - SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoS(&pSql->cmd, 0); STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0); tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL); @@ -2860,7 +2857,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) return; } - tscDoQuery(pParentObj); + tscHandleMultivnodeInsert(pParentObj); } } @@ -2884,7 +2881,7 @@ int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) { return code; // here the pSql may have been released already. } - return tscProcessSql(pSql); + return tscProcessSql(pSql, NULL); } int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { @@ -2983,7 +2980,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { for (int32_t j = 0; j < numOfSub; ++j) { SSqlObj *pSub = pSql->pSubs[j]; tscDebug("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j); - tscProcessSql(pSub); + tscProcessSql(pSub, NULL); } return TSDB_CODE_SUCCESS; @@ -2993,7 +2990,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { } static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); SInternalField* pInfo = (SInternalField*) TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, columnIndex); assert(pInfo->pSqlExpr != NULL); @@ -3007,7 +3004,7 @@ static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t column static void doBuildResFromSubqueries(SSqlObj* pSql) { SSqlRes* pRes = &pSql->res; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex); int32_t numOfRes = INT32_MAX; for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { @@ -3090,7 +3087,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) { } if (pRes->tsrow == NULL) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex); pRes->numOfCols = (int16_t) tscSqlExprNumOfExprs(pQueryInfo); pRes->tsrow = calloc(pRes->numOfCols, POINTER_BYTES); @@ -3144,7 +3141,7 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql) { return pRes->tsrow; } - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); size_t size = tscNumOfFields(pQueryInfo); for (int i = 0; i < size; ++i) { @@ -3171,7 +3168,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { bool hasData = true; SSqlCmd *pCmd = &pSql->cmd; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { bool allSubqueryExhausted = true; @@ -3183,7 +3180,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { SSqlRes *pRes1 = &pSql->pSubs[i]->res; SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd; - SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(pCmd1, pCmd1->clauseIndex); + SQueryInfo *pQueryInfo1 = tscGetQueryInfo(pCmd1, pCmd1->clauseIndex); assert(pQueryInfo1->numOfTables == 1); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo1, 0); @@ -3207,7 +3204,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { } SSqlRes * pRes1 = &pSql->pSubs[i]->res; - SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0); + SQueryInfo *pQueryInfo1 = tscGetQueryInfo(&pSql->pSubs[i]->cmd, 0); if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pQueryInfo1, pRes1) && tscIsProjectionQuery(pQueryInfo1)) || (pRes1->numOfRows == 0)) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index e899450261..b12eaeb494 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -371,6 +371,10 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) { } } +void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) { + printf("abc\n"); +} + static void tscDestroyResPointerInfo(SSqlRes* pRes) { if (pRes->buffer != NULL) { // free all buffers containing the multibyte string for (int i = 0; i < pRes->numOfCols; i++) { @@ -404,7 +408,7 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) { } for (int32_t i = 0; i < pCmd->numOfClause; ++i) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, i); freeQueryInfoImpl(pQueryInfo); clearAllTableMetaInfo(pQueryInfo, removeMeta); @@ -669,6 +673,10 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { return TSDB_CODE_SUCCESS; } +SQueryInfo* tscGetActiveQueryInfo(SSqlCmd* pCmd) { + return pCmd->active; +} + /** * create the in-memory buffer for each table to keep the submitted data block * @param initialSize @@ -1697,7 +1705,7 @@ STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd* pCmd, int32_t clauseIndex, i assert(clauseIndex >= 0 && clauseIndex < pCmd->numOfClause); - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, clauseIndex); return tscGetMetaInfo(pQueryInfo, tableIndex); } @@ -1714,17 +1722,17 @@ STableMetaInfo* tscGetMetaInfo(SQueryInfo* pQueryInfo, int32_t tableIndex) { return pQueryInfo->pTableMetaInfo[tableIndex]; } -SQueryInfo* tscGetQueryInfoDetailSafely(SSqlCmd* pCmd, int32_t subClauseIndex) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex); +SQueryInfo* tscGetQueryInfoS(SSqlCmd* pCmd, int32_t subClauseIndex) { + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, subClauseIndex); int32_t ret = TSDB_CODE_SUCCESS; while ((pQueryInfo) == NULL) { - if ((ret = tscAddSubqueryInfo(pCmd)) != TSDB_CODE_SUCCESS) { + if ((ret = tscAddQueryInfo(pCmd)) != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return NULL; } - pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex); + pQueryInfo = tscGetQueryInfo(pCmd, subClauseIndex); } return pQueryInfo; @@ -1753,18 +1761,20 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) { pQueryInfo->fieldsInfo.internalField = taosArrayInit(4, sizeof(SInternalField)); assert(pQueryInfo->exprList == NULL); - pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES); - pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); - pQueryInfo->udColumnId = TSDB_UD_COLUMN_INDEX; - pQueryInfo->resColumnId = -1000; - pQueryInfo->limit.limit = -1; - pQueryInfo->limit.offset = 0; - pQueryInfo->slimit.limit = -1; - pQueryInfo->slimit.offset = 0; + pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES); + pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); + pQueryInfo->udColumnId = TSDB_UD_COLUMN_INDEX; + pQueryInfo->resColumnId = -1000; + pQueryInfo->limit.limit = -1; + pQueryInfo->limit.offset = 0; + + pQueryInfo->slimit.limit = -1; + pQueryInfo->slimit.offset = 0; + pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES); } -int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) { +int32_t tscAddQueryInfo(SSqlCmd* pCmd) { assert(pCmd != NULL); // todo refactor: remove this structure @@ -1814,7 +1824,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) { void tscClearSubqueryInfo(SSqlCmd* pCmd) { for (int32_t i = 0; i < pCmd->numOfClause; ++i) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, i); freeQueryInfoImpl(pQueryInfo); } } @@ -2001,7 +2011,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in return NULL; } - if (tscAddSubqueryInfo(pCmd) != TSDB_CODE_SUCCESS) { + if (tscAddQueryInfo(pCmd) != TSDB_CODE_SUCCESS) { #ifdef __APPLE__ // to satisfy later tsem_destroy in taos_free_result tsem_init(&pNew->rspSem, 0, 0); @@ -2016,7 +2026,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in pNew->sqlstr = NULL; pNew->maxRetry = TSDB_MAX_REPLICA; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoS(pCmd, 0); assert(pSql->cmd.clauseIndex == 0); STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0); @@ -2091,7 +2101,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t pnCmd->payload = NULL; pnCmd->allocSize = 0; - pnCmd->pQueryInfo = NULL; + pnCmd->pQueryInfo = NULL; pnCmd->numOfClause = 0; pnCmd->clauseIndex = 0; pnCmd->pDataBlocks = NULL; @@ -2103,13 +2113,13 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t pnCmd->tagData.data = NULL; pnCmd->tagData.dataLen = 0; - if (tscAddSubqueryInfo(pnCmd) != TSDB_CODE_SUCCESS) { + if (tscAddQueryInfo(pnCmd) != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; } - SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(pnCmd, 0); - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pnCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); pNewQueryInfo->command = pQueryInfo->command; memcpy(&pNewQueryInfo->interval, &pQueryInfo->interval, sizeof(pNewQueryInfo->interval)); @@ -2171,7 +2181,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t // set the correct query type if (pPrevSql != NULL) { - SQueryInfo* pPrevQueryInfo = tscGetQueryInfoDetail(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex); + SQueryInfo* pPrevQueryInfo = tscGetQueryInfo(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex); pNewQueryInfo->type = pPrevQueryInfo->type; } else { TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY);// it must be the subquery @@ -2241,7 +2251,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t size, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pFinalInfo->name), pNewQueryInfo->window.skey, pNewQueryInfo->window.ekey, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit); - tscPrintSelectClause(pNew, 0); + tscPrintSelNodeList(pNew, 0); } else { tscDebug("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex); } @@ -2254,6 +2264,41 @@ _error: return NULL; } +void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { + uint16_t type = pQueryInfo->type; + if (QUERY_IS_JOIN_QUERY(type) && !TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_SUBQUERY)) { + tscHandleMasterJoinQuery(pSql); + } else if (tscMultiRoundQuery(pQueryInfo, 0) && pQueryInfo->round == 0) { + tscHandleFirstRoundStableQuery(pSql); // todo lock? + } else if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query + tscLockByThread(&pSql->squeryLock); + tscHandleMasterSTableQuery(pSql); + tscUnlockByThread(&pSql->squeryLock); + } else if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) { + tscHandleMultivnodeInsert(pSql); + } else if (pSql->cmd.command > TSDB_SQL_LOCAL) { + tscProcessLocalCmd(pSql); + } else { // send request to server directly + tscProcessSql(pSql, pQueryInfo); + } +} + +// do execute the query according to the query execution plan +void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { + if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly + SQueryInfo* pq = taosArrayGetP(pQueryInfo->pUpstream, 0); + + pSql->cmd.active = pq; + executeQuery(pSql, pq); + + // merge nest query result and generate final results + return; + } + + pSql->cmd.active = pQueryInfo; + doExecuteQuery(pSql, pQueryInfo); +} + /** * To decide if current is a two-stage super table query, join query, or insert. And invoke different * procedure accordingly @@ -2277,27 +2322,22 @@ void tscDoQuery(SSqlObj* pSql) { if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) { tscImportDataFromFile(pSql); } else { - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); uint16_t type = pQueryInfo->type; - if (TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_INSERT)) { // multi-vnodes insertion - tscHandleMultivnodeInsert(pSql); - return; - } - if (QUERY_IS_JOIN_QUERY(type)) { if (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_SUBQUERY)) { tscHandleMasterJoinQuery(pSql); } else { // for first stage sub query, iterate all vnodes to get all timestamp if (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { - tscProcessSql(pSql); + tscProcessSql(pSql, NULL); } else { // secondary stage join query. if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query tscLockByThread(&pSql->squeryLock); tscHandleMasterSTableQuery(pSql); tscUnlockByThread(&pSql->squeryLock); } else { - tscProcessSql(pSql); + tscProcessSql(pSql, NULL); } } } @@ -2313,7 +2353,7 @@ void tscDoQuery(SSqlObj* pSql) { return; } - tscProcessSql(pSql); + tscProcessSql(pSql, NULL); } } @@ -2368,7 +2408,7 @@ bool tscIsQueryWithLimit(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; for (int32_t i = 0; i < pCmd->numOfClause; ++i) { - SQueryInfo* pqi = tscGetQueryInfoDetailSafely(pCmd, i); + SQueryInfo* pqi = tscGetQueryInfoS(pCmd, i); if (pqi == NULL) { continue; } @@ -2453,7 +2493,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) { } assert(pRes->completed); - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); // for normal table, no need to try any more if results are all retrieved from one vnode @@ -2478,7 +2518,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); /* * no result returned from the current virtual node anymore, try the next vnode if exists @@ -2524,7 +2564,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { // set the callback function pSql->fp = fp; - tscProcessSql(pSql); + tscProcessSql(pSql, NULL); } else { tscDebug("%p try all %d vnodes, query complete. current numOfRes:%" PRId64, pSql, totalVgroups, pRes->numOfClauseTotal); } @@ -2538,7 +2578,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) { assert(pCmd->clauseIndex < pCmd->numOfClause - 1); pCmd->clauseIndex++; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); pSql->cmd.command = pQueryInfo->command; @@ -2556,7 +2596,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) { if (pCmd->command > TSDB_SQL_LOCAL) { tscProcessLocalCmd(pSql); } else { - tscDoQuery(pSql); + executeQuery(pSql, pQueryInfo); } } diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index d9a50e8914..f574a02538 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -388,9 +388,10 @@ typedef enum { typedef enum { TSDB_SUPER_TABLE = 0, // super table TSDB_CHILD_TABLE = 1, // table created from super table - TSDB_NORMAL_TABLE = 2, // ordinary table - TSDB_STREAM_TABLE = 3, // table created from stream computing - TSDB_TABLE_MAX = 4 + TSDB_NORMAL_TABLE = 2, // ordinary table + TSDB_STREAM_TABLE = 3, // table created from stream computing + TSDB_TEMP_TABLE = 4, // temp table created by nest query + TSDB_TABLE_MAX = 5 } ETableType; typedef enum { diff --git a/src/query/inc/qSqlparser.h b/src/query/inc/qSqlparser.h index 3ce81787f0..d684dc31c9 100644 --- a/src/query/inc/qSqlparser.h +++ b/src/query/inc/qSqlparser.h @@ -86,7 +86,7 @@ typedef struct SSessionWindowVal { struct SFromInfo; typedef struct SQuerySqlNode { - struct SArray *pSelectList; // select clause + struct SArray *pSelNodeList; // select clause struct SFromInfo *from; // from clause SArray struct tSqlExpr *pWhere; // where clause [optional] SArray *pGroupby; // groupby clause, only for tags[optional], SArray @@ -113,7 +113,7 @@ typedef struct SSubclauseInfo { // "UNION" multiple select sub-clause typedef struct SFromInfo { int32_t type; // nested query|table name list union { - SSubclauseInfo *pNode; + SSubclauseInfo pNode; SArray *tableList; // SArray }; } SFromInfo; @@ -254,7 +254,7 @@ SArray *tVariantListInsert(SArray *pList, tVariant *pVar, uint8_t sortOrder, int SArray *tVariantListAppendToken(SArray *pList, SStrToken *pAliasToken, uint8_t sortOrder); SFromInfo *setTableNameList(SFromInfo* pFromInfo, SStrToken *pName, SStrToken* pAlias); -SFromInfo *setSubquery(SFromInfo* pFromInfo, SQuerySqlNode *pSqlNode); +SFromInfo *setSubquery(SFromInfo* pFromInfo, SSubclauseInfo* pSqlNode); void *destroyFromInfo(SFromInfo* pFromInfo); // sql expr leaf node @@ -270,7 +270,7 @@ void tSqlExprDestroy(tSqlExpr *pExpr); SArray *tSqlExprListAppend(SArray *pList, tSqlExpr *pNode, SStrToken *pDistinct, SStrToken *pToken); void tSqlExprListDestroy(SArray *pList); -SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelectList, SFromInfo *pFrom, tSqlExpr *pWhere, +SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SFromInfo *pFrom, tSqlExpr *pWhere, SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps, SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pgLimit); diff --git a/src/query/inc/sql.y b/src/query/inc/sql.y index 7f6aa1ca5f..8b537ffe5d 100644 --- a/src/query/inc/sql.y +++ b/src/query/inc/sql.y @@ -507,33 +507,26 @@ distinct(X) ::= . { X.n = 0;} // A complete FROM clause. %type from {SFromInfo*} from(A) ::= FROM tablelist(X). {A = X;} -from(A) ::= FROM LP union(Y) RP. {A = Y;} +from(A) ::= FROM LP union(Y) RP. {A = setSubquery(NULL, Y);} -%type tablelist {SArray*} +%type tablelist {SFromInfo*} tablelist(A) ::= ids(X) cpxName(Y). { - toTSDBType(X.type); X.n += Y.n; A = setTableNameList(NULL, &X, NULL); } tablelist(A) ::= ids(X) cpxName(Y) ids(Z). { - toTSDBType(X.type); - toTSDBType(Z.type); X.n += Y.n; A = setTableNameList(NULL, &X, &Z); } tablelist(A) ::= tablelist(Y) COMMA ids(X) cpxName(Z). { - toTSDBType(X.type); X.n += Z.n; A = setTableNameList(Y, &X, NULL); } tablelist(A) ::= tablelist(Y) COMMA ids(X) cpxName(Z) ids(F). { - toTSDBType(X.type); - toTSDBType(F.type); X.n += Z.n; - A = setTableNameList(Y, &X, &F); } diff --git a/src/query/src/qSqlParser.c b/src/query/src/qSqlParser.c index e76b78c523..19bd2d35fc 100644 --- a/src/query/src/qSqlParser.c +++ b/src/query/src/qSqlParser.c @@ -462,13 +462,13 @@ SFromInfo *setTableNameList(SFromInfo* pFromInfo, SStrToken *pName, SStrToken* p return pFromInfo; } -SFromInfo *setSubquery(SFromInfo* pFromInfo, SQuerySqlNode* pSqlNode) { +SFromInfo *setSubquery(SFromInfo* pFromInfo, SSubclauseInfo* pSqlNode) { if (pFromInfo == NULL) { pFromInfo = calloc(1, sizeof(SFromInfo)); } pFromInfo->type = SQL_NODE_FROM_SUBQUERY; - pFromInfo->pNode->pClause[pFromInfo->pNode->numOfClause - 1] = pSqlNode; + pFromInfo->pNode = *pSqlNode; return pFromInfo; } @@ -481,7 +481,7 @@ void* destroyFromInfo(SFromInfo* pFromInfo) { if (pFromInfo->type == SQL_NODE_FROM_NAMELIST) { taosArrayDestroy(pFromInfo->tableList); } else { - destroyAllSelectClause(pFromInfo->pNode); + destroyAllSelectClause(&pFromInfo->pNode); } tfree(pFromInfo); @@ -628,11 +628,11 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) { /* * extract the select info out of sql string */ -SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelectList, SFromInfo *pFrom, tSqlExpr *pWhere, +SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SFromInfo *pFrom, tSqlExpr *pWhere, SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *pSession, SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *psLimit) { - assert(pSelectList != NULL); + assert(pSelNodeList != NULL); SQuerySqlNode *pSqlNode = calloc(1, sizeof(SQuerySqlNode)); @@ -640,7 +640,7 @@ SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelectList, SF pSqlNode->sqlstr = *pSelectToken; pSqlNode->sqlstr.n = (uint32_t)strlen(pSqlNode->sqlstr.z); - pSqlNode->pSelectList = pSelectList; + pSqlNode->pSelNodeList = pSelNodeList; pSqlNode->from = pFrom; pSqlNode->pGroupby = pGroupby; pSqlNode->pSortOrder = pSortOrder; @@ -702,9 +702,9 @@ void destroyQuerySqlNode(SQuerySqlNode *pQuerySql) { return; } - tSqlExprListDestroy(pQuerySql->pSelectList); + tSqlExprListDestroy(pQuerySql->pSelNodeList); - pQuerySql->pSelectList = NULL; + pQuerySql->pSelNodeList = NULL; tSqlExprDestroy(pQuerySql->pWhere); pQuerySql->pWhere = NULL; diff --git a/src/query/src/sql.c b/src/query/src/sql.c index 98304d636f..e976e5ebe2 100644 --- a/src/query/src/sql.c +++ b/src/query/src/sql.c @@ -2649,45 +2649,38 @@ static void yy_reduce( yymsp[0].minor.yy0 = yylhsminor.yy0; break; case 171: /* from ::= FROM tablelist */ -{yymsp[-1].minor.yy70 = yymsp[0].minor.yy429;} +{yymsp[-1].minor.yy70 = yymsp[0].minor.yy70;} break; case 172: /* from ::= FROM LP union RP */ -{yymsp[-3].minor.yy70 = yymsp[-1].minor.yy141;} +{yymsp[-3].minor.yy70 = setSubquery(NULL, yymsp[-1].minor.yy141);} break; case 173: /* tablelist ::= ids cpxName */ { - toTSDBType(yymsp[-1].minor.yy0.type); yymsp[-1].minor.yy0.n += yymsp[0].minor.yy0.n; - yylhsminor.yy429 = setTableNameList(NULL, &yymsp[-1].minor.yy0, NULL); + yylhsminor.yy70 = setTableNameList(NULL, &yymsp[-1].minor.yy0, NULL); } - yymsp[-1].minor.yy429 = yylhsminor.yy429; + yymsp[-1].minor.yy70 = yylhsminor.yy70; break; case 174: /* tablelist ::= ids cpxName ids */ { - toTSDBType(yymsp[-2].minor.yy0.type); - toTSDBType(yymsp[0].minor.yy0.type); yymsp[-2].minor.yy0.n += yymsp[-1].minor.yy0.n; - yylhsminor.yy429 = setTableNameList(NULL, &yymsp[-2].minor.yy0, &yymsp[0].minor.yy0); + yylhsminor.yy70 = setTableNameList(NULL, &yymsp[-2].minor.yy0, &yymsp[0].minor.yy0); } - yymsp[-2].minor.yy429 = yylhsminor.yy429; + yymsp[-2].minor.yy70 = yylhsminor.yy70; break; case 175: /* tablelist ::= tablelist COMMA ids cpxName */ { - toTSDBType(yymsp[-1].minor.yy0.type); yymsp[-1].minor.yy0.n += yymsp[0].minor.yy0.n; - yylhsminor.yy429 = setTableNameList(yymsp[-3].minor.yy429, &yymsp[-1].minor.yy0, NULL); + yylhsminor.yy70 = setTableNameList(yymsp[-3].minor.yy70, &yymsp[-1].minor.yy0, NULL); } - yymsp[-3].minor.yy429 = yylhsminor.yy429; + yymsp[-3].minor.yy70 = yylhsminor.yy70; break; case 176: /* tablelist ::= tablelist COMMA ids cpxName ids */ { - toTSDBType(yymsp[-2].minor.yy0.type); - toTSDBType(yymsp[0].minor.yy0.type); yymsp[-2].minor.yy0.n += yymsp[-1].minor.yy0.n; - - yylhsminor.yy429 = setTableNameList(yymsp[-4].minor.yy429, &yymsp[-2].minor.yy0, &yymsp[0].minor.yy0); + yylhsminor.yy70 = setTableNameList(yymsp[-4].minor.yy70, &yymsp[-2].minor.yy0, &yymsp[0].minor.yy0); } - yymsp[-4].minor.yy429 = yylhsminor.yy429; + yymsp[-4].minor.yy70 = yylhsminor.yy70; break; case 177: /* tmvar ::= VARIABLE */ {yylhsminor.yy0 = yymsp[0].minor.yy0;}