[td-225] fix bugs in join query.

This commit is contained in:
Haojun Liao 2020-06-17 11:51:01 +08:00
parent 688f6c3a37
commit 53cf7201b8
22 changed files with 536 additions and 509 deletions

View File

@ -122,15 +122,13 @@ bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableI
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo); bool tscIsProjectionQuery(SQueryInfo* pQueryInfo);
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryOnSTable(SSqlCmd* pCmd);
bool tscQueryTags(SQueryInfo* pQueryInfo); bool tscQueryTags(SQueryInfo* pQueryInfo);
bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd);
void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex, void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex,
SSchema* pColSchema, int16_t isTag); SSchema* pColSchema, int16_t colType);
int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql); int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql);
void tscClearInterpInfo(SQueryInfo* pQueryInfo); void tscClearInterpInfo(SQueryInfo* pQueryInfo);
@ -139,7 +137,7 @@ bool tscIsInsertData(char* sqlstr);
/* use for keep current db info temporarily, for handle table with db prefix */ /* use for keep current db info temporarily, for handle table with db prefix */
// todo remove it // todo remove it
void tscGetDBInfoFromMeterId(char* tableId, char* db); void tscGetDBInfoFromTableFullName(char* tableId, char* db);
int tscAllocPayload(SSqlCmd* pCmd, int size); int tscAllocPayload(SSqlCmd* pCmd, int size);
@ -253,7 +251,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t sub
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex); void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex);
int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid); int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex); void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);

View File

@ -64,12 +64,20 @@ SSchema* tscGetTableSchema(const STableMeta* pTableMeta);
SSchema *tscGetTableTagSchema(const STableMeta *pMeta); SSchema *tscGetTableTagSchema(const STableMeta *pMeta);
/** /**
* * get the column schema according to the column index
* @param pMeta * @param pMeta
* @param startCol * @param colIndex
* @return * @return
*/ */
SSchema *tscGetTableColumnSchema(const STableMeta *pMeta, int32_t startCol); SSchema *tscGetTableColumnSchema(const STableMeta *pMeta, int32_t colIndex);
/**
* get the column schema according to the column id
* @param pTableMeta
* @param colId
* @return
*/
SSchema* tscGetTableColumnSchemaById(STableMeta* pTableMeta, int16_t colId);
/** /**
* check if the schema is valid or not, including following aspects: * check if the schema is valid or not, including following aspects:

View File

@ -85,7 +85,7 @@ typedef struct SSqlExpr {
int16_t functionId; // function id in aAgg array int16_t functionId; // function id in aAgg array
int16_t resType; // return value type int16_t resType; // return value type
int16_t resBytes; // length of return value int16_t resBytes; // length of return value
int32_t interBytes; // inter result buffer size int32_t interBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function int16_t numOfParams; // argument value of each function
tVariant param[3]; // parameters are not more than 3 tVariant param[3]; // parameters are not more than 3
int32_t offset; // sub result column value of arithmetic expression. int32_t offset; // sub result column value of arithmetic expression.
@ -123,7 +123,7 @@ typedef struct SCond {
typedef struct SJoinNode { typedef struct SJoinNode {
char tableId[TSDB_TABLE_ID_LEN]; char tableId[TSDB_TABLE_ID_LEN];
uint64_t uid; uint64_t uid;
int16_t tagCol; int16_t tagColId;
} SJoinNode; } SJoinNode;
typedef struct SJoinInfo { typedef struct SJoinInfo {
@ -155,20 +155,19 @@ typedef struct SParamInfo {
} SParamInfo; } SParamInfo;
typedef struct STableDataBlocks { typedef struct STableDataBlocks {
char tableId[TSDB_TABLE_ID_LEN]; char tableId[TSDB_TABLE_ID_LEN];
int8_t tsSource; // where does the UNIX timestamp come from, server or client int8_t tsSource; // where does the UNIX timestamp come from, server or client
bool ordered; // if current rows are ordered or not bool ordered; // if current rows are ordered or not
int64_t vgId; // virtual group id int64_t vgId; // virtual group id
int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
int32_t numOfTables; // number of tables in current submit block int32_t numOfTables; // number of tables in current submit block
int32_t rowSize; // row size for current table
int32_t rowSize; // row size for current table
uint32_t nAllocSize; uint32_t nAllocSize;
uint32_t headerSize; // header for metadata (submit metadata) uint32_t headerSize; // header for table info (uid, tid, submit metadata)
uint32_t size; uint32_t size;
/* /*
* the metermeta for current table, the metermeta will be used during submit stage, keep a ref * the table meta of table, the table meta will be used during submit, keep a ref
* to avoid it to be removed from cache * to avoid it to be removed from cache
*/ */
STableMeta *pTableMeta; STableMeta *pTableMeta;
@ -191,32 +190,28 @@ typedef struct SDataBlockList { // todo remove
} SDataBlockList; } SDataBlockList;
typedef struct SQueryInfo { typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately. int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert/import type uint32_t type; // query/insert/import type
char slidingTimeUnit; char slidingTimeUnit;
STimeWindow window; STimeWindow window;
int64_t intervalTime; // aggregation time interval int64_t intervalTime; // aggregation time interval
int64_t slidingTime; // sliding window in mseconds int64_t slidingTime; // sliding window in mseconds
SSqlGroupbyExpr groupbyExpr; // group by tags info SSqlGroupbyExpr groupbyExpr; // group by tags info
SArray * colList; // SArray<SColumn*>
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo; SFieldInfo fieldsInfo;
SArray * exprList; // SArray<SSqlExpr*> SArray * exprList; // SArray<SSqlExpr*>
SLimitVal limit; SLimitVal limit;
SLimitVal slimit; SLimitVal slimit;
STagCond tagCond; STagCond tagCond;
SOrderVal order; SOrderVal order;
int16_t fillType; // final result fill type int16_t fillType; // final result fill type
int16_t numOfTables; int16_t numOfTables;
STableMetaInfo **pTableMetaInfo; STableMetaInfo **pTableMetaInfo;
struct STSBuf * tsBuf; struct STSBuf * tsBuf;
int64_t * fillVal; // default value for fill int64_t * fillVal; // default value for fill
char * msg; // pointer to the pCmd->payload to keep error message temporarily char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause int64_t clauseLimit; // limit for current sub clause
int64_t prjOffset; // offset value in the original sql expression, only applied at client side
// offset value in the original sql expression, NOT sent to virtual node, only applied at client side
int64_t prjOffset;
} SQueryInfo; } SQueryInfo;
typedef struct { typedef struct {
@ -431,7 +426,7 @@ extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows); typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows);
int32_t tscCompareTidTags(const void* p1, const void* p2); int32_t tscCompareTidTags(const void* p1, const void* p2);
void tscBuildVgroupTableInfo(STableMetaInfo* pTableMetaInfo, SArray* tables); void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -214,7 +214,6 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
tscError("qhandle is NULL"); tscError("qhandle is NULL");
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE; pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
// tscQueueAsyncError(fp, param, TSDB_CODE_TSC_INVALID_QHANDLE);
return; return;
} }

View File

@ -3903,7 +3903,7 @@ static bool ts_comp_function_setup(SQLFunctionCtx *pCtx) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx); SResultInfo *pResInfo = GET_RES_INFO(pCtx);
STSCompInfo *pInfo = pResInfo->interResultBuf; STSCompInfo *pInfo = pResInfo->interResultBuf;
pInfo->pTSBuf = tsBufCreate(false); pInfo->pTSBuf = tsBufCreate(false, pCtx->order);
pInfo->pTSBuf->tsOrder = pCtx->order; pInfo->pTSBuf->tsOrder = pCtx->order;
return true; return true;
} }
@ -3925,7 +3925,6 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) {
} }
SET_VAL(pCtx, pCtx->size, 1); SET_VAL(pCtx, pCtx->size, 1);
pResInfo->hasResult = DATA_SET_FLAG; pResInfo->hasResult = DATA_SET_FLAG;
} }

View File

