From 53cf7201b8d2c5367f4517afc195bbd2429952fe Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 17 Jun 2020 11:51:01 +0800 Subject: [PATCH] [td-225] fix bugs in join query. --- src/client/inc/tscUtil.h | 10 +- src/client/inc/tschemautil.h | 14 +- src/client/inc/tsclient.h | 47 +- src/client/src/tscAsync.c | 1 - src/client/src/tscFunctionImpl.c | 3 +- src/client/src/tscParseInsert.c | 4 +- src/client/src/tscSQLParser.c | 153 ++---- src/client/src/tscSchemaUtil.c | 25 +- src/client/src/tscSecondaryMerge.c | 14 +- src/client/src/tscServer.c | 9 +- src/client/src/tscSub.c | 2 +- src/client/src/tscSubquery.c | 480 ++++++++++-------- src/client/src/tscUtil.c | 119 ++--- src/inc/taoserror.h | 1 + src/inc/taosmsg.h | 2 +- src/query/inc/qtsbuf.h | 4 +- src/query/src/qExecutor.c | 106 ++-- src/query/src/qtsbuf.c | 19 +- src/tsdb/src/tsdbRead.c | 9 +- src/util/inc/tutil.h | 19 +- tests/script/general/parser/join.sim | 3 +- .../script/general/parser/join_multivnode.sim | 1 + 22 files changed, 536 insertions(+), 509 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index b3e05cb0ca..32b266cf43 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -122,15 +122,13 @@ bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableI bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex); -bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo); +bool tscIsProjectionQuery(SQueryInfo* pQueryInfo); bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex); -bool tscQueryOnSTable(SSqlCmd* pCmd); bool tscQueryTags(SQueryInfo* pQueryInfo); -bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd); void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex, - SSchema* pColSchema, int16_t isTag); + SSchema* pColSchema, int16_t colType); int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql); void tscClearInterpInfo(SQueryInfo* pQueryInfo); @@ -139,7 +137,7 @@ bool tscIsInsertData(char* sqlstr); /* use for keep current db info temporarily, for handle table with db prefix */ // todo remove it -void tscGetDBInfoFromMeterId(char* tableId, char* db); +void tscGetDBInfoFromTableFullName(char* tableId, char* db); int tscAllocPayload(SSqlCmd* pCmd, int size); @@ -253,7 +251,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t sub void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex); -int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid); +int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid); void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex); diff --git a/src/client/inc/tschemautil.h b/src/client/inc/tschemautil.h index 2fc77d69bb..99ffa4e766 100644 --- a/src/client/inc/tschemautil.h +++ b/src/client/inc/tschemautil.h @@ -64,12 +64,20 @@ SSchema* tscGetTableSchema(const STableMeta* pTableMeta); SSchema *tscGetTableTagSchema(const STableMeta *pMeta); /** - * + * get the column schema according to the column index * @param pMeta - * @param startCol + * @param colIndex * @return */ -SSchema *tscGetTableColumnSchema(const STableMeta *pMeta, int32_t startCol); +SSchema *tscGetTableColumnSchema(const STableMeta *pMeta, int32_t colIndex); + +/** + * get the column schema according to the column id + * @param pTableMeta + * @param colId + * @return + */ +SSchema* tscGetTableColumnSchemaById(STableMeta* pTableMeta, int16_t colId); /** * check if the schema is valid or not, including following aspects: diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 24e0f48ec9..af4be8783d 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -85,7 +85,7 @@ typedef struct SSqlExpr { int16_t functionId; // function id in aAgg array int16_t resType; // return value type int16_t resBytes; // length of return value - int32_t interBytes; // inter result buffer size + int32_t interBytes; // inter result buffer size int16_t numOfParams; // argument value of each function tVariant param[3]; // parameters are not more than 3 int32_t offset; // sub result column value of arithmetic expression. @@ -123,7 +123,7 @@ typedef struct SCond { typedef struct SJoinNode { char tableId[TSDB_TABLE_ID_LEN]; uint64_t uid; - int16_t tagCol; + int16_t tagColId; } SJoinNode; typedef struct SJoinInfo { @@ -155,20 +155,19 @@ typedef struct SParamInfo { } SParamInfo; typedef struct STableDataBlocks { - char tableId[TSDB_TABLE_ID_LEN]; - int8_t tsSource; // where does the UNIX timestamp come from, server or client - bool ordered; // if current rows are ordered or not - int64_t vgId; // virtual group id - int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending - int32_t numOfTables; // number of tables in current submit block - - int32_t rowSize; // row size for current table + char tableId[TSDB_TABLE_ID_LEN]; + int8_t tsSource; // where does the UNIX timestamp come from, server or client + bool ordered; // if current rows are ordered or not + int64_t vgId; // virtual group id + int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending + int32_t numOfTables; // number of tables in current submit block + int32_t rowSize; // row size for current table uint32_t nAllocSize; - uint32_t headerSize; // header for metadata (submit metadata) + uint32_t headerSize; // header for table info (uid, tid, submit metadata) uint32_t size; /* - * the metermeta for current table, the metermeta will be used during submit stage, keep a ref + * the table meta of table, the table meta will be used during submit, keep a ref * to avoid it to be removed from cache */ STableMeta *pTableMeta; @@ -191,32 +190,28 @@ typedef struct SDataBlockList { // todo remove } SDataBlockList; typedef struct SQueryInfo { - int16_t command; // the command may be different for each subclause, so keep it seperately. - uint32_t type; // query/insert/import type + int16_t command; // the command may be different for each subclause, so keep it seperately. + uint32_t type; // query/insert/import type char slidingTimeUnit; - STimeWindow window; int64_t intervalTime; // aggregation time interval int64_t slidingTime; // sliding window in mseconds SSqlGroupbyExpr groupbyExpr; // group by tags info - - SArray * colList; // SArray + SArray * colList; // SArray SFieldInfo fieldsInfo; - SArray * exprList; // SArray + SArray * exprList; // SArray SLimitVal limit; SLimitVal slimit; STagCond tagCond; SOrderVal order; - int16_t fillType; // final result fill type + int16_t fillType; // final result fill type int16_t numOfTables; STableMetaInfo **pTableMetaInfo; struct STSBuf * tsBuf; - int64_t * fillVal; // default value for fill - char * msg; // pointer to the pCmd->payload to keep error message temporarily - int64_t clauseLimit; // limit for current sub clause - - // offset value in the original sql expression, NOT sent to virtual node, only applied at client side - int64_t prjOffset; + int64_t * fillVal; // default value for fill + char * msg; // pointer to the pCmd->payload to keep error message temporarily + int64_t clauseLimit; // limit for current sub clause + int64_t prjOffset; // offset value in the original sql expression, only applied at client side } SQueryInfo; typedef struct { @@ -431,7 +426,7 @@ extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows); int32_t tscCompareTidTags(const void* p1, const void* p2); -void tscBuildVgroupTableInfo(STableMetaInfo* pTableMetaInfo, SArray* tables); +void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables); #ifdef __cplusplus } diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 855154cc4c..b1ec8e09de 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -214,7 +214,6 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi tscError("qhandle is NULL"); pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE; tscQueueAsyncRes(pSql); -// tscQueueAsyncError(fp, param, TSDB_CODE_TSC_INVALID_QHANDLE); return; } diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 457e187971..cf72aa2460 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -3903,7 +3903,7 @@ static bool ts_comp_function_setup(SQLFunctionCtx *pCtx) { SResultInfo *pResInfo = GET_RES_INFO(pCtx); STSCompInfo *pInfo = pResInfo->interResultBuf; - pInfo->pTSBuf = tsBufCreate(false); + pInfo->pTSBuf = tsBufCreate(false, pCtx->order); pInfo->pTSBuf->tsOrder = pCtx->order; return true; } @@ -3925,7 +3925,6 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) { } SET_VAL(pCtx, pCtx->size, 1); - pResInfo->hasResult = DATA_SET_FLAG; } diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index c4749e4611..c3ba1dabad 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1328,12 +1328,14 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) { int32_t ret = TSDB_CODE_SUCCESS; if (initialParse) { + assert(!pSql->cmd.parseFinished); + char* p = pSql->sqlstr; pSql->sqlstr = NULL; tscPartiallyFreeSqlObj(pSql); pSql->sqlstr = p; - } else { + } else if (!pSql->cmd.parseFinished) { tscTrace("continue parse sql: %s", pSql->cmd.curSql); } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index cdde651107..053cca43ae 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -98,8 +98,6 @@ static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killTy static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField); static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo); -static void updateTagColumnIndex(SQueryInfo* pQueryInfo, int32_t tableIndex); - static int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t index, SQuerySQL* pQuerySql, SSqlObj* pSql); static int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql); static int32_t getColumnIndexByName(const SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); @@ -640,17 +638,11 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { return TSDB_CODE_TSC_INVALID_SQL; } + SSchema s = {.bytes = TSDB_KEYSIZE, .type = TSDB_DATA_TYPE_TIMESTAMP, .colId = PRIMARYKEY_TIMESTAMP_COL_INDEX}; + tstrncpy(s.name, aAggs[TSDB_FUNC_TS].aName, sizeof(s.name)); + SColumnIndex index = {tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX}; - SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS, &index, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, - TSDB_KEYSIZE, false); - - SColumnList ids = getColumnList(1, 0, PRIMARYKEY_TIMESTAMP_COL_INDEX); - - int32_t ret = - insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS].aName, pExpr); - if (ret != TSDB_CODE_SUCCESS) { - return ret; - } + tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_TS, &index, &s, TSDB_COL_NORMAL); if (parseSlidingClause(pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; @@ -1241,11 +1233,11 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel return invalidSqlErrMsg(pQueryInfo->msg, msg2); } + /* + * transfer sql functions that need secondary merge into another format + * in dealing with metric queries such as: count/first/last + */ if (isSTable) { - /* - * transfer sql functions that need secondary merge into another format - * in dealing with metric queries such as: count/first/last - */ tscTansformSQLFuncForSTableQuery(pQueryInfo); if (hasUnsupportFunctionsForSTableQuery(pQueryInfo)) { @@ -1272,7 +1264,7 @@ int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnLi } TAOS_FIELD f = tscCreateField(type, fieldName, bytes); - SFieldSupInfo* pInfo =tscFieldInfoInsert(&pQueryInfo->fieldsInfo, outputIndex, &f); + SFieldSupInfo* pInfo = tscFieldInfoInsert(&pQueryInfo->fieldsInfo, outputIndex, &f); pInfo->pSqlExpr = pSqlExpr; return TSDB_CODE_SUCCESS; @@ -1324,8 +1316,9 @@ static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumn void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex, SSchema* pColSchema, int16_t flag) { - SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionId, pIndex, pColSchema->type, + SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, outputColIndex, functionId, pIndex, pColSchema->type, pColSchema->bytes, pColSchema->bytes, flag); + tstrncpy(pExpr->aliasName, pColSchema->name, sizeof(pExpr->aliasName)); SColumnList ids = getColumnList(1, pIndex->tableIndex, pIndex->columnIndex); if (TSDB_COL_IS_TAG(flag)) { @@ -1403,7 +1396,7 @@ int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pI if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { SSchema colSchema = tGetTableNameColumnSchema(); - tscAddSpecialColumnForSelect(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, true); + tscAddSpecialColumnForSelect(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, TSDB_COL_TAG); } else { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; @@ -2470,62 +2463,10 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo) { return true; } -void updateTagColumnIndex(SQueryInfo* pQueryInfo, int32_t tableIndex) { -// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex); -// -// // update tags column index for expression -// size_t size = tscSqlExprNumOfExprs(pQueryInfo); -// for (int32_t i = 0; i < size; ++i) { -// SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); -// -// if (!TSDB_COL_IS_TAG(pExpr->colInfo.flag)) { // not tags, continue -// continue; -// } -// -// // not belongs to this table -// if (pExpr->uid != pTableMetaInfo->pTableMeta->uid) { -// continue; -// } - -// for (int32_t j = 0; j < pTableMetaInfo->numOfTags; ++j) { -// if (pExpr->colInfo.colIndex == pTableMetaInfo->tagColumnIndex[j]) { -// pExpr->colInfo.colIndex = j; -// break; -// } -// } -// } - - // update join condition tag column index -// SJoinInfo* pJoinInfo = &pQueryInfo->tagCond.joinInfo; -// if (!pJoinInfo->hasJoin) { // not join query -// return; -// } -// -// assert(pJoinInfo->left.uid != pJoinInfo->right.uid); -// -// // the join condition expression node belongs to this table(super table) -// assert(0); -// if (pTableMetaInfo->pTableMeta->uid == pJoinInfo->left.uid) { -// for (int32_t i = 0; i < pTableMetaInfo->numOfTags; ++i) { -// if (pJoinInfo->left.tagCol == pTableMetaInfo->tagColumnIndex[i]) { -// pJoinInfo->left.tagCol = i; -// } -// } -// } -// -// if (pTableMetaInfo->pTableMeta->uid == pJoinInfo->right.uid) { -// for (int32_t i = 0; i < pTableMetaInfo->numOfTags; ++i) { -// if (pJoinInfo->right.tagCol == pTableMetaInfo->tagColumnIndex[i]) { -// pJoinInfo->right.tagCol = i; -// } -// } -// } -} - int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* pCmd) { const char* msg1 = "too many columns in group by clause"; const char* msg2 = "invalid column name in group by clause"; - const char* msg3 = "group by columns must belong to one table"; +// const char* msg3 = "group by columns must belong to one table"; const char* msg7 = "not support group by expression"; const char* msg8 = "not allowed column type for group by"; const char* msg9 = "tags not allowed for table query"; @@ -2561,10 +2502,6 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* return invalidSqlErrMsg(pQueryInfo->msg, msg2); } - if (tableIndex != index.tableIndex && tableIndex >= 0) { - return invalidSqlErrMsg(pQueryInfo->msg, msg3); - } - tableIndex = index.tableIndex; pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); @@ -2621,7 +2558,6 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* } pQueryInfo->groupbyExpr.tableIndex = tableIndex; - return TSDB_CODE_SUCCESS; } @@ -3051,14 +2987,17 @@ static int32_t getColumnQueryCondInfo(SQueryInfo* pQueryInfo, tSQLExpr* pExpr, i } static int32_t getJoinCondInfo(SQueryInfo* pQueryInfo, tSQLExpr* pExpr) { - const char* msg = "invalid join query condition"; + const char* msg1 = "invalid join query condition"; + const char* msg2 = "join on binary/nchar not supported"; + const char* msg3 = "type of join columns must be identical"; + const char* msg4 = "invalid column name in join condition"; if (pExpr == NULL) { return TSDB_CODE_SUCCESS; } if (!isExprDirectParentOfLeaftNode(pExpr)) { - return invalidSqlErrMsg(pQueryInfo->msg, msg); + return invalidSqlErrMsg(pQueryInfo->msg, msg1); } STagCond* pTagCond = &pQueryInfo->tagCond; @@ -3067,28 +3006,36 @@ static int32_t getJoinCondInfo(SQueryInfo* pQueryInfo, tSQLExpr* pExpr) { SColumnIndex index = COLUMN_INDEX_INITIALIZER; if (getColumnIndexByName(&pExpr->pLeft->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; + return invalidSqlErrMsg(pQueryInfo->msg, msg4); } STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); - int16_t tagColIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); + SSchema* pTagSchema1 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); pLeft->uid = pTableMetaInfo->pTableMeta->uid; - pLeft->tagCol = tagColIndex; + pLeft->tagColId = pTagSchema1->colId; strcpy(pLeft->tableId, pTableMetaInfo->name); index = (SColumnIndex)COLUMN_INDEX_INITIALIZER; if (getColumnIndexByName(&pExpr->pRight->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; + return invalidSqlErrMsg(pQueryInfo->msg, msg4); } pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); - tagColIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); + SSchema* pTagSchema2 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); pRight->uid = pTableMetaInfo->pTableMeta->uid; - pRight->tagCol = tagColIndex; + pRight->tagColId = pTagSchema2->colId; strcpy(pRight->tableId, pTableMetaInfo->name); + if (pTagSchema1->type != pTagSchema2->type) { + return invalidSqlErrMsg(pQueryInfo->msg, msg3); + } + + if (pTagSchema1->type == TSDB_DATA_TYPE_BINARY || pTagSchema1->type == TSDB_DATA_TYPE_NCHAR) { + return invalidSqlErrMsg(pQueryInfo->msg, msg2); + } + pTagCond->joinInfo.hasJoin = true; return TSDB_CODE_SUCCESS; } @@ -3816,6 +3763,10 @@ static int32_t getTagQueryCondExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr, for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { tSQLExpr* p1 = extractExprForSTable(pExpr, pQueryInfo, i); + if (p1 == NULL) { // no query condition on this table + continue; + } + tExprNode* p = NULL; SArray* colList = taosArrayInit(10, sizeof(SColIndex)); @@ -4980,7 +4931,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau if (pExpr->functionId != TSDB_FUNC_TAG) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex); - int16_t columnInfo = tscGetJoinTagColIndexByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid); + int16_t columnInfo = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid); SColumnIndex index = {.tableIndex = 0, .columnIndex = columnInfo}; SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); @@ -5016,27 +4967,17 @@ static void doLimitOutputNormalColOfGroupby(SSqlExpr* pExpr) { void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) { SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, tagIndex); - int32_t index = pColIndex->colIndex; - + size_t size = tscSqlExprNumOfExprs(pQueryInfo); + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index); - SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = index}; - - size_t size = tscSqlExprNumOfExprs(pQueryInfo); - SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_PRJ, &colIndex, pSchema->type, pSchema->bytes, - pSchema->bytes, false); + SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pColIndex->colIndex); + SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pColIndex->colIndex}; - pExpr->colInfo.flag = TSDB_COL_NORMAL; - doLimitOutputNormalColOfGroupby(pExpr); + tscAddSpecialColumnForSelect(pQueryInfo, size, TSDB_FUNC_PRJ, &colIndex, pSchema, TSDB_COL_NORMAL); - // NOTE: tag column does not add to source column list - SColumnList list = {0}; - list.num = 1; - list.ids[0] = colIndex; - - insertResultField(pQueryInfo, size, &list, pSchema->bytes, pSchema->type, pSchema->name, pExpr); - SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, size - 1); + SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, size); + doLimitOutputNormalColOfGroupby(pInfo->pSqlExpr); pInfo->visible = false; } @@ -5248,6 +5189,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SQueryInfo* pQueryInfo) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + SSchema s = tGetTableNameColumnSchema(); SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); int16_t bytes = 0; int16_t type = 0; @@ -5258,7 +5200,6 @@ static int32_t doAddGroupbyColumnsOnDemand(SQueryInfo* pQueryInfo) { int16_t colIndex = pColIndex->colIndex; if (colIndex == TSDB_TBNAME_COLUMN_INDEX) { - SSchema s = tGetTableNameColumnSchema(); type = s.type; bytes = s.bytes; name = s.name; @@ -5955,10 +5896,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { setColumnOffsetValueInResultset(pQueryInfo); - for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { - updateTagColumnIndex(pQueryInfo, i); - } - /* * fill options are set at the end position, when all columns are set properly * the columns may be increased due to group by operation diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index 439aa7c1de..0dfbf8c487 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -64,14 +64,6 @@ SSchema* tscGetTableTagSchema(const STableMeta* pTableMeta) { STableComInfo tscGetTableInfo(const STableMeta* pTableMeta) { assert(pTableMeta != NULL); - -#if 0 - if (pTableMeta->tableType == TSDB_CHILD_TABLE) { - assert (pTableMeta->pSTable != NULL); - return pTableMeta->pSTable->tableInfo; - } -#endif - return pTableMeta->tableInfo; } @@ -119,11 +111,24 @@ bool isValidSchema(struct SSchema* pSchema, int32_t numOfCols) { return (rowLen <= TSDB_MAX_BYTES_PER_ROW); } -SSchema* tscGetTableColumnSchema(const STableMeta* pTableMeta, int32_t startCol) { +SSchema* tscGetTableColumnSchema(const STableMeta* pTableMeta, int32_t colIndex) { assert(pTableMeta != NULL); SSchema* pSchema = (SSchema*) pTableMeta->schema; - return &pSchema[startCol]; + return &pSchema[colIndex]; +} + +// TODO for large number of columns, employ the binary search method +SSchema* tscGetTableColumnSchemaById(STableMeta* pTableMeta, int16_t colId) { + STableComInfo tinfo = tscGetTableInfo(pTableMeta); + + for(int32_t i = 0; i < tinfo.numOfColumns + tinfo.numOfTags; ++i) { + if (pTableMeta->schema[i].colId == colId) { + return &pTableMeta->schema[i]; + } + } + + return NULL; } struct SSchema tscGetTbnameColumnSchema() { diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 52a06277e3..8c3345f112 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -1092,14 +1092,6 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx) size_t size = tscSqlExprNumOfExprs(pQueryInfo); for (int32_t j = 0; j < size; ++j) { - // SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[j]; - // if (pExpr == NULL) { - // assert(pQueryInfo->fieldsInfo.pExpr[j] != NULL); - // - // maxOutput = 1; - // continue; - // } - /* * ts, tag, tagprj function can not decide the output number of current query * the number of output result is decided by main output @@ -1109,8 +1101,9 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx) continue; } - if (maxOutput < GET_RES_INFO(&pCtx[j])->numOfRes) { - maxOutput = GET_RES_INFO(&pCtx[j])->numOfRes; + SResultInfo* pResInfo = GET_RES_INFO(&pCtx[j]); + if (maxOutput < pResInfo->numOfRes) { + maxOutput = pResInfo->numOfRes; } } @@ -1260,7 +1253,6 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no #ifdef _DEBUG_VIEW printf("final result before interpo:\n"); - assert(0); // tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->num, pResBuf->num); #endif diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 1150de1e7f..3a77aaffb3 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -644,7 +644,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); - pQueryMsg->queryType = htons(pQueryInfo->type); + pQueryMsg->queryType = htonl(pQueryInfo->type); size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); pQueryMsg->numOfOutput = htons(numOfOutput); @@ -723,6 +723,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += sizeof(SSqlFuncMsg); for (int32_t j = 0; j < pExpr->numOfParams; ++j) { + // todo add log pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType); pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen); @@ -1175,7 +1176,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name); // use dbinfo from table id without modifying current db info - tscGetDBInfoFromMeterId(pTableMetaInfo->name, pCreateTableMsg->db); + tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateTableMsg->db); SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo; @@ -1252,7 +1253,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload; - tscGetDBInfoFromMeterId(pTableMetaInfo->name, pAlterTableMsg->db); + tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db); strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name); pAlterTableMsg->type = htons(pAlterInfo->type); @@ -1577,7 +1578,7 @@ int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg = pStart; SMgmtHead *pMgmt = (SMgmtHead *)pMsg; - tscGetDBInfoFromMeterId(pTableMetaInfo->name, pMgmt->db); + tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pMgmt->db); pMsg += sizeof(SMgmtHead); diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 1a1305825b..7e3aaf7fc5 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -238,7 +238,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { taosArraySort( tables, tscCompareTidTags ); - tscBuildVgroupTableInfo( pTableMetaInfo, tables ); + tscBuildVgroupTableInfo(pSql, pTableMetaInfo, tables); } taosArrayDestroy(tables); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index b0403bdf2a..2ec02bb425 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -28,7 +28,7 @@ typedef struct SInsertSupporter { } SInsertSupporter; static void freeJoinSubqueryObj(SSqlObj* pSql); -static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql); +static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql); static bool tsCompare(int32_t order, int64_t left, int64_t right) { if (order == TSDB_ORDER_ASC) { @@ -38,16 +38,15 @@ static bool tsCompare(int32_t order, int64_t left, int64_t right) { } } -static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, - SJoinSupporter* pSupporter2, TSKEY* st, TSKEY* et) { - STSBuf* output1 = tsBufCreate(true); - STSBuf* output2 = tsBufCreate(true); - - *st = INT64_MAX; - *et = INT64_MIN; - +static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); - + + STSBuf* output1 = tsBufCreate(true, pQueryInfo->order.order); + STSBuf* output2 = tsBufCreate(true, pQueryInfo->order.order); + + win->skey = INT64_MAX; + win->ekey = INT64_MIN; + SLimitVal* pLimit = &pQueryInfo->limit; int32_t order = pQueryInfo->order.order; @@ -106,12 +105,12 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, * final results which is acquired after the secondry merge of in the client. */ if (pLimit->offset == 0 || pQueryInfo->intervalTime > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { - if (*st > elem1.ts) { - *st = elem1.ts; + if (win->skey > elem1.ts) { + win->skey = elem1.ts; } - if (*et < elem1.ts) { - *et = elem1.ts; + if (win->ekey < elem1.ts) { + win->ekey = elem1.ts; } tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); @@ -151,8 +150,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, tsBufDestory(pSupporter2->pTSBuf); tscTrace("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks " - "intersecting, skey:%" PRId64 ", ekey:%" PRId64, pSql, - numOfInput1, numOfInput2, output1->numOfTotal, *st, *et); + "intersecting, skey:%" PRId64 ", ekey:%" PRId64, pSql, numOfInput1, numOfInput2, output1->numOfTotal, + win->skey, win->ekey); return output1->numOfTotal; } @@ -252,7 +251,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { assert(numOfSub > 0); // scan all subquery, if one sub query has only ts, ignore it - tscTrace("%p start to launch secondary subqueries, total:%d, only:%d needs to query, others are not retrieve in " + tscTrace("%p start to launch secondary subquery, total:%d, only:%d needs to query, others are not retrieve in " "select clause", pSql, pSql->numOfSubs, numOfSub); //the subqueries that do not actually launch the secondary query to virtual node is set as completed. @@ -301,7 +300,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { // set the second stage sub query for join process TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE); - + pQueryInfo->intervalTime = pSupporter->interval; pQueryInfo->groupbyExpr = pSupporter->groupbyExpr; @@ -328,33 +327,38 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { */ pSupporter->limit = pQueryInfo->limit; pNewQueryInfo->limit = pSupporter->limit; - - // fetch the join tag column - if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { - SSqlExpr* pExpr = tscSqlExprGet(pNewQueryInfo, 0); - assert(pQueryInfo->tagCond.joinInfo.hasJoin); - - int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid); - pExpr->param[0].i64Key = tagColIndex; - pExpr->numOfParams = 1; - } SColumnIndex index = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; SSchema* s = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, 0); SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0); + int16_t funcId = pExpr->functionId; + if ((pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) || - (pExpr->functionId != TSDB_FUNC_TS || pExpr->functionId != TSDB_FUNC_TS_DUMMY)) { - tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_PRJ, &index, s, 0); + (funcId != TSDB_FUNC_TS && funcId != TSDB_FUNC_TS_DUMMY && funcId != TSDB_FUNC_PRJ)) { + + int16_t functionId = tscIsProjectionQuery(pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS; + + tscAddSpecialColumnForSelect(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL); tscPrintSelectClause(pNew, 0); tscFieldInfoUpdateOffset(pNewQueryInfo); + + pExpr = tscSqlExprGet(pQueryInfo, 0); + } + + // set the join condition tag column info, to do extract method + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { + assert(pQueryInfo->tagCond.joinInfo.hasJoin); + int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid); + + pExpr->param[0].i64Key = colId; + pExpr->numOfParams = 1; } size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList); tscTrace("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", - pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, - taosArrayGetSize(pNewQueryInfo->exprList), numOfCols, - pNewQueryInfo->fieldsInfo.numOfOutput, pNewQueryInfo->pTableMetaInfo[0]->name); + pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, taosArrayGetSize(pNewQueryInfo->exprList), + numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name); } //prepare the subqueries object failed, abort @@ -368,12 +372,10 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { } for(int32_t i = 0; i < pSql->numOfSubs; ++i) { - SSqlObj* pSub = pSql->pSubs[i]; - if (pSub == NULL) { + if (pSql->pSubs[i] == NULL) { continue; } - - tscProcessSql(pSub); + tscDoQuery(pSql->pSubs[i]); } return TSDB_CODE_SUCCESS; @@ -414,11 +416,9 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { } // update the query time range according to the join results on timestamp -static void updateQueryTimeRange(SQueryInfo* pQueryInfo, int64_t st, int64_t et) { - assert(pQueryInfo->window.skey <= st && pQueryInfo->window.ekey >= et); - - pQueryInfo->window.skey = st; - pQueryInfo->window.ekey = et; +static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) { + assert(pQueryInfo->window.skey <= win->skey && pQueryInfo->window.ekey >= win->ekey); + pQueryInfo->window = *win; } static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* pSql) { @@ -462,13 +462,13 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* SJoinSupporter* p1 = pParentSql->pSubs[0]->param; SJoinSupporter* p2 = pParentSql->pSubs[1]->param; - TSKEY st, et; - int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &st, &et); + STimeWindow win = TSWINDOW_INITIALIZER; + int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &win); if (num <= 0) { // no result during ts intersect tscTrace("%p free all sub SqlObj and quit", pParentSql); freeJoinSubqueryObj(pParentSql); } else { - updateQueryTimeRange(pParentQueryInfo, st, et); + updateQueryTimeRange(pParentQueryInfo, &win); tscLaunchSecondPhaseSubqueries(pParentSql); } } @@ -487,7 +487,7 @@ int32_t tscCompareTidTags(const void* p1, const void* p2) { return 0; } -void tscBuildVgroupTableInfo(STableMetaInfo* pTableMetaInfo, SArray* tables) { +void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) { SArray* result = taosArrayInit(4, sizeof(SVgroupTableInfo)); SArray* vgTables = NULL; STidTags* prev = NULL; @@ -513,12 +513,14 @@ void tscBuildVgroupTableInfo(STableMetaInfo* pTableMetaInfo, SArray* tables) { taosArrayPush(result, &info); } + tscTrace("%p tid:%d, uid:%"PRIu64",vgId:%d added for vnode query", pSql, tt->tid, tt->uid, tt->vgId) STableIdInfo item = {.uid = tt->uid, .tid = tt->tid, .key = INT64_MIN}; taosArrayPush(vgTables, &item); prev = tt; } pTableMetaInfo->pVgroupTables = result; + pTableMetaInfo->vgroupIndex = 0; } static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) { @@ -544,9 +546,8 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* // set the tags value for ts_comp function SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0); - int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid); - - pExpr->param->i64Key = tagColIndex; + int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid); + pExpr->param->i64Key = tagColId; pExpr->numOfParams = 1; // add the filter tag column @@ -566,131 +567,198 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); tscTrace( - "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " - "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", - pParent, pSql, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, tscSqlExprNumOfExprs(pQueryInfo), - numOfCols, pQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name); + "%p subquery:%p tableIndex:%d, vgroupIndex:%d, numOfVgroups:%d, type:%d, ts_comp query to retrieve timestamps, " + "numOfExpr:%d, colList:%d, numOfOutputFields:%d, name:%s", + pParent, pSql, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pQueryInfo->type, + tscSqlExprNumOfExprs(pQueryInfo), numOfCols, pQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name); tscProcessSql(pSql); } +static bool checkForIdenticalTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1, void* pSql) { + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + + SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);// todo: tags mismatch, tags not completed + SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, 0); + SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex]; + + for(int32_t i = 1; i < p1->num; ++i) { + STidTags* prev = (STidTags*) varDataVal(p1->pIdTagList + (i - 1) * p1->tagSize); + STidTags* p = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize); + + if (doCompare(prev->tag, p->tag, pColSchema->type, pColSchema->bytes) == 0) { + tscError("%p join tags have same value for different table, free all sub SqlObj and quit", pSql); + p1->pState->code = TSDB_CODE_QRY_DUP_JOIN_KEY; + return false; + } + } + + return true; +} + +static void getIntersectionOfTagVal(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) { + tscTrace("%p all subqueries retrieve tags complete, do tags match", pParentSql); + + SJoinSupporter* p1 = pParentSql->pSubs[0]->param; + SJoinSupporter* p2 = pParentSql->pSubs[1]->param; + + qsort(p1->pIdTagList, p1->num, p1->tagSize, tscCompareTidTags); + qsort(p2->pIdTagList, p2->num, p2->tagSize, tscCompareTidTags); + + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid); + + SSchema* pColSchema = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId); + + *s1 = taosArrayInit(p1->num, p1->tagSize); + *s2 = taosArrayInit(p2->num, p2->tagSize); + + if (!(checkForIdenticalTagVal(pQueryInfo, p1, pParentSql) && checkForIdenticalTagVal(pQueryInfo, p2, pParentSql))) { + freeJoinSubqueryObj(pParentSql); + pParentSql->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY; + tscQueueAsyncRes(pParentSql); + return; + } + + int32_t i = 0, j = 0; + while(i < p1->num && j < p2->num) { + STidTags* pp1 = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize); + STidTags* pp2 = (STidTags*) varDataVal(p2->pIdTagList + j * p2->tagSize); + + int32_t ret = doCompare(pp1->tag, pp2->tag, pColSchema->type, pColSchema->bytes); + if (ret == 0) { + tscTrace("%p tag matched, vgId:%d, val:%d, tid:%d, uid:%"PRIu64", tid:%d, uid:%"PRIu64, pParentSql, pp1->vgId, + *(int*) pp1->tag, pp1->tid, pp1->uid, pp2->tid, pp2->uid); + + taosArrayPush(*s1, pp1); + taosArrayPush(*s2, pp2); + j++; + i++; + } else if (ret > 0) { + j++; + } else { + i++; + } + } +} + static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { SJoinSupporter* pSupporter = (SJoinSupporter*)param; - + SSqlObj* pParentSql = pSupporter->pObj; + SSqlObj* pSql = (SSqlObj*)tres; SSqlCmd* pCmd = &pSql->cmd; - - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - + SSqlRes* pRes = &pSql->res; + + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + // response of tag retrieve if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) { - //todo handle error + // todo handle error - if (numOfRows == 0 || pSql->res.completed) { - + if (numOfRows == 0 || pRes->completed) { if (numOfRows > 0) { - size_t length = pSupporter->totalLen + pSql->res.rspLen; - char* tmp = realloc(pSupporter->pIdTagList, length); + size_t validLen = pSupporter->tagSize * pRes->numOfRows; + + size_t length = pSupporter->totalLen + validLen; + char* tmp = realloc(pSupporter->pIdTagList, length); assert(tmp != NULL); pSupporter->pIdTagList = tmp; - - memcpy(pSupporter->pIdTagList, pSql->res.data, pSql->res.rspLen); - pSupporter->totalLen += pSql->res.rspLen; - pSupporter->num += pSql->res.numOfRows; + + memcpy(pSupporter->pIdTagList + pSupporter->totalLen,pRes->data, validLen); + pSupporter->totalLen += validLen; + pSupporter->num += pRes->numOfRows; } - + + // tuples have been retrieved to client, try tuples from the next vnode + if (hasMoreVnodesToTry(pSql)) { + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + + int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups; + pTableMetaInfo->vgroupIndex += 1; + assert(pTableMetaInfo->vgroupIndex < totalVgroups); + + tscTrace("%p tid_tag from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d", + pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, + pSupporter->num); + + pCmd->command = TSDB_SQL_SELECT; + tscResetForNextRetrieve(&pSql->res); + + // set the callback function + pSql->fp = tscJoinQueryCallback; + tscProcessSql(pSql); + return; + } + int32_t numOfTotal = pSupporter->pState->numOfTotal; int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); - if (finished < numOfTotal) { return; } - - // all subqueries are returned, start to compare the tags + + // all subquery are returned, start to compare the tags assert(finished == numOfTotal); - tscTrace("%p all subqueries retrieve tags complete, do tags match", pParentSql); - - SJoinSupporter* p1 = pParentSql->pSubs[0]->param; - SJoinSupporter* p2 = pParentSql->pSubs[1]->param; - - qsort(p1->pIdTagList, p1->num, p1->tagSize, tscCompareTidTags); - qsort(p2->pIdTagList, p2->num, p2->tagSize, tscCompareTidTags); - - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);// todo: tags mismatch, tags not completed - - SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, 0); - SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex]; - - SArray* s1 = taosArrayInit(p1->num, p1->tagSize); - SArray* s2 = taosArrayInit(p2->num, p2->tagSize); - - int32_t i = 0, j = 0; - while(i < p1->num && j < p2->num) { - STidTags* pp1 = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize); - STidTags* pp2 = (STidTags*) varDataVal(p2->pIdTagList + j * p2->tagSize); - - int32_t ret = doCompare(pp1->tag, pp2->tag, pColSchema->type, pColSchema->bytes); - if (ret == 0) { - taosArrayPush(s1, pp1); - taosArrayPush(s2, pp2); - j++; - i++; - } else if (ret > 0) { - j++; - } else { - i++; - } - } - - if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) {// no results,return. + + SArray *s1 = NULL, *s2 = NULL; + getIntersectionOfTagVal(pQueryInfo, pParentSql, &s1, &s2); + + if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return. tscTrace("%p free all sub SqlObj and quit", pParentSql); freeJoinSubqueryObj(pParentSql); return; } else { SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd; SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd; - - SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0); + + SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0); STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0); - tscBuildVgroupTableInfo(pTableMetaInfo1, s1); - - SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0); + tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo1, s1); + + SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0); STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0); - tscBuildVgroupTableInfo(pTableMetaInfo2, s2); - + tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2); + pSupporter->pState->numOfCompleted = 0; pSupporter->pState->code = 0; pSupporter->pState->numOfTotal = 2; - - for(int32_t m = 0; m < pParentSql->numOfSubs; ++m) { + + for (int32_t m = 0; m < pParentSql->numOfSubs; ++m) { SSqlObj* psub = pParentSql->pSubs[m]; issueTSCompQuery(psub, psub->param, pParentSql); } } - + } else { - size_t length = pSupporter->totalLen + pSql->res.rspLen; + if (numOfRows < 0) { // error + pSupporter->pState->code = numOfRows; + quitAllSubquery(pParentSql, pSupporter); + + pParentSql->res.code = numOfRows; + tscQueueAsyncRes(pParentSql); + return; + } + + size_t length = pSupporter->totalLen + pRes->rspLen; assert(length > 0); char* tmp = realloc(pSupporter->pIdTagList, length); assert(tmp != NULL); - + pSupporter->pIdTagList = tmp; - - memcpy(pSupporter->pIdTagList, pSql->res.data, pSql->res.rspLen); - pSupporter->totalLen += pSql->res.rspLen; - pSupporter->num += pSql->res.numOfRows; - + + memcpy(pSupporter->pIdTagList, pRes->data, pRes->rspLen); + pSupporter->totalLen += pRes->rspLen; + pSupporter->num += pRes->numOfRows; + // continue retrieve data from vnode taos_fetch_rows_a(tres, joinRetrieveCallback, param); } - + return; } - + if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { if (numOfRows < 0) { tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex); @@ -699,42 +767,64 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { return; } - if (numOfRows == 0) { - tSIntersectionAndLaunchSecQuery(pSupporter, pSql); - return; + if (numOfRows > 0) { // write the compressed timestamp to disk file + fwrite(pRes->data, pRes->numOfRows, 1, pSupporter->f); + fclose(pSupporter->f); + pSupporter->f = NULL; + + STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true); + if (pBuf == NULL) { // in error process, close the fd + tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows); + + pSupporter->pState->code = TSDB_CODE_TSC_APP_ERROR; // todo set the informative code + quitAllSubquery(pParentSql, pSupporter); + return; + } + + if (pSupporter->pTSBuf == NULL) { + tscTrace("%p create tmp file for ts block:%s, size:%d bytes", pSql, pBuf->path, numOfRows); + pSupporter->pTSBuf = pBuf; + } else { + assert(pQueryInfo->numOfTables == 1); // for subquery, only one + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + + tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex); + tsBufDestory(pBuf); + } } - // write the compressed timestamp to disk file - fwrite(pSql->res.data, pSql->res.numOfRows, 1, pSupporter->f); - fclose(pSupporter->f); - pSupporter->f = NULL; - - STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true); - if (pBuf == NULL) { - tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows); + if (pRes->completed) { + if (hasMoreVnodesToTry(pSql)) { + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - pSupporter->pState->code = TSDB_CODE_TSC_APP_ERROR; // todo set the informative code - quitAllSubquery(pParentSql, pSupporter); - return; - } + int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups; + pTableMetaInfo->vgroupIndex += 1; + assert(pTableMetaInfo->vgroupIndex < totalVgroups); - if (pSupporter->pTSBuf == NULL) { - tscTrace("%p create tmp file for ts block:%s, size:%d bytes", pSql, pBuf->path, numOfRows); - pSupporter->pTSBuf = pBuf; - } else { - assert(pQueryInfo->numOfTables == 1); // for subquery, only one - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + tscTrace("%p results from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d", + pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, + pRes->numOfClauseTotal); - tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex); - tsBufDestory(pBuf); - } - - if (pSql->res.completed) { - tSIntersectionAndLaunchSecQuery(pSupporter, pSql); - } else { // open a new file to save the incoming result + pCmd->command = TSDB_SQL_SELECT; + tscResetForNextRetrieve(&pSql->res); + + assert(pSupporter->f == NULL); + getTmpfilePath("ts-join", pSupporter->path); + pSupporter->f = fopen(pSupporter->path, "w"); + pRes->row = pRes->numOfRows; + + // set the callback function + pSql->fp = tscJoinQueryCallback; + tscProcessSql(pSql); + return; + } else { + tSIntersectionAndLaunchSecQuery(pSupporter, pSql); + } + + } else { // open a new file to save the incoming result getTmpfilePath("ts-join", pSupporter->path); pSupporter->f = fopen(pSupporter->path, "w"); - pSql->res.row = pSql->res.numOfRows; + pRes->row = pRes->numOfRows; taos_fetch_rows_a(tres, joinRetrieveCallback, param); } @@ -745,29 +835,29 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { } if (numOfRows >= 0) { - pSql->res.numOfTotal += pSql->res.numOfRows; + pRes->numOfTotal += pRes->numOfRows; } - + if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); assert(pQueryInfo->numOfTables == 1); - - // for projection query, need to try next vnode if current vnode is exhausted + + // for projection query, need to try next vnode if current vnode is exhausted if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->vgroupList->numOfVgroups) { pSupporter->pState->numOfCompleted = 0; pSupporter->pState->numOfTotal = 1; - + pSql->cmd.command = TSDB_SQL_SELECT; pSql->fp = tscJoinQueryCallback; tscProcessSql(pSql); - + return; } } - + int32_t numOfTotal = pSupporter->pState->numOfTotal; int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); - + if (finished >= numOfTotal) { assert(finished == numOfTotal); tscTrace("%p all %d secondary subquery retrieves completed, global code:%d", tres, numOfTotal, @@ -778,17 +868,17 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { freeJoinSubqueryObj(pParentSql); pParentSql->res.completed = true; } - + // update the records for each subquery in parent sql object. - for(int32_t i = 0; i < pParentSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pParentSql->numOfSubs; ++i) { if (pParentSql->pSubs[i] == NULL) { continue; } - + SSqlRes* pRes1 = &pParentSql->pSubs[i]->res; pRes1->numOfClauseTotal += pRes1->numOfRows; } - + // data has retrieved to client, build the join results tscBuildResFromSubqueries(pParentSql); } else { @@ -1073,10 +1163,8 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter // this data needs to be transfer to support struct memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo)); - - pSupporter->tagCond = pNewQueryInfo->tagCond; - memset(&pNewQueryInfo->tagCond, 0, sizeof(STagCond)); - + tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond);//pNewQueryInfo->tagCond; + pNew->cmd.numOfCols = 0; pNewQueryInfo->intervalTime = 0; memset(&pNewQueryInfo->limit, 0, sizeof(SLimitVal)); @@ -1094,32 +1182,29 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter STagCond* pTagCond = &pSupporter->tagCond; assert(pTagCond->joinInfo.hasJoin); - int32_t tagIndex = tscGetJoinTagColIndexByUid(pTagCond, pTableMetaInfo->pTableMeta->uid); - SSchema* pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); - - SSchema s = pTagSchema[tagIndex]; + int32_t tagColId = tscGetJoinTagColIdByUid(pTagCond, pTableMetaInfo->pTableMeta->uid); + SSchema* s = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId); int16_t bytes = 0; - int16_t type = 0; + int16_t type = 0; int32_t inter = 0; - getResultDataInfo(s.type, s.bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0); + getResultDataInfo(s->type, s->bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0); - s.type = type; - s.bytes = bytes; - pSupporter->tagSize = s.bytes; + SSchema s1 = {.colId = s->colId, .type = type, .bytes = bytes}; + pSupporter->tagSize = s1.bytes; + assert(isValidDataType(s1.type, 0) && s1.bytes > 0); // set get tags query type TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY); - tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &index, &s, TSDB_COL_TAG); + tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &index, &s1, TSDB_COL_TAG); size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList); tscTrace( "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), " - "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", + "exprInfo:%d, colList:%d, fieldsInfo:%d, tagIndex:%d, name:%s", pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo), - numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, pNewQueryInfo->pTableMetaInfo[0]->name); - + numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, index.columnIndex, pNewQueryInfo->pTableMetaInfo[0]->name); } else { SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; @@ -1128,10 +1213,11 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter // set the tags value for ts_comp function SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0); - int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid); - - pExpr->param->i64Key = tagColIndex; - pExpr->numOfParams = 1; + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { + int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid); + pExpr->param->i64Key = tagColId; + pExpr->numOfParams = 1; + } // add the filter tag column if (pSupporter->colList != NULL) { @@ -1281,9 +1367,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { trs->pParentSqlObj = pSql; trs->pFinalColModel = pModel; - pthread_mutexattr_t mutexattr; - memset(&mutexattr, 0, sizeof(pthread_mutexattr_t)); - + pthread_mutexattr_t mutexattr = {0}; pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP); pthread_mutex_init(&trs->queryMutex, &mutexattr); pthread_mutexattr_destroy(&mutexattr); @@ -1300,6 +1384,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { if (pQueryInfo->tsBuf) { SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf); + assert(pNewQueryInfo->tsBuf != NULL); } tscTrace("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex); @@ -1453,7 +1538,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); - if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) == TSDB_QUERY_TYPE_JOIN_SEC_STAGE) { + if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { (*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code); } else { // regular super table query if (pPObj->res.code != TSDB_CODE_SUCCESS) { @@ -1474,7 +1559,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p // data in from current vnode is stored in cache and disk uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num; - tscTrace("%p sub:%p all data retrieved from ip:%u,vgId:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, + tscTrace("%p sub:%p all data retrieved from ip:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pTableMetaInfo->vgroupList->vgroups[0].ipAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId, numOfRowsFromSubquery, idx); @@ -1708,7 +1793,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { tscHandleSubqueryError(param, tres, pState->code); } else { // success, proceed to retrieve data from dnode - tscTrace("%p sub:%p query complete, ip:%u, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSqlObj, pSql, + tscTrace("%p sub:%p query complete, ip:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSqlObj, pSql, pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex); if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode @@ -1868,7 +1953,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) { pRes->numOfClauseTotal++; break; } else { // continue retrieve data from vnode - if (!tscHashRemainDataInSubqueryResultSet(pSql)) { + if (!tscHasRemainDataInSubqueryResultSet(pSql)) { tscTrace("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1); SSubqueryState *pState = NULL; @@ -2018,7 +2103,7 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) { return pRes->tsrow; } -static UNUSED_FUNC bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { +static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { bool hasData = true; SSqlCmd *pCmd = &pSql->cmd; @@ -2061,8 +2146,7 @@ static UNUSED_FUNC bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0); if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pQueryInfo1, pRes1) && - tscProjectionQueryOnTable(pQueryInfo1)) || - (pRes1->numOfRows == 0)) { + tscIsProjectionQuery(pQueryInfo1)) || (pRes1->numOfRows == 0)) { hasData = false; break; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index aea658f02f..498460c37b 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -70,13 +70,6 @@ void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBufferWriter* bw) { taosArrayPush(pTagCond->pCond, &cond); } -bool tscQueryOnSTable(SSqlCmd* pCmd) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - - return ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_QUERY) == TSDB_QUERY_TYPE_STABLE_QUERY) && - (pCmd->msgType == TSDB_MSG_TYPE_QUERY); -} - bool tscQueryTags(SQueryInfo* pQueryInfo) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); @@ -95,32 +88,8 @@ bool tscQueryTags(SQueryInfo* pQueryInfo) { return true; } -bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd) { - bool hasTags = false; - int32_t numOfSelectivity = 0; - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { - int32_t functId = tscSqlExprGet(pQueryInfo, i)->functionId; - if (functId == TSDB_FUNC_TAG_DUMMY) { - hasTags = true; - continue; - } - - if ((aAggs[functId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) { - numOfSelectivity++; - } - } - - if (numOfSelectivity > 0 && hasTags) { - return true; - } - - return false; -} - -void tscGetDBInfoFromMeterId(char* tableId, char* db) { +// todo refactor, extract methods and move the common module +void tscGetDBInfoFromTableFullName(char* tableId, char* db) { char* st = strstr(tableId, TS_PATH_DELIMITER); if (st != NULL) { char* end = strstr(st + 1, TS_PATH_DELIMITER); @@ -181,8 +150,14 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { for (int32_t i = 0; i < numOfExprs; ++i) { int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId; - if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAG && - functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM) { + + if (functionId != TSDB_FUNC_PRJ && + functionId != TSDB_FUNC_TAGPRJ && + functionId != TSDB_FUNC_TAG && + functionId != TSDB_FUNC_TS && + functionId != TSDB_FUNC_ARITHM && + functionId != TSDB_FUNC_TS_COMP && + functionId != TSDB_FUNC_TID_TAG) { return false; } } @@ -209,10 +184,14 @@ bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableInde return pQueryInfo->order.orderColId >= 0; } -bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo) { - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { +bool tscIsProjectionQuery(SQueryInfo* pQueryInfo) { + size_t size = tscSqlExprNumOfExprs(pQueryInfo); + + for (int32_t i = 0; i < size; ++i) { int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId; - if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TS) { + + if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAG && + functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM) { return false; } } @@ -225,9 +204,10 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) { for (int32_t i = 0; i < size; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); - if (pExpr == NULL) { - return false; - } + assert(pExpr != NULL); +// if (pExpr == NULL) { +// return false; +// } int32_t functionId = pExpr->functionId; if (functionId == TSDB_FUNC_TAG) { @@ -238,6 +218,7 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) { return false; } } + return true; } @@ -1774,7 +1755,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void SQueryInfo* pPrevQueryInfo = tscGetQueryInfoDetail(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex); pNewQueryInfo->type = pPrevQueryInfo->type; } else { - pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; // it must be the subquery + TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY);// it must be the subquery } uint64_t uid = pTableMetaInfo->pTableMeta->uid; @@ -1799,19 +1780,26 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void } // make sure the the sqlExpr for each fields is correct -// todo handle the agg arithmetic expression + // todo handle the agg arithmetic expression + numOfExprs = tscSqlExprNumOfExprs(pNewQueryInfo); + for(int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutput; ++f) { TAOS_FIELD* field = tscFieldInfoGetField(&pNewQueryInfo->fieldsInfo, f); - numOfExprs = tscSqlExprNumOfExprs(pNewQueryInfo); - + bool matched = false; + for(int32_t k1 = 0; k1 < numOfExprs; ++k1) { SSqlExpr* pExpr1 = tscSqlExprGet(pNewQueryInfo, k1); - - if (strcmp(field->name, pExpr1->aliasName) == 0) { // eatablish link according to the result field name + + if (strcmp(field->name, pExpr1->aliasName) == 0) { // establish link according to the result field name SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pNewQueryInfo->fieldsInfo, f); pInfo->pSqlExpr = pExpr1; + + matched = true; + break; } } + + assert(matched); } tscFieldInfoUpdateOffset(pNewQueryInfo); @@ -1900,16 +1888,21 @@ void tscDoQuery(SSqlObj* pSql) { } if (QUERY_IS_JOIN_QUERY(type)) { - if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) { + if (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_SUBQUERY)) { tscHandleMasterJoinQuery(pSql); - return; - } else { - // for first stage sub query, iterate all vnodes to get all timestamp - if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) { -// doProcessSql(pSql); - assert(0); + } else { // for first stage sub query, iterate all vnodes to get all timestamp + if (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { + tscProcessSql(pSql); + } else { // secondary stage join query. + if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query + tscHandleMasterSTableQuery(pSql); + } else { + tscProcessSql(pSql); + } } } + + return; } else if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query tscHandleMasterSTableQuery(pSql); return; @@ -1919,13 +1912,13 @@ void tscDoQuery(SSqlObj* pSql) { } } -int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid) { +int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid) { if (pTagCond->joinInfo.left.uid == uid) { - return pTagCond->joinInfo.left.tagCol; - } else if (pTagCond->joinInfo.right.uid == uid){ - return pTagCond->joinInfo.right.tagCol; + return pTagCond->joinInfo.left.tagColId; + } else if (pTagCond->joinInfo.right.uid == uid) { + return pTagCond->joinInfo.right.tagColId; } else { - return -2; + assert(0); } } @@ -1982,11 +1975,10 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) { return false; } - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); assert(pRes->completed); - + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + // for normal table, no need to try any more if results are all retrieved from one vnode if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) || (pTableMetaInfo->vgroupList == NULL)) { return false; @@ -2008,7 +2000,6 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { * if case of: multi-vnode super table projection query */ assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes)); - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups; diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index ac2af75742..8effacbbbf 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -195,6 +195,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_MSG, 0, 0x0701, "query inva TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NO_DISKSPACE, 0, 0x0702, "query no diskspace") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_OUT_OF_MEMORY, 0, 0x0703, "query out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_APP_ERROR, 0, 0x0704, "query app error") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUP_JOIN_KEY, 0, 0x0705, "query duplicated join key") // grant TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 0x0800, "grant expired") diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 1198097895..b9cdc0a9f3 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -455,7 +455,7 @@ typedef struct { int16_t orderType; // used in group by xx order by xxx int64_t limit; int64_t offset; - uint16_t queryType; // denote another query process + uint32_t queryType; // denote another query process int16_t numOfOutput; // final output columns numbers int16_t tagNameRelType; // relation of tag criteria and tbname criteria int16_t fillType; // interpolate type diff --git a/src/query/inc/qtsbuf.h b/src/query/inc/qtsbuf.h index c83c3dbe25..e437e1c4e5 100644 --- a/src/query/inc/qtsbuf.h +++ b/src/query/inc/qtsbuf.h @@ -100,10 +100,10 @@ typedef struct STSBuf { typedef struct STSBufFileHeader { uint32_t magic; // file magic number uint32_t numOfVnode; // number of vnode stored in current file - uint32_t tsOrder; // timestamp order in current file + int32_t tsOrder; // timestamp order in current file } STSBufFileHeader; -STSBuf* tsBufCreate(bool autoDelete); +STSBuf* tsBufCreate(bool autoDelete, int32_t order); STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete); STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e9672002ab..2b03e81800 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -12,6 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include #include "os.h" #include "qfill.h" @@ -1072,10 +1073,6 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS groupbyColumnData = getGroupbyColumnData(pQuery, &type, &bytes, pDataBlock); } - if (pRuntimeEnv->pTSBuf != NULL && pQuery->numOfOutput > 1) { - printf("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n"); - } - for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k); @@ -2193,8 +2190,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { * set tag value in SQLFunctionCtx * e.g.,tag information into input buffer */ -static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColId, tVariant *tag, int16_t type, - int16_t bytes) { +static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes) { tVariantDestroy(tag); if (tagColId == TSDB_TBNAME_COLUMN_INDEX) { @@ -2219,35 +2215,55 @@ static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColI void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId* pTableId, void *tsdb) { SQuery *pQuery = pRuntimeEnv->pQuery; + SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); SExprInfo *pExprInfo = &pQuery->pSelectExpr[0]; if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) { - assert(pExprInfo->base.numOfParams == 1); - doSetTagValueInParam(tsdb, pTableId, pExprInfo->base.arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag, - pExprInfo->type, pExprInfo->bytes); + + // todo refactor extract function. + int16_t type = -1, bytes = -1; + for(int32_t i = 0; i < pQuery->numOfTags; ++i) { + if (pQuery->tagColList[i].colId == pExprInfo->base.arg->argValue.i64) { + type = pQuery->tagColList[i].type; + bytes = pQuery->tagColList[i].bytes; + } + } + + doSetTagValueInParam(tsdb, pTableId, pExprInfo->base.arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag, type, bytes); } else { // set tag value, by which the results are aggregated. for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) { - SExprInfo* pExprInfo = &pQuery->pSelectExpr[idx]; + SExprInfo* pLocalExprInfo = &pQuery->pSelectExpr[idx]; // ts_comp column required the tag value for join filter - if (!TSDB_COL_IS_TAG(pExprInfo->base.colInfo.flag)) { + if (!TSDB_COL_IS_TAG(pLocalExprInfo->base.colInfo.flag)) { continue; } // todo use tag column index to optimize performance - doSetTagValueInParam(tsdb, pTableId, pExprInfo->base.colInfo.colId, &pRuntimeEnv->pCtx[idx].tag, - pExprInfo->type, pExprInfo->bytes); + doSetTagValueInParam(tsdb, pTableId, pLocalExprInfo->base.colInfo.colId, &pRuntimeEnv->pCtx[idx].tag, + pLocalExprInfo->type, pLocalExprInfo->bytes); } // set the join tag for first column SSqlFuncMsg *pFuncMsg = &pExprInfo->base; - if (pFuncMsg->functionId == TSDB_FUNC_TS && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX && + if ((pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX && pRuntimeEnv->pTSBuf != NULL) { assert(pFuncMsg->numOfParams == 1); - assert(0); // to do fix me - // doSetTagValueInParam(pTagSchema, pFuncMsg->arg->argValue.i64, pMeterSidInfo, &pRuntimeEnv->pCtx[0].tag); + + // todo refactor + int16_t type = -1, bytes = -1; + for(int32_t i = 0; i < pQuery->numOfTags; ++i) { + if (pQuery->tagColList[i].colId == pExprInfo->base.arg->argValue.i64) { + type = pQuery->tagColList[i].type; + bytes = pQuery->tagColList[i].bytes; + } + } + + doSetTagValueInParam(tsdb, pTableId, pExprInfo->base.arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag, type, bytes); + qTrace("QInfo:%p set tag value for join comparison, colId:%d, val:%"PRId64, pQInfo, pExprInfo->base.arg->argValue.i64, + pRuntimeEnv->pCtx[0].tag) } } } @@ -3623,9 +3639,6 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) { static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - if (pQInfo->runtimeEnv.pTSBuf != NULL && pQuery->numOfOutput > 1) { - printf("ffffffffffffffffffffffffff\n"); - } for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { int32_t bytes = pQuery->pSelectExpr[col].bytes; @@ -4461,6 +4474,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } } } + + if (pQInfo->tableIndex >= pQInfo->groupInfo.numOfTables) { + setQueryStatus(pQuery, QUERY_COMPLETED); + } } /* @@ -5028,7 +5045,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pQueryMsg->order = htons(pQueryMsg->order); pQueryMsg->orderColId = htons(pQueryMsg->orderColId); - pQueryMsg->queryType = htons(pQueryMsg->queryType); + pQueryMsg->queryType = htonl(pQueryMsg->queryType); pQueryMsg->tagNameRelType = htons(pQueryMsg->tagNameRelType); pQueryMsg->numOfCols = htons(pQueryMsg->numOfCols); @@ -5047,9 +5064,6 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, } char *pMsg = (char *)(pQueryMsg->colList) + sizeof(SColumnInfo) * pQueryMsg->numOfCols; - if (pQueryMsg->numOfCols > 1 && pQueryMsg->tsLen > 0) { - printf("ffffffffffffffff\n"); - } for (int32_t col = 0; col < pQueryMsg->numOfCols; ++col) { SColumnInfo *pColInfo = &pQueryMsg->colList[col]; @@ -5199,11 +5213,11 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pMsg += len; } - qTrace("qmsg:%p query %d tables, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, " - "outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64, - pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols, + qTrace("qmsg:%p query %d tables, type:%d, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, " + "outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, compNumOfBlocks:%d, limit:%" PRId64 ", offset:%" PRId64, + pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->queryType, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime, - pQueryMsg->fillType, pQueryMsg->tsLen, pQueryMsg->limit, pQueryMsg->offset); + pQueryMsg->fillType, pQueryMsg->tsLen, pQueryMsg->tsNumOfBlocks, pQueryMsg->limit, pQueryMsg->offset); return 0; } @@ -5241,9 +5255,6 @@ static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo * bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType); int16_t tagLen = 0; - if (pQueryMsg->numOfOutput > 1 && pQueryMsg->tsLen > 0) { - printf("ffffffffffffffffffff\n"); - } for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { pExprs[i].base = *pExprMsg[i]; pExprs[i].bytes = 0; @@ -5883,23 +5894,20 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi bool isSTableQuery = false; STableGroupInfo groupInfo = {0}; - //todo multitable_query?? - if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_TABLE_QUERY)) { - isSTableQuery = TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY); - + if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_TABLE_QUERY)) { STableIdInfo *id = taosArrayGet(pTableIdList, 0); - qTrace("qmsg:%p query table, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid); - + + qTrace("qmsg:%p query normal table, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid); if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) { goto _over; } - } else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_STABLE_QUERY)) { + } else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_STABLE_QUERY)) { isSTableQuery = true; - // TODO: need a macro from TSDB to check if table is super table, - // also note there's possiblity that only one table in the super table - if (taosArrayGetSize(pTableIdList) == 1) { + // TODO: need a macro from TSDB to check if table is super table + + // also note there's possibility that only one table in the super table + if (!TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY)) { STableIdInfo *id = taosArrayGet(pTableIdList, 0); - // if array size is 1 and assert super table // group by normal column, do not pass the group by condition to tsdb to group table into different group int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols; @@ -5913,15 +5921,13 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi goto _over; } } else { - SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES); + groupInfo.pGroupList = taosArrayInit(1, POINTER_BYTES); + groupInfo.numOfTables = taosArrayGetSize(pTableIdList); - SArray* sa = taosArrayInit(groupInfo.numOfTables, sizeof(STableId)); - for(int32_t i = 0; i < groupInfo.numOfTables; ++i) { - STableIdInfo* tableId = taosArrayGet(pTableIdList, i); - taosArrayPush(sa, tableId); - } - taosArrayPush(pTableGroup, &sa); - groupInfo.pGroupList = pTableGroup; + SArray* p = taosArrayClone(pTableIdList); + taosArrayPush(groupInfo.pGroupList, &p); + + qTrace("qmsg:%p query on %d tables in one group from client", pQueryMsg, groupInfo.numOfTables); } } else { assert(0); @@ -6177,7 +6183,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) { } else { if (val == NULL) { setNull(output, type, bytes); - } else { + } else { // todo here stop will cause client crash memcpy(output, val, bytes); } } diff --git a/src/query/src/qtsbuf.c b/src/query/src/qtsbuf.c index 869299f309..db1992a572 100644 --- a/src/query/src/qtsbuf.c +++ b/src/query/src/qtsbuf.c @@ -15,7 +15,7 @@ static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader); * @param path * @return */ -STSBuf* tsBufCreate(bool autoDelete) { +STSBuf* tsBufCreate(bool autoDelete, int32_t order) { STSBuf* pTSBuf = calloc(1, sizeof(STSBuf)); if (pTSBuf == NULL) { return NULL; @@ -40,7 +40,7 @@ STSBuf* tsBufCreate(bool autoDelete) { pTSBuf->cur.order = TSDB_ORDER_ASC; pTSBuf->autoDelete = autoDelete; - pTSBuf->tsOrder = -1; + pTSBuf->tsOrder = order; return pTSBuf; } @@ -66,8 +66,8 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { // validate the file magic number STSBufFileHeader header = {0}; fseek(pTSBuf->f, 0, SEEK_SET); - fread(&header, 1, sizeof(header), pTSBuf->f); - + fread(&header, 1, sizeof(STSBufFileHeader), pTSBuf->f); + // invalid file if (header.magic != TS_COMP_FILE_MAGIC) { return NULL; @@ -119,7 +119,6 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { // ascending by default pTSBuf->cur.order = TSDB_ORDER_ASC; - pTSBuf->autoDelete = autoDelete; // tscTrace("create tsBuf from file:%s, fd:%d, size:%d, numOfVnode:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f), @@ -536,7 +535,9 @@ int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) { if ((pTSBuf->f == NULL) || pHeader == NULL || pHeader->numOfVnode < 0 || pHeader->magic != TS_COMP_FILE_MAGIC) { return -1; } - + + assert(pHeader->tsOrder == TSDB_ORDER_ASC || pHeader->tsOrder == TSDB_ORDER_DESC); + int64_t r = fseek(pTSBuf->f, 0, SEEK_SET); if (r != 0) { return -1; @@ -754,7 +755,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) { } STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order) { - STSBuf* pTSBuf = tsBufCreate(true); + STSBuf* pTSBuf = tsBufCreate(true, order); STSVnodeBlockInfo* pBlockInfo = &(addOneVnodeInfo(pTSBuf, 0)->info); pBlockInfo->numOfBlocks = numOfBlocks; @@ -845,7 +846,9 @@ STSBuf* tsBufClone(STSBuf* pTSBuf) { if (pTSBuf == NULL) { return NULL; } - + + tsBufFlush(pTSBuf); + return tsBufCreateFromFile(pTSBuf->path, false); } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index a62ad5bbd3..93f5ca9355 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -2112,8 +2112,8 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT* tsdb, uint64_t uid, const char* pTag } if (pTable->type != TSDB_SUPER_TABLE) { - tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s", - tsdb, uid, pTable->tableId.tid, pTable->name); + tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s", tsdb, uid, pTable->tableId.tid, + pTable->name); return TSDB_CODE_COM_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client } @@ -2128,7 +2128,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT* tsdb, uint64_t uid, const char* pTag pGroupInfo->numOfTables = taosArrayGetSize(res); pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, tsdb); - tsdbTrace("no tbname condition or tagcond, all tables belongs to one group, numOfTables:%d", pGroupInfo->numOfTables); + tsdbTrace("%p no table name/tag condition, all tables belong to one group, numOfTables:%d", tsdb, pGroupInfo->numOfTables); } else { // todo add error } @@ -2172,6 +2172,9 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT* tsdb, uint64_t uid, const char* pTag pGroupInfo->numOfTables = taosArrayGetSize(res); pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, tsdb); + tsdbTrace("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%d, belong to %d groups", tsdb, pTable->tableId.tid, + pTable->tableId.uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList)); + taosArrayDestroy(res); return ret; } diff --git a/src/util/inc/tutil.h b/src/util/inc/tutil.h index d38f983718..c75ba24baa 100644 --- a/src/util/inc/tutil.h +++ b/src/util/inc/tutil.h @@ -34,18 +34,19 @@ extern "C" { #define WCHAR wchar_t -#define tfree(x) \ - { \ - if (x) { \ - free((void*)(x)); \ - x = 0; \ - } \ +#define tfree(x) \ + { \ + if (x) { \ + free((void *)(x)); \ + x = 0; \ + } \ } -#define tstrncpy(dst, src, size) do { \ +#define tstrncpy(dst, src, size) \ + do { \ strncpy((dst), (src), (size)); \ - (dst)[(size) - 1] = 0; \ -} while (0); + (dst)[(size)-1] = 0; \ + } while (0); #define tclose(x) taosCloseSocket(x) diff --git a/tests/script/general/parser/join.sim b/tests/script/general/parser/join.sim index 62249e3d62..1bce6f1950 100644 --- a/tests/script/general/parser/join.sim +++ b/tests/script/general/parser/join.sim @@ -354,6 +354,7 @@ sql select count(*) from join_mt0, join_mt1 where join_mt0.ts = join_mt1.ts and $val = 20 if $data00 != $val then + print expect 20, actual:$data00 return -1 endi @@ -411,7 +412,7 @@ endi #======================limit offset=================================== # tag values not int -sql_error select count(*) from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t2=join_mt1.t2; +sql_error select count(*) from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t2=join_mt1.t2; #!!!!! # tag type not identical sql_error select count(*) from join_mt0, join_mt1 where join_mt1.t2 = join_mt0.t1 and join_mt1.ts=join_mt0.ts; diff --git a/tests/script/general/parser/join_multivnode.sim b/tests/script/general/parser/join_multivnode.sim index 3e32064f3b..4cf1d36672 100644 --- a/tests/script/general/parser/join_multivnode.sim +++ b/tests/script/general/parser/join_multivnode.sim @@ -108,6 +108,7 @@ sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where joi print $row if $row != 3000 then + print expect 3000, actual: $row return -1 endi