@ -1328,12 +1328,14 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) {
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
if (initialParse) { if (initialParse) {
assert(!pSql->cmd.parseFinished);
char* p = pSql->sqlstr; char* p = pSql->sqlstr;
pSql->sqlstr = NULL; pSql->sqlstr = NULL;
tscPartiallyFreeSqlObj(pSql); tscPartiallyFreeSqlObj(pSql);
pSql->sqlstr = p; pSql->sqlstr = p;
} else { } else if (!pSql->cmd.parseFinished) {
tscTrace("continue parse sql: %s", pSql->cmd.curSql); tscTrace("continue parse sql: %s", pSql->cmd.curSql);
} }

View File

@ -98,8 +98,6 @@ static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killTy
static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField); static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField);
static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo); static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo);
static void updateTagColumnIndex(SQueryInfo* pQueryInfo, int32_t tableIndex);
static int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t index, SQuerySQL* pQuerySql, SSqlObj* pSql); static int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t index, SQuerySQL* pQuerySql, SSqlObj* pSql);
static int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql); static int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql);
static int32_t getColumnIndexByName(const SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); static int32_t getColumnIndexByName(const SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
@ -640,17 +638,11 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
SSchema s = {.bytes = TSDB_KEYSIZE, .type = TSDB_DATA_TYPE_TIMESTAMP, .colId = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tstrncpy(s.name, aAggs[TSDB_FUNC_TS].aName, sizeof(s.name));
SColumnIndex index = {tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX}; SColumnIndex index = {tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX};
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS, &index, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_TS, &index, &s, TSDB_COL_NORMAL);
TSDB_KEYSIZE, false);
SColumnList ids = getColumnList(1, 0, PRIMARYKEY_TIMESTAMP_COL_INDEX);
int32_t ret =
insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS].aName, pExpr);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
if (parseSlidingClause(pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) { if (parseSlidingClause(pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
@ -1241,11 +1233,11 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
return invalidSqlErrMsg(pQueryInfo->msg, msg2); return invalidSqlErrMsg(pQueryInfo->msg, msg2);
} }
/*
* transfer sql functions that need secondary merge into another format
* in dealing with metric queries such as: count/first/last
*/
if (isSTable) { if (isSTable) {
/*
* transfer sql functions that need secondary merge into another format
* in dealing with metric queries such as: count/first/last
*/
tscTansformSQLFuncForSTableQuery(pQueryInfo); tscTansformSQLFuncForSTableQuery(pQueryInfo);
if (hasUnsupportFunctionsForSTableQuery(pQueryInfo)) { if (hasUnsupportFunctionsForSTableQuery(pQueryInfo)) {
@ -1272,7 +1264,7 @@ int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnLi
} }
TAOS_FIELD f = tscCreateField(type, fieldName, bytes); TAOS_FIELD f = tscCreateField(type, fieldName, bytes);
SFieldSupInfo* pInfo =tscFieldInfoInsert(&pQueryInfo->fieldsInfo, outputIndex, &f); SFieldSupInfo* pInfo = tscFieldInfoInsert(&pQueryInfo->fieldsInfo, outputIndex, &f);
pInfo->pSqlExpr = pSqlExpr; pInfo->pSqlExpr = pSqlExpr;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1324,8 +1316,9 @@ static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumn
void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
SColumnIndex* pIndex, SSchema* pColSchema, int16_t flag) { SColumnIndex* pIndex, SSchema* pColSchema, int16_t flag) {
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionId, pIndex, pColSchema->type, SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, outputColIndex, functionId, pIndex, pColSchema->type,
pColSchema->bytes, pColSchema->bytes, flag); pColSchema->bytes, pColSchema->bytes, flag);
tstrncpy(pExpr->aliasName, pColSchema->name, sizeof(pExpr->aliasName));
SColumnList ids = getColumnList(1, pIndex->tableIndex, pIndex->columnIndex); SColumnList ids = getColumnList(1, pIndex->tableIndex, pIndex->columnIndex);
if (TSDB_COL_IS_TAG(flag)) { if (TSDB_COL_IS_TAG(flag)) {
@ -1403,7 +1396,7 @@ int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pI
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
SSchema colSchema = tGetTableNameColumnSchema(); SSchema colSchema = tGetTableNameColumnSchema();
tscAddSpecialColumnForSelect(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, true); tscAddSpecialColumnForSelect(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, TSDB_COL_TAG);
} else { } else {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
@ -2470,62 +2463,10 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo) {
return true; return true;
} }
void updateTagColumnIndex(SQueryInfo* pQueryInfo, int32_t tableIndex) {
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
//
// // update tags column index for expression
// size_t size = tscSqlExprNumOfExprs(pQueryInfo);
// for (int32_t i = 0; i < size; ++i) {
// SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
//
// if (!TSDB_COL_IS_TAG(pExpr->colInfo.flag)) { // not tags, continue
// continue;
// }
//
// // not belongs to this table
// if (pExpr->uid != pTableMetaInfo->pTableMeta->uid) {
// continue;
// }
// for (int32_t j = 0; j < pTableMetaInfo->numOfTags; ++j) {
// if (pExpr->colInfo.colIndex == pTableMetaInfo->tagColumnIndex[j]) {
// pExpr->colInfo.colIndex = j;
// break;
// }
// }
// }
// update join condition tag column index
// SJoinInfo* pJoinInfo = &pQueryInfo->tagCond.joinInfo;
// if (!pJoinInfo->hasJoin) { // not join query
// return;
// }
//
// assert(pJoinInfo->left.uid != pJoinInfo->right.uid);
//
// // the join condition expression node belongs to this table(super table)
// assert(0);
// if (pTableMetaInfo->pTableMeta->uid == pJoinInfo->left.uid) {
// for (int32_t i = 0; i < pTableMetaInfo->numOfTags; ++i) {
// if (pJoinInfo->left.tagCol == pTableMetaInfo->tagColumnIndex[i]) {
// pJoinInfo->left.tagCol = i;
// }
// }
// }
//
// if (pTableMetaInfo->pTableMeta->uid == pJoinInfo->right.uid) {
// for (int32_t i = 0; i < pTableMetaInfo->numOfTags; ++i) {
// if (pJoinInfo->right.tagCol == pTableMetaInfo->tagColumnIndex[i]) {
// pJoinInfo->right.tagCol = i;
// }
// }
// }
}
int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* pCmd) { int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* pCmd) {
const char* msg1 = "too many columns in group by clause"; const char* msg1 = "too many columns in group by clause";
const char* msg2 = "invalid column name in group by clause"; const char* msg2 = "invalid column name in group by clause";
const char* msg3 = "group by columns must belong to one table"; // const char* msg3 = "group by columns must belong to one table";
const char* msg7 = "not support group by expression"; const char* msg7 = "not support group by expression";
const char* msg8 = "not allowed column type for group by"; const char* msg8 = "not allowed column type for group by";
const char* msg9 = "tags not allowed for table query"; const char* msg9 = "tags not allowed for table query";
@ -2561,10 +2502,6 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
return invalidSqlErrMsg(pQueryInfo->msg, msg2); return invalidSqlErrMsg(pQueryInfo->msg, msg2);
} }
if (tableIndex != index.tableIndex && tableIndex >= 0) {
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
}
tableIndex = index.tableIndex; tableIndex = index.tableIndex;
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
@ -2621,7 +2558,6 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
} }
pQueryInfo->groupbyExpr.tableIndex = tableIndex; pQueryInfo->groupbyExpr.tableIndex = tableIndex;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3051,14 +2987,17 @@ static int32_t getColumnQueryCondInfo(SQueryInfo* pQueryInfo, tSQLExpr* pExpr, i
} }
static int32_t getJoinCondInfo(SQueryInfo* pQueryInfo, tSQLExpr* pExpr) { static int32_t getJoinCondInfo(SQueryInfo* pQueryInfo, tSQLExpr* pExpr) {
const char* msg = "invalid join query condition"; const char* msg1 = "invalid join query condition";
const char* msg2 = "join on binary/nchar not supported";
const char* msg3 = "type of join columns must be identical";
const char* msg4 = "invalid column name in join condition";
if (pExpr == NULL) { if (pExpr == NULL) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (!isExprDirectParentOfLeaftNode(pExpr)) { if (!isExprDirectParentOfLeaftNode(pExpr)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg); return invalidSqlErrMsg(pQueryInfo->msg, msg1);
} }
STagCond* pTagCond = &pQueryInfo->tagCond; STagCond* pTagCond = &pQueryInfo->tagCond;
@ -3067,28 +3006,36 @@ static int32_t getJoinCondInfo(SQueryInfo* pQueryInfo, tSQLExpr* pExpr) {
SColumnIndex index = COLUMN_INDEX_INITIALIZER; SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(&pExpr->pLeft->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { if (getColumnIndexByName(&pExpr->pLeft->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return invalidSqlErrMsg(pQueryInfo->msg, msg4);
} }
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
int16_t tagColIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); SSchema* pTagSchema1 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
pLeft->uid = pTableMetaInfo->pTableMeta->uid; pLeft->uid = pTableMetaInfo->pTableMeta->uid;
pLeft->tagCol = tagColIndex; pLeft->tagColId = pTagSchema1->colId;
strcpy(pLeft->tableId, pTableMetaInfo->name); strcpy(pLeft->tableId, pTableMetaInfo->name);
index = (SColumnIndex)COLUMN_INDEX_INITIALIZER; index = (SColumnIndex)COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(&pExpr->pRight->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { if (getColumnIndexByName(&pExpr->pRight->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return invalidSqlErrMsg(pQueryInfo->msg, msg4);
} }
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
tagColIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); SSchema* pTagSchema2 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
pRight->uid = pTableMetaInfo->pTableMeta->uid; pRight->uid = pTableMetaInfo->pTableMeta->uid;
pRight->tagCol = tagColIndex; pRight->tagColId = pTagSchema2->colId;
strcpy(pRight->tableId, pTableMetaInfo->name); strcpy(pRight->tableId, pTableMetaInfo->name);
if (pTagSchema1->type != pTagSchema2->type) {
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
}
if (pTagSchema1->type == TSDB_DATA_TYPE_BINARY || pTagSchema1->type == TSDB_DATA_TYPE_NCHAR) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
}
pTagCond->joinInfo.hasJoin = true; pTagCond->joinInfo.hasJoin = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3816,6 +3763,10 @@ static int32_t getTagQueryCondExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr,
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
tSQLExpr* p1 = extractExprForSTable(pExpr, pQueryInfo, i); tSQLExpr* p1 = extractExprForSTable(pExpr, pQueryInfo, i);
if (p1 == NULL) { // no query condition on this table
continue;
}
tExprNode* p = NULL; tExprNode* p = NULL;
SArray* colList = taosArrayInit(10, sizeof(SColIndex)); SArray* colList = taosArrayInit(10, sizeof(SColIndex));
@ -4980,7 +4931,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau
if (pExpr->functionId != TSDB_FUNC_TAG) { if (pExpr->functionId != TSDB_FUNC_TAG) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
int16_t columnInfo = tscGetJoinTagColIndexByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid); int16_t columnInfo = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid);
SColumnIndex index = {.tableIndex = 0, .columnIndex = columnInfo}; SColumnIndex index = {.tableIndex = 0, .columnIndex = columnInfo};
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
@ -5016,27 +4967,17 @@ static void doLimitOutputNormalColOfGroupby(SSqlExpr* pExpr) {
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) { void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) {
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, tagIndex); SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, tagIndex);
int32_t index = pColIndex->colIndex; size_t size = tscSqlExprNumOfExprs(pQueryInfo);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index); SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pColIndex->colIndex);
SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = index}; SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pColIndex->colIndex};
size_t size = tscSqlExprNumOfExprs(pQueryInfo); tscAddSpecialColumnForSelect(pQueryInfo, size, TSDB_FUNC_PRJ, &colIndex, pSchema, TSDB_COL_NORMAL);
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_PRJ, &colIndex, pSchema->type, pSchema->bytes,
pSchema->bytes, false);
pExpr->colInfo.flag = TSDB_COL_NORMAL; SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, size);
doLimitOutputNormalColOfGroupby(pExpr); doLimitOutputNormalColOfGroupby(pInfo->pSqlExpr);
// NOTE: tag column does not add to source column list
SColumnList list = {0};
list.num = 1;
list.ids[0] = colIndex;
insertResultField(pQueryInfo, size, &list, pSchema->bytes, pSchema->type, pSchema->name, pExpr);
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, size - 1);
pInfo->visible = false; pInfo->visible = false;
} }
@ -5248,6 +5189,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SQueryInfo* pQueryInfo) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSchema s = tGetTableNameColumnSchema();
SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
int16_t bytes = 0; int16_t bytes = 0;
int16_t type = 0; int16_t type = 0;
@ -5258,7 +5200,6 @@ static int32_t doAddGroupbyColumnsOnDemand(SQueryInfo* pQueryInfo) {
int16_t colIndex = pColIndex->colIndex; int16_t colIndex = pColIndex->colIndex;
if (colIndex == TSDB_TBNAME_COLUMN_INDEX) { if (colIndex == TSDB_TBNAME_COLUMN_INDEX) {
SSchema s = tGetTableNameColumnSchema();
type = s.type; type = s.type;
bytes = s.bytes; bytes = s.bytes;
name = s.name; name = s.name;
@ -5955,10 +5896,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
setColumnOffsetValueInResultset(pQueryInfo); setColumnOffsetValueInResultset(pQueryInfo);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
updateTagColumnIndex(pQueryInfo, i);
}
/* /*
* fill options are set at the end position, when all columns are set properly * fill options are set at the end position, when all columns are set properly
* the columns may be increased due to group by operation * the columns may be increased due to group by operation

View File

@ -64,14 +64,6 @@ SSchema* tscGetTableTagSchema(const STableMeta* pTableMeta) {
STableComInfo tscGetTableInfo(const STableMeta* pTableMeta) { STableComInfo tscGetTableInfo(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL); assert(pTableMeta != NULL);
#if 0
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
assert (pTableMeta->pSTable != NULL);
return pTableMeta->pSTable->tableInfo;
}
#endif
return pTableMeta->tableInfo; return pTableMeta->tableInfo;
} }
@ -119,11 +111,24 @@ bool isValidSchema(struct SSchema* pSchema, int32_t numOfCols) {
return (rowLen <= TSDB_MAX_BYTES_PER_ROW); return (rowLen <= TSDB_MAX_BYTES_PER_ROW);
} }
SSchema* tscGetTableColumnSchema(const STableMeta* pTableMeta, int32_t startCol) { SSchema* tscGetTableColumnSchema(const STableMeta* pTableMeta, int32_t colIndex) {
assert(pTableMeta != NULL); assert(pTableMeta != NULL);
SSchema* pSchema = (SSchema*) pTableMeta->schema; SSchema* pSchema = (SSchema*) pTableMeta->schema;
return &pSchema[startCol]; return &pSchema[colIndex];
}
// TODO for large number of columns, employ the binary search method
SSchema* tscGetTableColumnSchemaById(STableMeta* pTableMeta, int16_t colId) {
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
for(int32_t i = 0; i < tinfo.numOfColumns + tinfo.numOfTags; ++i) {
if (pTableMeta->schema[i].colId == colId) {
return &pTableMeta->schema[i];
}
}
return NULL;
} }
struct SSchema tscGetTbnameColumnSchema() { struct SSchema tscGetTbnameColumnSchema() {

View File

@ -1092,14 +1092,6 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx)
size_t size = tscSqlExprNumOfExprs(pQueryInfo); size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t j = 0; j < size; ++j) { for (int32_t j = 0; j < size; ++j) {
// SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[j];
// if (pExpr == NULL) {
// assert(pQueryInfo->fieldsInfo.pExpr[j] != NULL);
//
// maxOutput = 1;
// continue;
// }
/* /*
* ts, tag, tagprj function can not decide the output number of current query * ts, tag, tagprj function can not decide the output number of current query
* the number of output result is decided by main output * the number of output result is decided by main output
@ -1109,8 +1101,9 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx)
continue; continue;
} }
if (maxOutput < GET_RES_INFO(&pCtx[j])->numOfRes) { SResultInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
maxOutput = GET_RES_INFO(&pCtx[j])->numOfRes; if (maxOutput < pResInfo->numOfRes) {
maxOutput = pResInfo->numOfRes;
} }
} }
@ -1260,7 +1253,6 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
printf("final result before interpo:\n"); printf("final result before interpo:\n");
assert(0);
// tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->num, pResBuf->num); // tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->num, pResBuf->num);
#endif #endif

View File

@ -644,7 +644,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
pQueryMsg->queryType = htons(pQueryInfo->type); pQueryMsg->queryType = htonl(pQueryInfo->type);
size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
pQueryMsg->numOfOutput = htons(numOfOutput); pQueryMsg->numOfOutput = htons(numOfOutput);
@ -723,6 +723,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += sizeof(SSqlFuncMsg); pMsg += sizeof(SSqlFuncMsg);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) { for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
// todo add log
pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType); pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen); pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);
@ -1175,7 +1176,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name); strcpy(pCreateTableMsg->tableId, pTableMetaInfo->name);
// use dbinfo from table id without modifying current db info // use dbinfo from table id without modifying current db info
tscGetDBInfoFromMeterId(pTableMetaInfo->name, pCreateTableMsg->db); tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pCreateTableMsg->db);
SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo; SCreateTableSQL *pCreateTable = pInfo->pCreateTableInfo;
@ -1252,7 +1253,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload; SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
tscGetDBInfoFromMeterId(pTableMetaInfo->name, pAlterTableMsg->db); tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pAlterTableMsg->db);
strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name); strcpy(pAlterTableMsg->tableId, pTableMetaInfo->name);
pAlterTableMsg->type = htons(pAlterInfo->type); pAlterTableMsg->type = htons(pAlterInfo->type);
@ -1577,7 +1578,7 @@ int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg = pStart; pMsg = pStart;
SMgmtHead *pMgmt = (SMgmtHead *)pMsg; SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
tscGetDBInfoFromMeterId(pTableMetaInfo->name, pMgmt->db); tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pMgmt->db);
pMsg += sizeof(SMgmtHead); pMsg += sizeof(SMgmtHead);

View File

@ -238,7 +238,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
taosArraySort( tables, tscCompareTidTags ); taosArraySort( tables, tscCompareTidTags );
tscBuildVgroupTableInfo( pTableMetaInfo, tables ); tscBuildVgroupTableInfo(pSql, pTableMetaInfo, tables);
} }
taosArrayDestroy(tables); taosArrayDestroy(tables);

View File

@ -28,7 +28,7 @@ typedef struct SInsertSupporter {
} SInsertSupporter; } SInsertSupporter;
static void freeJoinSubqueryObj(SSqlObj* pSql); static void freeJoinSubqueryObj(SSqlObj* pSql);
static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql); static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql);
static bool tsCompare(int32_t order, int64_t left, int64_t right) { static bool tsCompare(int32_t order, int64_t left, int64_t right) {
if (order == TSDB_ORDER_ASC) { if (order == TSDB_ORDER_ASC) {
@ -38,16 +38,15 @@ static bool tsCompare(int32_t order, int64_t left, int64_t right) {
} }
} }
static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) {
SJoinSupporter* pSupporter2, TSKEY* st, TSKEY* et) {
STSBuf* output1 = tsBufCreate(true);
STSBuf* output2 = tsBufCreate(true);
*st = INT64_MAX;
*et = INT64_MIN;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
STSBuf* output1 = tsBufCreate(true, pQueryInfo->order.order);
STSBuf* output2 = tsBufCreate(true, pQueryInfo->order.order);
win->skey = INT64_MAX;
win->ekey = INT64_MIN;
SLimitVal* pLimit = &pQueryInfo->limit; SLimitVal* pLimit = &pQueryInfo->limit;
int32_t order = pQueryInfo->order.order; int32_t order = pQueryInfo->order.order;
@ -106,12 +105,12 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1,
* final results which is acquired after the secondry merge of in the client. * final results which is acquired after the secondry merge of in the client.
*/ */
if (pLimit->offset == 0 || pQueryInfo->intervalTime > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { if (pLimit->offset == 0 || pQueryInfo->intervalTime > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
if (*st > elem1.ts) { if (win->skey > elem1.ts) {
*st = elem1.ts; win->skey = elem1.ts;
} }
if (*et < elem1.ts) { if (win->ekey < elem1.ts) {
*et = elem1.ts; win->ekey = elem1.ts;
} }
tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
@ -151,8 +150,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1,
tsBufDestory(pSupporter2->pTSBuf); tsBufDestory(pSupporter2->pTSBuf);
tscTrace("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks " tscTrace("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks "
"intersecting, skey:%" PRId64 ", ekey:%" PRId64, pSql, "intersecting, skey:%" PRId64 ", ekey:%" PRId64, pSql, numOfInput1, numOfInput2, output1->numOfTotal,
numOfInput1, numOfInput2, output1->numOfTotal, *st, *et); win->skey, win->ekey);
return output1->numOfTotal; return output1->numOfTotal;
} }
@ -252,7 +251,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
assert(numOfSub > 0); assert(numOfSub > 0);
// scan all subquery, if one sub query has only ts, ignore it // scan all subquery, if one sub query has only ts, ignore it
tscTrace("%p start to launch secondary subqueries, total:%d, only:%d needs to query, others are not retrieve in " tscTrace("%p start to launch secondary subquery, total:%d, only:%d needs to query, others are not retrieve in "
"select clause", pSql, pSql->numOfSubs, numOfSub); "select clause", pSql, pSql->numOfSubs, numOfSub);
//the subqueries that do not actually launch the secondary query to virtual node is set as completed. //the subqueries that do not actually launch the secondary query to virtual node is set as completed.
@ -329,32 +328,37 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
pSupporter->limit = pQueryInfo->limit; pSupporter->limit = pQueryInfo->limit;
pNewQueryInfo->limit = pSupporter->limit; pNewQueryInfo->limit = pSupporter->limit;
// fetch the join tag column
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
SSqlExpr* pExpr = tscSqlExprGet(pNewQueryInfo, 0);
assert(pQueryInfo->tagCond.joinInfo.hasJoin);
int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid);
pExpr->param[0].i64Key = tagColIndex;
pExpr->numOfParams = 1;
}
SColumnIndex index = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; SColumnIndex index = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
SSchema* s = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, 0); SSchema* s = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, 0);
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0); SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0);
int16_t funcId = pExpr->functionId;
if ((pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) || if ((pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) ||
(pExpr->functionId != TSDB_FUNC_TS || pExpr->functionId != TSDB_FUNC_TS_DUMMY)) { (funcId != TSDB_FUNC_TS && funcId != TSDB_FUNC_TS_DUMMY && funcId != TSDB_FUNC_PRJ)) {
tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_PRJ, &index, s, 0);
int16_t functionId = tscIsProjectionQuery(pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS;
tscAddSpecialColumnForSelect(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL);
tscPrintSelectClause(pNew, 0); tscPrintSelectClause(pNew, 0);
tscFieldInfoUpdateOffset(pNewQueryInfo); tscFieldInfoUpdateOffset(pNewQueryInfo);
pExpr = tscSqlExprGet(pQueryInfo, 0);
}
// set the join condition tag column info, to do extract method
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
assert(pQueryInfo->tagCond.joinInfo.hasJoin);
int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid);
pExpr->param[0].i64Key = colId;
pExpr->numOfParams = 1;
} }
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList); size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscTrace("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", tscTrace("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, taosArrayGetSize(pNewQueryInfo->exprList),
taosArrayGetSize(pNewQueryInfo->exprList), numOfCols, numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name);
pNewQueryInfo->fieldsInfo.numOfOutput, pNewQueryInfo->pTableMetaInfo[0]->name);
} }
//prepare the subqueries object failed, abort //prepare the subqueries object failed, abort
@ -368,12 +372,10 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
} }
for(int32_t i = 0; i < pSql->numOfSubs; ++i) { for(int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; if (pSql->pSubs[i] == NULL) {
if (pSub == NULL) {
continue; continue;
} }
tscDoQuery(pSql->pSubs[i]);
tscProcessSql(pSub);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -414,11 +416,9 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
} }
// update the query time range according to the join results on timestamp // update the query time range according to the join results on timestamp
static void updateQueryTimeRange(SQueryInfo* pQueryInfo, int64_t st, int64_t et) { static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
assert(pQueryInfo->window.skey <= st && pQueryInfo->window.ekey >= et); assert(pQueryInfo->window.skey <= win->skey && pQueryInfo->window.ekey >= win->ekey);
pQueryInfo->window = *win;
pQueryInfo->window.skey = st;
pQueryInfo->window.ekey = et;
} }
static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* pSql) { static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* pSql) {
@ -462,13 +462,13 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj*
SJoinSupporter* p1 = pParentSql->pSubs[0]->param; SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param; SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
TSKEY st, et; STimeWindow win = TSWINDOW_INITIALIZER;
int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &st, &et); int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &win);
if (num <= 0) { // no result during ts intersect if (num <= 0) { // no result during ts intersect
tscTrace("%p free all sub SqlObj and quit", pParentSql); tscTrace("%p free all sub SqlObj and quit", pParentSql);
freeJoinSubqueryObj(pParentSql); freeJoinSubqueryObj(pParentSql);
} else { } else {
updateQueryTimeRange(pParentQueryInfo, st, et); updateQueryTimeRange(pParentQueryInfo, &win);
tscLaunchSecondPhaseSubqueries(pParentSql); tscLaunchSecondPhaseSubqueries(pParentSql);
} }
} }
@ -487,7 +487,7 @@ int32_t tscCompareTidTags(const void* p1, const void* p2) {
return 0; return 0;
} }
void tscBuildVgroupTableInfo(STableMetaInfo* pTableMetaInfo, SArray* tables) { void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) {
SArray* result = taosArrayInit(4, sizeof(SVgroupTableInfo)); SArray* result = taosArrayInit(4, sizeof(SVgroupTableInfo));
SArray* vgTables = NULL; SArray* vgTables = NULL;
STidTags* prev = NULL; STidTags* prev = NULL;
@ -513,12 +513,14 @@ void tscBuildVgroupTableInfo(STableMetaInfo* pTableMetaInfo, SArray* tables) {
taosArrayPush(result, &info); taosArrayPush(result, &info);
} }
tscTrace("%p tid:%d, uid:%"PRIu64",vgId:%d added for vnode query", pSql, tt->tid, tt->uid, tt->vgId)
STableIdInfo item = {.uid = tt->uid, .tid = tt->tid, .key = INT64_MIN}; STableIdInfo item = {.uid = tt->uid, .tid = tt->tid, .key = INT64_MIN};
taosArrayPush(vgTables, &item); taosArrayPush(vgTables, &item);
prev = tt; prev = tt;
} }
pTableMetaInfo->pVgroupTables = result; pTableMetaInfo->pVgroupTables = result;
pTableMetaInfo->vgroupIndex = 0;
} }
static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) { static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) {
@ -544,9 +546,8 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
// set the tags value for ts_comp function // set the tags value for ts_comp function
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0); SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0);
int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid); int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid);
pExpr->param->i64Key = tagColId;
pExpr->param->i64Key = tagColIndex;
pExpr->numOfParams = 1; pExpr->numOfParams = 1;
// add the filter tag column // add the filter tag column
@ -566,86 +567,144 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
tscTrace( tscTrace(
"%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " "%p subquery:%p tableIndex:%d, vgroupIndex:%d, numOfVgroups:%d, type:%d, ts_comp query to retrieve timestamps, "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", "numOfExpr:%d, colList:%d, numOfOutputFields:%d, name:%s",
pParent, pSql, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, tscSqlExprNumOfExprs(pQueryInfo), pParent, pSql, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pQueryInfo->type,
numOfCols, pQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name); tscSqlExprNumOfExprs(pQueryInfo), numOfCols, pQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name);
tscProcessSql(pSql); tscProcessSql(pSql);
} }
static bool checkForIdenticalTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1, void* pSql) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);// todo: tags mismatch, tags not completed
SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, 0);
SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
for(int32_t i = 1; i < p1->num; ++i) {
STidTags* prev = (STidTags*) varDataVal(p1->pIdTagList + (i - 1) * p1->tagSize);
STidTags* p = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);
if (doCompare(prev->tag, p->tag, pColSchema->type, pColSchema->bytes) == 0) {
tscError("%p join tags have same value for different table, free all sub SqlObj and quit", pSql);
p1->pState->code = TSDB_CODE_QRY_DUP_JOIN_KEY;
return false;
}
}
return true;
}
static void getIntersectionOfTagVal(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) {
tscTrace("%p all subqueries retrieve tags complete, do tags match", pParentSql);
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
qsort(p1->pIdTagList, p1->num, p1->tagSize, tscCompareTidTags);
qsort(p2->pIdTagList, p2->num, p2->tagSize, tscCompareTidTags);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid);
SSchema* pColSchema = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
*s1 = taosArrayInit(p1->num, p1->tagSize);
*s2 = taosArrayInit(p2->num, p2->tagSize);
if (!(checkForIdenticalTagVal(pQueryInfo, p1, pParentSql) && checkForIdenticalTagVal(pQueryInfo, p2, pParentSql))) {
freeJoinSubqueryObj(pParentSql);
pParentSql->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY;
tscQueueAsyncRes(pParentSql);
return;
}
int32_t i = 0, j = 0;
while(i < p1->num && j < p2->num) {
STidTags* pp1 = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);
STidTags* pp2 = (STidTags*) varDataVal(p2->pIdTagList + j * p2->tagSize);
int32_t ret = doCompare(pp1->tag, pp2->tag, pColSchema->type, pColSchema->bytes);
if (ret == 0) {
tscTrace("%p tag matched, vgId:%d, val:%d, tid:%d, uid:%"PRIu64", tid:%d, uid:%"PRIu64, pParentSql, pp1->vgId,
*(int*) pp1->tag, pp1->tid, pp1->uid, pp2->tid, pp2->uid);
taosArrayPush(*s1, pp1);
taosArrayPush(*s2, pp2);
j++;
i++;
} else if (ret > 0) {
j++;
} else {
i++;
}
}
}
static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SJoinSupporter* pSupporter = (SJoinSupporter*)param; SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj; SSqlObj* pParentSql = pSupporter->pObj;
SSqlObj* pSql = (SSqlObj*)tres; SSqlObj* pSql = (SSqlObj*)tres;
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// response of tag retrieve // response of tag retrieve
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) { if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
//todo handle error // todo handle error
if (numOfRows == 0 || pSql->res.completed) {
if (numOfRows == 0 || pRes->completed) {
if (numOfRows > 0) { if (numOfRows > 0) {
size_t length = pSupporter->totalLen + pSql->res.rspLen; size_t validLen = pSupporter->tagSize * pRes->numOfRows;
char* tmp = realloc(pSupporter->pIdTagList, length);
size_t length = pSupporter->totalLen + validLen;
char* tmp = realloc(pSupporter->pIdTagList, length);
assert(tmp != NULL); assert(tmp != NULL);
pSupporter->pIdTagList = tmp; pSupporter->pIdTagList = tmp;
memcpy(pSupporter->pIdTagList, pSql->res.data, pSql->res.rspLen); memcpy(pSupporter->pIdTagList + pSupporter->totalLen,pRes->data, validLen);
pSupporter->totalLen += pSql->res.rspLen; pSupporter->totalLen += validLen;
pSupporter->num += pSql->res.numOfRows; pSupporter->num += pRes->numOfRows;
}
// <tid + tag> tuples have been retrieved to client, try <tid + tag> tuples from the next vnode
if (hasMoreVnodesToTry(pSql)) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
pTableMetaInfo->vgroupIndex += 1;
assert(pTableMetaInfo->vgroupIndex < totalVgroups);
tscTrace("%p tid_tag from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d",
pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups,
pSupporter->num);
pCmd->command = TSDB_SQL_SELECT;
tscResetForNextRetrieve(&pSql->res);
// set the callback function
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql);
return;
} }
int32_t numOfTotal = pSupporter->pState->numOfTotal; int32_t numOfTotal = pSupporter->pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
if (finished < numOfTotal) { if (finished < numOfTotal) {
return; return;
} }
// all subqueries are returned, start to compare the tags // all subquery are returned, start to compare the tags
assert(finished == numOfTotal); assert(finished == numOfTotal);
tscTrace("%p all subqueries retrieve tags complete, do tags match", pParentSql);
SJoinSupporter* p1 = pParentSql->pSubs[0]->param; SArray *s1 = NULL, *s2 = NULL;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param; getIntersectionOfTagVal(pQueryInfo, pParentSql, &s1, &s2);
qsort(p1->pIdTagList, p1->num, p1->tagSize, tscCompareTidTags); if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return.
qsort(p2->pIdTagList, p2->num, p2->tagSize, tscCompareTidTags);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);// todo: tags mismatch, tags not completed
SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, 0);
SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
SArray* s1 = taosArrayInit(p1->num, p1->tagSize);
SArray* s2 = taosArrayInit(p2->num, p2->tagSize);
int32_t i = 0, j = 0;
while(i < p1->num && j < p2->num) {
STidTags* pp1 = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);
STidTags* pp2 = (STidTags*) varDataVal(p2->pIdTagList + j * p2->tagSize);
int32_t ret = doCompare(pp1->tag, pp2->tag, pColSchema->type, pColSchema->bytes);
if (ret == 0) {
taosArrayPush(s1, pp1);
taosArrayPush(s2, pp2);
j++;
i++;
} else if (ret > 0) {
j++;
} else {
i++;
}
}
if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) {// no results,return.
tscTrace("%p free all sub SqlObj and quit", pParentSql); tscTrace("%p free all sub SqlObj and quit", pParentSql);
freeJoinSubqueryObj(pParentSql); freeJoinSubqueryObj(pParentSql);
return; return;
@ -653,26 +712,35 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd; SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd;
SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd; SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd;
SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0); SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0);
STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0); STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0);
tscBuildVgroupTableInfo(pTableMetaInfo1, s1); tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo1, s1);
SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0); SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0);
STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0); STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0);
tscBuildVgroupTableInfo(pTableMetaInfo2, s2); tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2);
pSupporter->pState->numOfCompleted = 0; pSupporter->pState->numOfCompleted = 0;
pSupporter->pState->code = 0; pSupporter->pState->code = 0;
pSupporter->pState->numOfTotal = 2; pSupporter->pState->numOfTotal = 2;
for(int32_t m = 0; m < pParentSql->numOfSubs; ++m) { for (int32_t m = 0; m < pParentSql->numOfSubs; ++m) {
SSqlObj* psub = pParentSql->pSubs[m]; SSqlObj* psub = pParentSql->pSubs[m];
issueTSCompQuery(psub, psub->param, pParentSql); issueTSCompQuery(psub, psub->param, pParentSql);
} }
} }
} else { } else {
size_t length = pSupporter->totalLen + pSql->res.rspLen; if (numOfRows < 0) { // error
pSupporter->pState->code = numOfRows;
quitAllSubquery(pParentSql, pSupporter);
pParentSql->res.code = numOfRows;
tscQueueAsyncRes(pParentSql);
return;
}
size_t length = pSupporter->totalLen + pRes->rspLen;
assert(length > 0); assert(length > 0);
char* tmp = realloc(pSupporter->pIdTagList, length); char* tmp = realloc(pSupporter->pIdTagList, length);
@ -680,9 +748,9 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
pSupporter->pIdTagList = tmp; pSupporter->pIdTagList = tmp;
memcpy(pSupporter->pIdTagList, pSql->res.data, pSql->res.rspLen); memcpy(pSupporter->pIdTagList, pRes->data, pRes->rspLen);
pSupporter->totalLen += pSql->res.rspLen; pSupporter->totalLen += pRes->rspLen;
pSupporter->num += pSql->res.numOfRows; pSupporter->num += pRes->numOfRows;
// continue retrieve data from vnode // continue retrieve data from vnode
taos_fetch_rows_a(tres, joinRetrieveCallback, param); taos_fetch_rows_a(tres, joinRetrieveCallback, param);
@ -699,42 +767,64 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
return; return;
} }
if (numOfRows == 0) { if (numOfRows > 0) { // write the compressed timestamp to disk file
tSIntersectionAndLaunchSecQuery(pSupporter, pSql); fwrite(pRes->data, pRes->numOfRows, 1, pSupporter->f);
return; fclose(pSupporter->f);
pSupporter->f = NULL;
STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true);
if (pBuf == NULL) { // in error process, close the fd
tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows);
pSupporter->pState->code = TSDB_CODE_TSC_APP_ERROR; // todo set the informative code
quitAllSubquery(pParentSql, pSupporter);
return;
}
if (pSupporter->pTSBuf == NULL) {
tscTrace("%p create tmp file for ts block:%s, size:%d bytes", pSql, pBuf->path, numOfRows);
pSupporter->pTSBuf = pBuf;
} else {
assert(pQueryInfo->numOfTables == 1); // for subquery, only one
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex);
tsBufDestory(pBuf);
}
} }
// write the compressed timestamp to disk file if (pRes->completed) {
fwrite(pSql->res.data, pSql->res.numOfRows, 1, pSupporter->f); if (hasMoreVnodesToTry(pSql)) {
fclose(pSupporter->f); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
pSupporter->f = NULL;
STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true); int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
if (pBuf == NULL) { pTableMetaInfo->vgroupIndex += 1;
tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows); assert(pTableMetaInfo->vgroupIndex < totalVgroups);
pSupporter->pState->code = TSDB_CODE_TSC_APP_ERROR; // todo set the informative code tscTrace("%p results from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d",
quitAllSubquery(pParentSql, pSupporter); pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups,
return; pRes->numOfClauseTotal);
}
if (pSupporter->pTSBuf == NULL) { pCmd->command = TSDB_SQL_SELECT;
tscTrace("%p create tmp file for ts block:%s, size:%d bytes", pSql, pBuf->path, numOfRows); tscResetForNextRetrieve(&pSql->res);
pSupporter->pTSBuf = pBuf;
} else {
assert(pQueryInfo->numOfTables == 1); // for subquery, only one
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex); assert(pSupporter->f == NULL);
tsBufDestory(pBuf); getTmpfilePath("ts-join", pSupporter->path);
} pSupporter->f = fopen(pSupporter->path, "w");
pRes->row = pRes->numOfRows;
if (pSql->res.completed) { // set the callback function
tSIntersectionAndLaunchSecQuery(pSupporter, pSql); pSql->fp = tscJoinQueryCallback;
} else { // open a new file to save the incoming result tscProcessSql(pSql);
return;
} else {
tSIntersectionAndLaunchSecQuery(pSupporter, pSql);
}
} else { // open a new file to save the incoming result
getTmpfilePath("ts-join", pSupporter->path); getTmpfilePath("ts-join", pSupporter->path);
pSupporter->f = fopen(pSupporter->path, "w"); pSupporter->f = fopen(pSupporter->path, "w");
pSql->res.row = pSql->res.numOfRows; pRes->row = pRes->numOfRows;
taos_fetch_rows_a(tres, joinRetrieveCallback, param); taos_fetch_rows_a(tres, joinRetrieveCallback, param);
} }
@ -745,14 +835,14 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
} }
if (numOfRows >= 0) { if (numOfRows >= 0) {
pSql->res.numOfTotal += pSql->res.numOfRows; pRes->numOfTotal += pRes->numOfRows;
} }
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) { if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1); assert(pQueryInfo->numOfTables == 1);
// for projection query, need to try next vnode if current vnode is exhausted // for projection query, need to try next vnode if current vnode is exhausted
if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->vgroupList->numOfVgroups) { if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->vgroupList->numOfVgroups) {
pSupporter->pState->numOfCompleted = 0; pSupporter->pState->numOfCompleted = 0;
pSupporter->pState->numOfTotal = 1; pSupporter->pState->numOfTotal = 1;
@ -780,7 +870,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
} }
// update the records for each subquery in parent sql object. // update the records for each subquery in parent sql object.
for(int32_t i = 0; i < pParentSql->numOfSubs; ++i) { for (int32_t i = 0; i < pParentSql->numOfSubs; ++i) {
if (pParentSql->pSubs[i] == NULL) { if (pParentSql->pSubs[i] == NULL) {
continue; continue;
} }
@ -1073,9 +1163,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
// this data needs to be transfer to support struct // this data needs to be transfer to support struct
memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo)); memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo));
tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond);//pNewQueryInfo->tagCond;
pSupporter->tagCond = pNewQueryInfo->tagCond;
memset(&pNewQueryInfo->tagCond, 0, sizeof(STagCond));
pNew->cmd.numOfCols = 0; pNew->cmd.numOfCols = 0;
pNewQueryInfo->intervalTime = 0; pNewQueryInfo->intervalTime = 0;
@ -1094,32 +1182,29 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
STagCond* pTagCond = &pSupporter->tagCond; STagCond* pTagCond = &pSupporter->tagCond;
assert(pTagCond->joinInfo.hasJoin); assert(pTagCond->joinInfo.hasJoin);
int32_t tagIndex = tscGetJoinTagColIndexByUid(pTagCond, pTableMetaInfo->pTableMeta->uid); int32_t tagColId = tscGetJoinTagColIdByUid(pTagCond, pTableMetaInfo->pTableMeta->uid);
SSchema* pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); SSchema* s = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
SSchema s = pTagSchema[tagIndex];
int16_t bytes = 0; int16_t bytes = 0;
int16_t type = 0; int16_t type = 0;
int32_t inter = 0; int32_t inter = 0;
getResultDataInfo(s.type, s.bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0); getResultDataInfo(s->type, s->bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0);
s.type = type; SSchema s1 = {.colId = s->colId, .type = type, .bytes = bytes};
s.bytes = bytes; pSupporter->tagSize = s1.bytes;
pSupporter->tagSize = s.bytes; assert(isValidDataType(s1.type, 0) && s1.bytes > 0);
// set get tags query type // set get tags query type
TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY); TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &index, &s, TSDB_COL_TAG); tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &index, &s1, TSDB_COL_TAG);
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList); size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscTrace( tscTrace(
"%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), " "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", "exprInfo:%d, colList:%d, fieldsInfo:%d, tagIndex:%d, name:%s",
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo), pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, pNewQueryInfo->pTableMetaInfo[0]->name); numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, index.columnIndex, pNewQueryInfo->pTableMetaInfo[0]->name);
} else { } else {
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
@ -1128,10 +1213,11 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
// set the tags value for ts_comp function // set the tags value for ts_comp function
SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0); SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0);
int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid); if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid);
pExpr->param->i64Key = tagColIndex; pExpr->param->i64Key = tagColId;
pExpr->numOfParams = 1; pExpr->numOfParams = 1;
}
// add the filter tag column // add the filter tag column
if (pSupporter->colList != NULL) { if (pSupporter->colList != NULL) {
@ -1281,9 +1367,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
trs->pParentSqlObj = pSql; trs->pParentSqlObj = pSql;
trs->pFinalColModel = pModel; trs->pFinalColModel = pModel;
pthread_mutexattr_t mutexattr; pthread_mutexattr_t mutexattr = {0};
memset(&mutexattr, 0, sizeof(pthread_mutexattr_t));
pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP); pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP);
pthread_mutex_init(&trs->queryMutex, &mutexattr); pthread_mutex_init(&trs->queryMutex, &mutexattr);
pthread_mutexattr_destroy(&mutexattr); pthread_mutexattr_destroy(&mutexattr);
@ -1300,6 +1384,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
if (pQueryInfo->tsBuf) { if (pQueryInfo->tsBuf) {
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf); pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf);
assert(pNewQueryInfo->tsBuf != NULL);
} }
tscTrace("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex); tscTrace("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex);
@ -1453,7 +1538,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) == TSDB_QUERY_TYPE_JOIN_SEC_STAGE) { if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
(*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code); (*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code);
} else { // regular super table query } else { // regular super table query
if (pPObj->res.code != TSDB_CODE_SUCCESS) { if (pPObj->res.code != TSDB_CODE_SUCCESS) {
@ -1474,7 +1559,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
// data in from current vnode is stored in cache and disk // data in from current vnode is stored in cache and disk
uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num; uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num;
tscTrace("%p sub:%p all data retrieved from ip:%u,vgId:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, tscTrace("%p sub:%p all data retrieved from ip:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql,
pTableMetaInfo->vgroupList->vgroups[0].ipAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId, pTableMetaInfo->vgroupList->vgroups[0].ipAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId,
numOfRowsFromSubquery, idx); numOfRowsFromSubquery, idx);
@ -1708,7 +1793,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
tscHandleSubqueryError(param, tres, pState->code); tscHandleSubqueryError(param, tres, pState->code);
} else { // success, proceed to retrieve data from dnode } else { // success, proceed to retrieve data from dnode
tscTrace("%p sub:%p query complete, ip:%u, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSqlObj, pSql, tscTrace("%p sub:%p query complete, ip:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSqlObj, pSql,
pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex); pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex);
if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode
@ -1868,7 +1953,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
pRes->numOfClauseTotal++; pRes->numOfClauseTotal++;
break; break;
} else { // continue retrieve data from vnode } else { // continue retrieve data from vnode
if (!tscHashRemainDataInSubqueryResultSet(pSql)) { if (!tscHasRemainDataInSubqueryResultSet(pSql)) {
tscTrace("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1); tscTrace("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1);
SSubqueryState *pState = NULL; SSubqueryState *pState = NULL;
@ -2018,7 +2103,7 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
return pRes->tsrow; return pRes->tsrow;
} }
static UNUSED_FUNC bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
bool hasData = true; bool hasData = true;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
@ -2061,8 +2146,7 @@ static UNUSED_FUNC bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0); SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0);
if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pQueryInfo1, pRes1) && if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pQueryInfo1, pRes1) &&
tscProjectionQueryOnTable(pQueryInfo1)) || tscIsProjectionQuery(pQueryInfo1)) || (pRes1->numOfRows == 0)) {
(pRes1->numOfRows == 0)) {
hasData = false; hasData = false;
break; break;
} }

View File

@ -70,13 +70,6 @@ void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBufferWriter* bw) {
taosArrayPush(pTagCond->pCond, &cond); taosArrayPush(pTagCond->pCond, &cond);
} }
bool tscQueryOnSTable(SSqlCmd* pCmd) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
return ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_QUERY) == TSDB_QUERY_TYPE_STABLE_QUERY) &&
(pCmd->msgType == TSDB_MSG_TYPE_QUERY);
}
bool tscQueryTags(SQueryInfo* pQueryInfo) { bool tscQueryTags(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
@ -95,32 +88,8 @@ bool tscQueryTags(SQueryInfo* pQueryInfo) {
return true; return true;
} }
bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd) { // todo refactor, extract methods and move the common module
bool hasTags = false; void tscGetDBInfoFromTableFullName(char* tableId, char* db) {
int32_t numOfSelectivity = 0;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
int32_t functId = tscSqlExprGet(pQueryInfo, i)->functionId;
if (functId == TSDB_FUNC_TAG_DUMMY) {
hasTags = true;
continue;
}
if ((aAggs[functId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
numOfSelectivity++;
}
}
if (numOfSelectivity > 0 && hasTags) {
return true;
}
return false;
}
void tscGetDBInfoFromMeterId(char* tableId, char* db) {
char* st = strstr(tableId, TS_PATH_DELIMITER); char* st = strstr(tableId, TS_PATH_DELIMITER);
if (st != NULL) { if (st != NULL) {
char* end = strstr(st + 1, TS_PATH_DELIMITER); char* end = strstr(st + 1, TS_PATH_DELIMITER);
@ -181,8 +150,14 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
for (int32_t i = 0; i < numOfExprs; ++i) { for (int32_t i = 0; i < numOfExprs; ++i) {
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId; int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAG &&
functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM) { if (functionId != TSDB_FUNC_PRJ &&
functionId != TSDB_FUNC_TAGPRJ &&
functionId != TSDB_FUNC_TAG &&
functionId != TSDB_FUNC_TS &&
functionId != TSDB_FUNC_ARITHM &&
functionId != TSDB_FUNC_TS_COMP &&
functionId != TSDB_FUNC_TID_TAG) {
return false; return false;
} }
} }
@ -209,10 +184,14 @@ bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableInde
return pQueryInfo->order.orderColId >= 0; return pQueryInfo->order.orderColId >= 0;
} }
bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo) { bool tscIsProjectionQuery(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId; int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TS) {
if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAG &&
functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM) {
return false; return false;
} }
} }
@ -225,9 +204,10 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr == NULL) { assert(pExpr != NULL);
return false; // if (pExpr == NULL) {
} // return false;
// }
int32_t functionId = pExpr->functionId; int32_t functionId = pExpr->functionId;
if (functionId == TSDB_FUNC_TAG) { if (functionId == TSDB_FUNC_TAG) {
@ -238,6 +218,7 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) {
return false; return false;
} }
} }
return true; return true;
} }
@ -1774,7 +1755,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
SQueryInfo* pPrevQueryInfo = tscGetQueryInfoDetail(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex); SQueryInfo* pPrevQueryInfo = tscGetQueryInfoDetail(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex);
pNewQueryInfo->type = pPrevQueryInfo->type; pNewQueryInfo->type = pPrevQueryInfo->type;
} else { } else {
pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; // it must be the subquery TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY);// it must be the subquery
} }
uint64_t uid = pTableMetaInfo->pTableMeta->uid; uint64_t uid = pTableMetaInfo->pTableMeta->uid;
@ -1799,19 +1780,26 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
} }
// make sure the the sqlExpr for each fields is correct // make sure the the sqlExpr for each fields is correct
// todo handle the agg arithmetic expression // todo handle the agg arithmetic expression
numOfExprs = tscSqlExprNumOfExprs(pNewQueryInfo);
for(int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutput; ++f) { for(int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutput; ++f) {
TAOS_FIELD* field = tscFieldInfoGetField(&pNewQueryInfo->fieldsInfo, f); TAOS_FIELD* field = tscFieldInfoGetField(&pNewQueryInfo->fieldsInfo, f);
numOfExprs = tscSqlExprNumOfExprs(pNewQueryInfo); bool matched = false;
for(int32_t k1 = 0; k1 < numOfExprs; ++k1) { for(int32_t k1 = 0; k1 < numOfExprs; ++k1) {
SSqlExpr* pExpr1 = tscSqlExprGet(pNewQueryInfo, k1); SSqlExpr* pExpr1 = tscSqlExprGet(pNewQueryInfo, k1);
if (strcmp(field->name, pExpr1->aliasName) == 0) { // eatablish link according to the result field name if (strcmp(field->name, pExpr1->aliasName) == 0) { // establish link according to the result field name
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pNewQueryInfo->fieldsInfo, f); SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pNewQueryInfo->fieldsInfo, f);
pInfo->pSqlExpr = pExpr1; pInfo->pSqlExpr = pExpr1;
matched = true;
break;
} }
} }
assert(matched);
} }
tscFieldInfoUpdateOffset(pNewQueryInfo); tscFieldInfoUpdateOffset(pNewQueryInfo);
@ -1900,16 +1888,21 @@ void tscDoQuery(SSqlObj* pSql) {
} }
if (QUERY_IS_JOIN_QUERY(type)) { if (QUERY_IS_JOIN_QUERY(type)) {
if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) { if (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_SUBQUERY)) {
tscHandleMasterJoinQuery(pSql); tscHandleMasterJoinQuery(pSql);
return; } else { // for first stage sub query, iterate all vnodes to get all timestamp
} else { if (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
// for first stage sub query, iterate all vnodes to get all timestamp tscProcessSql(pSql);
if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) { } else { // secondary stage join query.
// doProcessSql(pSql); if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
assert(0); tscHandleMasterSTableQuery(pSql);
} else {
tscProcessSql(pSql);
}
} }
} }
return;
} else if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query } else if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
tscHandleMasterSTableQuery(pSql); tscHandleMasterSTableQuery(pSql);
return; return;
@ -1919,13 +1912,13 @@ void tscDoQuery(SSqlObj* pSql) {
} }
} }
int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid) { int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid) {
if (pTagCond->joinInfo.left.uid == uid) { if (pTagCond->joinInfo.left.uid == uid) {
return pTagCond->joinInfo.left.tagCol; return pTagCond->joinInfo.left.tagColId;
} else if (pTagCond->joinInfo.right.uid == uid){ } else if (pTagCond->joinInfo.right.uid == uid) {
return pTagCond->joinInfo.right.tagCol; return pTagCond->joinInfo.right.tagColId;
} else { } else {
return -2; assert(0);
} }
} }
@ -1982,10 +1975,9 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
return false; return false;
} }
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pRes->completed); assert(pRes->completed);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// for normal table, no need to try any more if results are all retrieved from one vnode // for normal table, no need to try any more if results are all retrieved from one vnode
if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) || (pTableMetaInfo->vgroupList == NULL)) { if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) || (pTableMetaInfo->vgroupList == NULL)) {
@ -2008,7 +2000,6 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
* if case of: multi-vnode super table projection query * if case of: multi-vnode super table projection query
*/ */
assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes)); assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes));
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups; int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;

View File

@ -195,6 +195,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_MSG, 0, 0x0701, "query inva
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NO_DISKSPACE, 0, 0x0702, "query no diskspace") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NO_DISKSPACE, 0, 0x0702, "query no diskspace")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_OUT_OF_MEMORY, 0, 0x0703, "query out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_OUT_OF_MEMORY, 0, 0x0703, "query out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_APP_ERROR, 0, 0x0704, "query app error") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_APP_ERROR, 0, 0x0704, "query app error")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUP_JOIN_KEY, 0, 0x0705, "query duplicated join key")
// grant // grant
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 0x0800, "grant expired") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 0x0800, "grant expired")

View File

@ -455,7 +455,7 @@ typedef struct {
int16_t orderType; // used in group by xx order by xxx int16_t orderType; // used in group by xx order by xxx
int64_t limit; int64_t limit;
int64_t offset; int64_t offset;
uint16_t queryType; // denote another query process uint32_t queryType; // denote another query process
int16_t numOfOutput; // final output columns numbers int16_t numOfOutput; // final output columns numbers
int16_t tagNameRelType; // relation of tag criteria and tbname criteria int16_t tagNameRelType; // relation of tag criteria and tbname criteria
int16_t fillType; // interpolate type int16_t fillType; // interpolate type

View File

@ -100,10 +100,10 @@ typedef struct STSBuf {
typedef struct STSBufFileHeader { typedef struct STSBufFileHeader {
uint32_t magic; // file magic number uint32_t magic; // file magic number
uint32_t numOfVnode; // number of vnode stored in current file uint32_t numOfVnode; // number of vnode stored in current file
uint32_t tsOrder; // timestamp order in current file int32_t tsOrder; // timestamp order in current file
} STSBufFileHeader; } STSBufFileHeader;
STSBuf* tsBufCreate(bool autoDelete); STSBuf* tsBufCreate(bool autoDelete, int32_t order);
STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete); STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete);
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder); STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder);

View File

@ -12,6 +12,7 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <taosmsg.h>
#include "os.h" #include "os.h"
#include "qfill.h" #include "qfill.h"
@ -1072,10 +1073,6 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
groupbyColumnData = getGroupbyColumnData(pQuery, &type, &bytes, pDataBlock); groupbyColumnData = getGroupbyColumnData(pQuery, &type, &bytes, pDataBlock);
} }
if (pRuntimeEnv->pTSBuf != NULL && pQuery->numOfOutput > 1) {
printf("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n");
}
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k); setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k);
@ -2193,8 +2190,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
* set tag value in SQLFunctionCtx * set tag value in SQLFunctionCtx
* e.g.,tag information into input buffer * e.g.,tag information into input buffer
*/ */
static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColId, tVariant *tag, int16_t type, static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes) {
int16_t bytes) {
tVariantDestroy(tag); tVariantDestroy(tag);
if (tagColId == TSDB_TBNAME_COLUMN_INDEX) { if (tagColId == TSDB_TBNAME_COLUMN_INDEX) {
@ -2219,35 +2215,55 @@ static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColI
void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId* pTableId, void *tsdb) { void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId* pTableId, void *tsdb) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SExprInfo *pExprInfo = &pQuery->pSelectExpr[0]; SExprInfo *pExprInfo = &pQuery->pSelectExpr[0];
if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) { if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) {
assert(pExprInfo->base.numOfParams == 1); assert(pExprInfo->base.numOfParams == 1);
doSetTagValueInParam(tsdb, pTableId, pExprInfo->base.arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag,
pExprInfo->type, pExprInfo->bytes); // todo refactor extract function.
int16_t type = -1, bytes = -1;
for(int32_t i = 0; i < pQuery->numOfTags; ++i) {
if (pQuery->tagColList[i].colId == pExprInfo->base.arg->argValue.i64) {
type = pQuery->tagColList[i].type;
bytes = pQuery->tagColList[i].bytes;
}
}
doSetTagValueInParam(tsdb, pTableId, pExprInfo->base.arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag, type, bytes);
} else { } else {
// set tag value, by which the results are aggregated. // set tag value, by which the results are aggregated.
for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) { for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
SExprInfo* pExprInfo = &pQuery->pSelectExpr[idx]; SExprInfo* pLocalExprInfo = &pQuery->pSelectExpr[idx];
// ts_comp column required the tag value for join filter // ts_comp column required the tag value for join filter
if (!TSDB_COL_IS_TAG(pExprInfo->base.colInfo.flag)) { if (!TSDB_COL_IS_TAG(pLocalExprInfo->base.colInfo.flag)) {
continue; continue;
} }
// todo use tag column index to optimize performance // todo use tag column index to optimize performance
doSetTagValueInParam(tsdb, pTableId, pExprInfo->base.colInfo.colId, &pRuntimeEnv->pCtx[idx].tag, doSetTagValueInParam(tsdb, pTableId, pLocalExprInfo->base.colInfo.colId, &pRuntimeEnv->pCtx[idx].tag,
pExprInfo->type, pExprInfo->bytes); pLocalExprInfo->type, pLocalExprInfo->bytes);
} }
// set the join tag for first column // set the join tag for first column
SSqlFuncMsg *pFuncMsg = &pExprInfo->base; SSqlFuncMsg *pFuncMsg = &pExprInfo->base;
if (pFuncMsg->functionId == TSDB_FUNC_TS && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX && if ((pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
pRuntimeEnv->pTSBuf != NULL) { pRuntimeEnv->pTSBuf != NULL) {
assert(pFuncMsg->numOfParams == 1); assert(pFuncMsg->numOfParams == 1);
assert(0); // to do fix me
// doSetTagValueInParam(pTagSchema, pFuncMsg->arg->argValue.i64, pMeterSidInfo, &pRuntimeEnv->pCtx[0].tag); // todo refactor
int16_t type = -1, bytes = -1;
for(int32_t i = 0; i < pQuery->numOfTags; ++i) {
if (pQuery->tagColList[i].colId == pExprInfo->base.arg->argValue.i64) {
type = pQuery->tagColList[i].type;
bytes = pQuery->tagColList[i].bytes;
}
}
doSetTagValueInParam(tsdb, pTableId, pExprInfo->base.arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag, type, bytes);
qTrace("QInfo:%p set tag value for join comparison, colId:%d, val:%"PRId64, pQInfo, pExprInfo->base.arg->argValue.i64,
pRuntimeEnv->pCtx[0].tag)
} }
} }
} }
@ -3623,9 +3639,6 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) { static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
if (pQInfo->runtimeEnv.pTSBuf != NULL && pQuery->numOfOutput > 1) {
printf("ffffffffffffffffffffffffff\n");
}
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
int32_t bytes = pQuery->pSelectExpr[col].bytes; int32_t bytes = pQuery->pSelectExpr[col].bytes;
@ -4461,6 +4474,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
} }
} }
} }
if (pQInfo->tableIndex >= pQInfo->groupInfo.numOfTables) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
} }
/* /*
@ -5028,7 +5045,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg->order = htons(pQueryMsg->order); pQueryMsg->order = htons(pQueryMsg->order);
pQueryMsg->orderColId = htons(pQueryMsg->orderColId); pQueryMsg->orderColId = htons(pQueryMsg->orderColId);
pQueryMsg->queryType = htons(pQueryMsg->queryType); pQueryMsg->queryType = htonl(pQueryMsg->queryType);
pQueryMsg->tagNameRelType = htons(pQueryMsg->tagNameRelType); pQueryMsg->tagNameRelType = htons(pQueryMsg->tagNameRelType);
pQueryMsg->numOfCols = htons(pQueryMsg->numOfCols); pQueryMsg->numOfCols = htons(pQueryMsg->numOfCols);
@ -5047,9 +5064,6 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
} }
char *pMsg = (char *)(pQueryMsg->colList) + sizeof(SColumnInfo) * pQueryMsg->numOfCols; char *pMsg = (char *)(pQueryMsg->colList) + sizeof(SColumnInfo) * pQueryMsg->numOfCols;
if (pQueryMsg->numOfCols > 1 && pQueryMsg->tsLen > 0) {
printf("ffffffffffffffff\n");
}
for (int32_t col = 0; col < pQueryMsg->numOfCols; ++col) { for (int32_t col = 0; col < pQueryMsg->numOfCols; ++col) {
SColumnInfo *pColInfo = &pQueryMsg->colList[col]; SColumnInfo *pColInfo = &pQueryMsg->colList[col];
@ -5199,11 +5213,11 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pMsg += len; pMsg += len;
} }
qTrace("qmsg:%p query %d tables, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, " qTrace("qmsg:%p query %d tables, type:%d, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, "
"outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64, "outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, compNumOfBlocks:%d, limit:%" PRId64 ", offset:%" PRId64,
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols, pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->queryType, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols,
pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime,
pQueryMsg->fillType, pQueryMsg->tsLen, pQueryMsg->limit, pQueryMsg->offset); pQueryMsg->fillType, pQueryMsg->tsLen, pQueryMsg->tsNumOfBlocks, pQueryMsg->limit, pQueryMsg->offset);
return 0; return 0;
} }
@ -5241,9 +5255,6 @@ static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo *
bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType); bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType);
int16_t tagLen = 0; int16_t tagLen = 0;
if (pQueryMsg->numOfOutput > 1 && pQueryMsg->tsLen > 0) {
printf("ffffffffffffffffffff\n");
}
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
pExprs[i].base = *pExprMsg[i]; pExprs[i].base = *pExprMsg[i];
pExprs[i].bytes = 0; pExprs[i].bytes = 0;
@ -5883,23 +5894,20 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
bool isSTableQuery = false; bool isSTableQuery = false;
STableGroupInfo groupInfo = {0}; STableGroupInfo groupInfo = {0};
//todo multitable_query?? if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_TABLE_QUERY)) {
if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_TABLE_QUERY)) {
isSTableQuery = TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
STableIdInfo *id = taosArrayGet(pTableIdList, 0); STableIdInfo *id = taosArrayGet(pTableIdList, 0);
qTrace("qmsg:%p query table, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid);
qTrace("qmsg:%p query normal table, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid);
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) { if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) {
goto _over; goto _over;
} }
} else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_STABLE_QUERY)) { } else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_STABLE_QUERY)) {
isSTableQuery = true; isSTableQuery = true;
// TODO: need a macro from TSDB to check if table is super table, // TODO: need a macro from TSDB to check if table is super table
// also note there's possiblity that only one table in the super table
if (taosArrayGetSize(pTableIdList) == 1) { // also note there's possibility that only one table in the super table
if (!TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY)) {
STableIdInfo *id = taosArrayGet(pTableIdList, 0); STableIdInfo *id = taosArrayGet(pTableIdList, 0);
// if array size is 1 and assert super table
// group by normal column, do not pass the group by condition to tsdb to group table into different group // group by normal column, do not pass the group by condition to tsdb to group table into different group
int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols; int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols;
@ -5913,15 +5921,13 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
goto _over; goto _over;
} }
} else { } else {
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES); groupInfo.pGroupList = taosArrayInit(1, POINTER_BYTES);
groupInfo.numOfTables = taosArrayGetSize(pTableIdList);
SArray* sa = taosArrayInit(groupInfo.numOfTables, sizeof(STableId)); SArray* p = taosArrayClone(pTableIdList);
for(int32_t i = 0; i < groupInfo.numOfTables; ++i) { taosArrayPush(groupInfo.pGroupList, &p);
STableIdInfo* tableId = taosArrayGet(pTableIdList, i);
taosArrayPush(sa, tableId); qTrace("qmsg:%p query on %d tables in one group from client", pQueryMsg, groupInfo.numOfTables);
}
taosArrayPush(pTableGroup, &sa);
groupInfo.pGroupList = pTableGroup;
} }
} else { } else {
assert(0); assert(0);
@ -6177,7 +6183,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
} else { } else {
if (val == NULL) { if (val == NULL) {
setNull(output, type, bytes); setNull(output, type, bytes);
} else { } else { // todo here stop will cause client crash
memcpy(output, val, bytes); memcpy(output, val, bytes);
} }
} }

View File

@ -15,7 +15,7 @@ static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader);
* @param path * @param path
* @return * @return
*/ */
STSBuf* tsBufCreate(bool autoDelete) { STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
STSBuf* pTSBuf = calloc(1, sizeof(STSBuf)); STSBuf* pTSBuf = calloc(1, sizeof(STSBuf));
if (pTSBuf == NULL) { if (pTSBuf == NULL) {
return NULL; return NULL;
@ -40,7 +40,7 @@ STSBuf* tsBufCreate(bool autoDelete) {
pTSBuf->cur.order = TSDB_ORDER_ASC; pTSBuf->cur.order = TSDB_ORDER_ASC;
pTSBuf->autoDelete = autoDelete; pTSBuf->autoDelete = autoDelete;
pTSBuf->tsOrder = -1; pTSBuf->tsOrder = order;
return pTSBuf; return pTSBuf;
} }
@ -66,7 +66,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
// validate the file magic number // validate the file magic number
STSBufFileHeader header = {0}; STSBufFileHeader header = {0};
fseek(pTSBuf->f, 0, SEEK_SET); fseek(pTSBuf->f, 0, SEEK_SET);
fread(&header, 1, sizeof(header), pTSBuf->f); fread(&header, 1, sizeof(STSBufFileHeader), pTSBuf->f);
// invalid file // invalid file
if (header.magic != TS_COMP_FILE_MAGIC) { if (header.magic != TS_COMP_FILE_MAGIC) {
@ -119,7 +119,6 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
// ascending by default // ascending by default
pTSBuf->cur.order = TSDB_ORDER_ASC; pTSBuf->cur.order = TSDB_ORDER_ASC;
pTSBuf->autoDelete = autoDelete; pTSBuf->autoDelete = autoDelete;
// tscTrace("create tsBuf from file:%s, fd:%d, size:%d, numOfVnode:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f), // tscTrace("create tsBuf from file:%s, fd:%d, size:%d, numOfVnode:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f),
@ -537,6 +536,8 @@ int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
return -1; return -1;
} }
assert(pHeader->tsOrder == TSDB_ORDER_ASC || pHeader->tsOrder == TSDB_ORDER_DESC);
int64_t r = fseek(pTSBuf->f, 0, SEEK_SET); int64_t r = fseek(pTSBuf->f, 0, SEEK_SET);
if (r != 0) { if (r != 0) {
return -1; return -1;
@ -754,7 +755,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
} }
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order) { STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order) {
STSBuf* pTSBuf = tsBufCreate(true); STSBuf* pTSBuf = tsBufCreate(true, order);
STSVnodeBlockInfo* pBlockInfo = &(addOneVnodeInfo(pTSBuf, 0)->info); STSVnodeBlockInfo* pBlockInfo = &(addOneVnodeInfo(pTSBuf, 0)->info);
pBlockInfo->numOfBlocks = numOfBlocks; pBlockInfo->numOfBlocks = numOfBlocks;
@ -846,6 +847,8 @@ STSBuf* tsBufClone(STSBuf* pTSBuf) {
return NULL; return NULL;
} }
tsBufFlush(pTSBuf);
return tsBufCreateFromFile(pTSBuf->path, false); return tsBufCreateFromFile(pTSBuf->path, false);
} }

View File

@ -2112,8 +2112,8 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT* tsdb, uint64_t uid, const char* pTag
} }
if (pTable->type != TSDB_SUPER_TABLE) { if (pTable->type != TSDB_SUPER_TABLE) {
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s", tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s", tsdb, uid, pTable->tableId.tid,
tsdb, uid, pTable->tableId.tid, pTable->name); pTable->name);
return TSDB_CODE_COM_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client return TSDB_CODE_COM_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
} }
@ -2128,7 +2128,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT* tsdb, uint64_t uid, const char* pTag
pGroupInfo->numOfTables = taosArrayGetSize(res); pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, tsdb); pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, tsdb);
tsdbTrace("no tbname condition or tagcond, all tables belongs to one group, numOfTables:%d", pGroupInfo->numOfTables); tsdbTrace("%p no table name/tag condition, all tables belong to one group, numOfTables:%d", tsdb, pGroupInfo->numOfTables);
} else { } else {
// todo add error // todo add error
} }
@ -2172,6 +2172,9 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT* tsdb, uint64_t uid, const char* pTag
pGroupInfo->numOfTables = taosArrayGetSize(res); pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, tsdb); pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, tsdb);
tsdbTrace("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%d, belong to %d groups", tsdb, pTable->tableId.tid,
pTable->tableId.uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
taosArrayDestroy(res); taosArrayDestroy(res);
return ret; return ret;
} }

View File

@ -34,18 +34,19 @@ extern "C" {
#define WCHAR wchar_t #define WCHAR wchar_t
#define tfree(x) \ #define tfree(x) \
{ \ { \
if (x) { \ if (x) { \
free((void*)(x)); \ free((void *)(x)); \
x = 0; \ x = 0; \
} \ } \
} }
#define tstrncpy(dst, src, size) do { \ #define tstrncpy(dst, src, size) \
do { \
strncpy((dst), (src), (size)); \ strncpy((dst), (src), (size)); \
(dst)[(size) - 1] = 0; \ (dst)[(size)-1] = 0; \
} while (0); } while (0);
#define tclose(x) taosCloseSocket(x) #define tclose(x) taosCloseSocket(x)

View File

@ -354,6 +354,7 @@ sql select count(*) from join_mt0, join_mt1 where join_mt0.ts = join_mt1.ts and
$val = 20 $val = 20
if $data00 != $val then if $data00 != $val then
print expect 20, actual:$data00
return -1 return -1
endi endi
@ -411,7 +412,7 @@ endi
#======================limit offset=================================== #======================limit offset===================================
# tag values not int # tag values not int
sql_error select count(*) from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t2=join_mt1.t2; sql_error select count(*) from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t2=join_mt1.t2; #!!!!!
# tag type not identical # tag type not identical
sql_error select count(*) from join_mt0, join_mt1 where join_mt1.t2 = join_mt0.t1 and join_mt1.ts=join_mt0.ts; sql_error select count(*) from join_mt0, join_mt1 where join_mt1.t2 = join_mt0.t1 and join_mt1.ts=join_mt0.ts;

View File

@ -108,6 +108,7 @@ sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where joi
print $row print $row
if $row != 3000 then if $row != 3000 then
print expect 3000, actual: $row
return -1 return -1
endi endi