Merge pull request #5533 from taosdata/feature/TD-3295
[TD-3295]support multiple tables join
This commit is contained in:
commit
36ed687b33
|
@ -83,6 +83,22 @@ typedef struct SJoinSupporter {
|
|||
SArray* pVgroupTables;
|
||||
} SJoinSupporter;
|
||||
|
||||
|
||||
typedef struct SMergeCtx {
|
||||
SJoinSupporter* p;
|
||||
int32_t idx;
|
||||
SArray* res;
|
||||
int8_t compared;
|
||||
}SMergeCtx;
|
||||
|
||||
typedef struct SMergeTsCtx {
|
||||
SJoinSupporter* p;
|
||||
STSBuf* res;
|
||||
int64_t numOfInput;
|
||||
int8_t compared;
|
||||
}SMergeTsCtx;
|
||||
|
||||
|
||||
typedef struct SVgroupTableInfo {
|
||||
SVgroupInfo vgInfo;
|
||||
SArray* itemList; //SArray<STableIdInfo>
|
||||
|
@ -183,6 +199,7 @@ int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deep
|
|||
void tscSqlExprInfoDestroy(SArray* pExprInfo);
|
||||
|
||||
SColumn* tscColumnClone(const SColumn* src);
|
||||
bool tscColumnExists(SArray* pColumnList, SColumnIndex* pColIndex);
|
||||
SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex);
|
||||
SArray* tscColumnListClone(const SArray* src, int16_t tableIndex);
|
||||
void tscColumnListDestroy(SArray* pColList);
|
||||
|
|
|
@ -142,15 +142,15 @@ typedef struct SCond {
|
|||
} SCond;
|
||||
|
||||
typedef struct SJoinNode {
|
||||
char tableName[TSDB_TABLE_FNAME_LEN];
|
||||
uint64_t uid;
|
||||
int16_t tagColId;
|
||||
SArray* tsJoin;
|
||||
SArray* tagJoin;
|
||||
} SJoinNode;
|
||||
|
||||
typedef struct SJoinInfo {
|
||||
bool hasJoin;
|
||||
SJoinNode left;
|
||||
SJoinNode right;
|
||||
bool hasJoin;
|
||||
SJoinNode* joinTables[TSDB_MAX_JOIN_TABLE_NUM];
|
||||
} SJoinInfo;
|
||||
|
||||
typedef struct STagCond {
|
||||
|
|
|
@ -3353,24 +3353,26 @@ static int32_t getColumnQueryCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSq
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr) {
|
||||
const char* msg1 = "invalid join query condition";
|
||||
const char* msg2 = "invalid table name in join query";
|
||||
static int32_t checkAndSetJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr) {
|
||||
int32_t code = 0;
|
||||
const char* msg1 = "timestamp required for join tables";
|
||||
const char* msg3 = "type of join columns must be identical";
|
||||
const char* msg4 = "invalid column name in join condition";
|
||||
const char* msg5 = "only support one join tag for each table";
|
||||
|
||||
if (pExpr == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (!tSqlExprIsParentOfLeaf(pExpr)) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
code = checkAndSetJoinCondInfo(pCmd, pQueryInfo, pExpr->pLeft);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
return checkAndSetJoinCondInfo(pCmd, pQueryInfo, pExpr->pRight);
|
||||
}
|
||||
|
||||
STagCond* pTagCond = &pQueryInfo->tagCond;
|
||||
SJoinNode* pLeft = &pTagCond->joinInfo.left;
|
||||
SJoinNode* pRight = &pTagCond->joinInfo.right;
|
||||
|
||||
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
|
||||
if (getColumnIndexByName(pCmd, &pExpr->pLeft->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||
|
@ -3379,13 +3381,28 @@ static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr*
|
|||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
|
||||
SSchema* pTagSchema1 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
|
||||
|
||||
pLeft->uid = pTableMetaInfo->pTableMeta->id.uid;
|
||||
pLeft->tagColId = pTagSchema1->colId;
|
||||
assert(index.tableIndex >= 0 && index.tableIndex < TSDB_MAX_JOIN_TABLE_NUM);
|
||||
|
||||
int32_t code = tNameExtractFullName(&pTableMetaInfo->name, pLeft->tableName);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
SJoinNode **leftNode = &pQueryInfo->tagCond.joinInfo.joinTables[index.tableIndex];
|
||||
if (*leftNode == NULL) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
}
|
||||
|
||||
(*leftNode)->uid = pTableMetaInfo->pTableMeta->id.uid;
|
||||
(*leftNode)->tagColId = pTagSchema1->colId;
|
||||
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
index.columnIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
||||
if (!tscColumnExists(pTableMetaInfo->tagColList, &index)) {
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
|
||||
if (taosArrayGetSize(pTableMetaInfo->tagColList) > 1) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int16_t leftIdx = index.tableIndex;
|
||||
|
||||
|
||||
index = (SColumnIndex)COLUMN_INDEX_INITIALIZER;
|
||||
if (getColumnIndexByName(pCmd, &pExpr->pRight->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
|
||||
|
@ -3395,20 +3412,55 @@ static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr*
|
|||
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
|
||||
SSchema* pTagSchema2 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
|
||||
|
||||
pRight->uid = pTableMetaInfo->pTableMeta->id.uid;
|
||||
pRight->tagColId = pTagSchema2->colId;
|
||||
assert(index.tableIndex >= 0 && index.tableIndex < TSDB_MAX_JOIN_TABLE_NUM);
|
||||
|
||||
code = tNameExtractFullName(&pTableMetaInfo->name, pRight->tableName);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
SJoinNode **rightNode = &pQueryInfo->tagCond.joinInfo.joinTables[index.tableIndex];
|
||||
if (*rightNode == NULL) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
}
|
||||
|
||||
(*rightNode)->uid = pTableMetaInfo->pTableMeta->id.uid;
|
||||
(*rightNode)->tagColId = pTagSchema2->colId;
|
||||
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
index.columnIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
||||
if (!tscColumnExists(pTableMetaInfo->tagColList, &index)) {
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
|
||||
if (taosArrayGetSize(pTableMetaInfo->tagColList) > 1) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int16_t rightIdx = index.tableIndex;
|
||||
|
||||
if (pTagSchema1->type != pTagSchema2->type) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
}
|
||||
|
||||
pTagCond->joinInfo.hasJoin = true;
|
||||
if ((*leftNode)->tagJoin == NULL) {
|
||||
(*leftNode)->tagJoin = taosArrayInit(2, sizeof(int16_t));
|
||||
}
|
||||
|
||||
if ((*rightNode)->tagJoin == NULL) {
|
||||
(*rightNode)->tagJoin = taosArrayInit(2, sizeof(int16_t));
|
||||
}
|
||||
|
||||
taosArrayPush((*leftNode)->tagJoin, &rightIdx);
|
||||
taosArrayPush((*rightNode)->tagJoin, &leftIdx);
|
||||
|
||||
pQueryInfo->tagCond.joinInfo.hasJoin = true;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
}
|
||||
|
||||
static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr) {
|
||||
if (pExpr == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
return checkAndSetJoinCondInfo(pCmd, pQueryInfo, pExpr);
|
||||
}
|
||||
|
||||
static int32_t validateSQLExpr(SSqlCmd* pCmd, tSqlExpr* pExpr, SQueryInfo* pQueryInfo, SColumnList* pList,
|
||||
|
@ -3674,7 +3726,7 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
|
|||
const char* msg1 = "table query cannot use tags filter";
|
||||
const char* msg2 = "illegal column name";
|
||||
const char* msg3 = "only one query time range allowed";
|
||||
const char* msg4 = "only one join condition allowed";
|
||||
const char* msg4 = "too many join tables";
|
||||
const char* msg5 = "not support ordinary column join";
|
||||
const char* msg6 = "only one query condition on tbname allowed";
|
||||
const char* msg7 = "only in/like allowed in filter table name";
|
||||
|
@ -3705,6 +3757,47 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
|
|||
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY);
|
||||
pCondExpr->tsJoin = true;
|
||||
|
||||
assert(index.tableIndex >= 0 && index.tableIndex < TSDB_MAX_JOIN_TABLE_NUM);
|
||||
SJoinNode **leftNode = &pQueryInfo->tagCond.joinInfo.joinTables[index.tableIndex];
|
||||
if (*leftNode == NULL) {
|
||||
*leftNode = calloc(1, sizeof(SJoinNode));
|
||||
if (*leftNode == NULL) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
int16_t leftIdx = index.tableIndex;
|
||||
|
||||
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
|
||||
if (getColumnIndexByName(pCmd, &pRight->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
|
||||
if (index.tableIndex < 0 || index.tableIndex >= TSDB_MAX_JOIN_TABLE_NUM) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||
}
|
||||
|
||||
SJoinNode **rightNode = &pQueryInfo->tagCond.joinInfo.joinTables[index.tableIndex];
|
||||
if (*rightNode == NULL) {
|
||||
*rightNode = calloc(1, sizeof(SJoinNode));
|
||||
if (*rightNode == NULL) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
int16_t rightIdx = index.tableIndex;
|
||||
|
||||
if ((*leftNode)->tsJoin == NULL) {
|
||||
(*leftNode)->tsJoin = taosArrayInit(2, sizeof(int16_t));
|
||||
}
|
||||
|
||||
if ((*rightNode)->tsJoin == NULL) {
|
||||
(*rightNode)->tsJoin = taosArrayInit(2, sizeof(int16_t));
|
||||
}
|
||||
|
||||
taosArrayPush((*leftNode)->tsJoin, &rightIdx);
|
||||
taosArrayPush((*rightNode)->tsJoin, &leftIdx);
|
||||
|
||||
/*
|
||||
* to release expression, e.g., m1.ts = m2.ts,
|
||||
* since this expression is used to set the join query type
|
||||
|
@ -3762,10 +3855,6 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
|
|||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
if (pCondExpr->pJoinExpr != NULL) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||
}
|
||||
|
||||
pQueryInfo->type |= TSDB_QUERY_TYPE_JOIN_QUERY;
|
||||
ret = setExprToCond(&pCondExpr->pJoinExpr, *pExpr, NULL, parentOptr, pQueryInfo->msg);
|
||||
*pExpr = NULL;
|
||||
|
@ -3993,7 +4082,8 @@ static bool validateFilterExpr(SQueryInfo* pQueryInfo) {
|
|||
static int32_t getTimeRangeFromExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr) {
|
||||
const char* msg0 = "invalid timestamp";
|
||||
const char* msg1 = "only one time stamp window allowed";
|
||||
|
||||
int32_t code = 0;
|
||||
|
||||
if (pExpr == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -4003,8 +4093,11 @@ static int32_t getTimeRangeFromExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlE
|
|||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
}
|
||||
|
||||
getTimeRangeFromExpr(pCmd, pQueryInfo, pExpr->pLeft);
|
||||
|
||||
code = getTimeRangeFromExpr(pCmd, pQueryInfo, pExpr->pLeft);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
return getTimeRangeFromExpr(pCmd, pQueryInfo, pExpr->pRight);
|
||||
} else {
|
||||
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
|
||||
|
@ -4085,6 +4178,7 @@ static void cleanQueryExpr(SCondExpr* pCondExpr) {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
static void doAddJoinTagsColumnsIntoTagList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondExpr* pCondExpr) {
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type) && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
|
@ -4107,6 +4201,7 @@ static void doAddJoinTagsColumnsIntoTagList(SSqlCmd* pCmd, SQueryInfo* pQueryInf
|
|||
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
static int32_t validateTagCondExpr(SSqlCmd* pCmd, tExprNode *p) {
|
||||
const char *msg1 = "invalid tag operator";
|
||||
|
@ -4250,6 +4345,102 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t validateJoinNodes(SQueryInfo* pQueryInfo, SSqlObj* pSql) {
|
||||
const char* msg1 = "timestamp required for join tables";
|
||||
const char* msg2 = "tag required for join stables";
|
||||
|
||||
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||
SJoinNode *node = pQueryInfo->tagCond.joinInfo.joinTables[i];
|
||||
|
||||
if (node == NULL || node->tsJoin == NULL || taosArrayGetSize(node->tsJoin) <= 0) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(&pSql->cmd), msg1);
|
||||
}
|
||||
}
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||
SJoinNode *node = pQueryInfo->tagCond.joinInfo.joinTables[i];
|
||||
|
||||
if (node == NULL || node->tagJoin == NULL || taosArrayGetSize(node->tagJoin) <= 0) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(&pSql->cmd), msg2);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
void mergeJoinNodesImpl(int8_t* r, int8_t* p, int16_t* tidx, SJoinNode** nodes, int32_t type) {
|
||||
SJoinNode *node = nodes[*tidx];
|
||||
SArray* arr = (type == 0) ? node->tsJoin : node->tagJoin;
|
||||
size_t size = taosArrayGetSize(arr);
|
||||
|
||||
p[*tidx] = 1;
|
||||
|
||||
for (int32_t j = 0; j < size; j++) {
|
||||
int16_t* idx = taosArrayGet(arr, j);
|
||||
r[*idx] = 1;
|
||||
if (p[*idx] == 0) {
|
||||
mergeJoinNodesImpl(r, p, idx, nodes, type);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t mergeJoinNodes(SQueryInfo* pQueryInfo, SSqlObj* pSql) {
|
||||
const char* msg1 = "not all join tables have same timestamp";
|
||||
const char* msg2 = "not all join tables have same tag";
|
||||
|
||||
int8_t r[TSDB_MAX_JOIN_TABLE_NUM] = {0};
|
||||
int8_t p[TSDB_MAX_JOIN_TABLE_NUM] = {0};
|
||||
|
||||
for (int16_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||
mergeJoinNodesImpl(r, p, &i, pQueryInfo->tagCond.joinInfo.joinTables, 0);
|
||||
|
||||
taosArrayClear(pQueryInfo->tagCond.joinInfo.joinTables[i]->tsJoin);
|
||||
|
||||
for (int32_t j = 0; j < TSDB_MAX_JOIN_TABLE_NUM; ++j) {
|
||||
if (r[j]) {
|
||||
taosArrayPush(pQueryInfo->tagCond.joinInfo.joinTables[i]->tsJoin, &j);
|
||||
}
|
||||
}
|
||||
|
||||
memset(r, 0, sizeof(r));
|
||||
memset(p, 0, sizeof(p));
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pQueryInfo->tagCond.joinInfo.joinTables[0]->tsJoin) != pQueryInfo->numOfTables) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(&pSql->cmd), msg1);
|
||||
}
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
for (int16_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||
mergeJoinNodesImpl(r, p, &i, pQueryInfo->tagCond.joinInfo.joinTables, 1);
|
||||
|
||||
taosArrayClear(pQueryInfo->tagCond.joinInfo.joinTables[i]->tagJoin);
|
||||
|
||||
for (int32_t j = 0; j < TSDB_MAX_JOIN_TABLE_NUM; ++j) {
|
||||
if (r[j]) {
|
||||
taosArrayPush(pQueryInfo->tagCond.joinInfo.joinTables[i]->tagJoin, &j);
|
||||
}
|
||||
}
|
||||
|
||||
memset(r, 0, sizeof(r));
|
||||
memset(p, 0, sizeof(p));
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pQueryInfo->tagCond.joinInfo.joinTables[0]->tagJoin) != pQueryInfo->numOfTables) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(&pSql->cmd), msg2);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql) {
|
||||
if (pExpr == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -4295,17 +4486,17 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql
|
|||
|
||||
// 4. get the table name query condition
|
||||
if ((ret = getTablenameCond(&pSql->cmd, pQueryInfo, condExpr.pTableCond, &sb)) != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
goto PARSE_WHERE_EXIT;
|
||||
}
|
||||
|
||||
// 5. other column query condition
|
||||
if ((ret = getColumnQueryCondInfo(&pSql->cmd, pQueryInfo, condExpr.pColumnCond, TK_AND)) != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
goto PARSE_WHERE_EXIT;
|
||||
}
|
||||
|
||||
// 6. join condition
|
||||
if ((ret = getJoinCondInfo(&pSql->cmd, pQueryInfo, condExpr.pJoinExpr)) != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
goto PARSE_WHERE_EXIT;
|
||||
}
|
||||
|
||||
// 7. query condition for table name
|
||||
|
@ -4313,12 +4504,29 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql
|
|||
|
||||
ret = setTableCondForSTableQuery(&pSql->cmd, pQueryInfo, getAccountId(pSql), condExpr.pTableCond, condExpr.tableCondIndex, &sb);
|
||||
taosStringBuilderDestroy(&sb);
|
||||
|
||||
if (!validateFilterExpr(pQueryInfo)) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(&pSql->cmd), msg2);
|
||||
if (ret) {
|
||||
goto PARSE_WHERE_EXIT;
|
||||
}
|
||||
|
||||
doAddJoinTagsColumnsIntoTagList(&pSql->cmd, pQueryInfo, &condExpr);
|
||||
if (!validateFilterExpr(pQueryInfo)) {
|
||||
ret = invalidSqlErrMsg(tscGetErrorMsgPayload(&pSql->cmd), msg2);
|
||||
goto PARSE_WHERE_EXIT;
|
||||
}
|
||||
|
||||
//doAddJoinTagsColumnsIntoTagList(&pSql->cmd, pQueryInfo, &condExpr);
|
||||
if (condExpr.tsJoin) {
|
||||
ret = validateJoinNodes(pQueryInfo, pSql);
|
||||
if (ret) {
|
||||
goto PARSE_WHERE_EXIT;
|
||||
}
|
||||
|
||||
ret = mergeJoinNodes(pQueryInfo, pSql);
|
||||
if (ret) {
|
||||
goto PARSE_WHERE_EXIT;
|
||||
}
|
||||
}
|
||||
|
||||
PARSE_WHERE_EXIT:
|
||||
|
||||
cleanQueryExpr(&condExpr);
|
||||
return ret;
|
||||
|
@ -6531,7 +6739,6 @@ int32_t doValidateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t i
|
|||
const char* msg1 = "point interpolation query needs timestamp";
|
||||
const char* msg2 = "fill only available for interval query";
|
||||
const char* msg3 = "start(end) time of query range required or time range too large";
|
||||
const char* msg4 = "illegal number of tables in from clause";
|
||||
const char* msg5 = "too many columns in selection clause";
|
||||
const char* msg6 = "too many tables in from clause";
|
||||
const char* msg7 = "invalid table alias name";
|
||||
|
@ -6568,14 +6775,11 @@ int32_t doValidateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t i
|
|||
}
|
||||
|
||||
size_t fromSize = taosArrayGetSize(pQuerySqlNode->from->tableList);
|
||||
if (fromSize > TSDB_MAX_JOIN_TABLE_NUM) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||
if (fromSize > TSDB_MAX_JOIN_TABLE_NUM * 2) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
|
||||
}
|
||||
|
||||
pQueryInfo->command = TSDB_SQL_SELECT;
|
||||
if (fromSize > 2) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
|
||||
}
|
||||
|
||||
// set all query tables, which are maybe more than one.
|
||||
for (int32_t i = 0; i < fromSize; ++i) {
|
||||
|
|
|
@ -46,6 +46,13 @@ static int32_t tsCompare(int32_t order, int64_t left, int64_t right) {
|
|||
}
|
||||
|
||||
static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) {
|
||||
STSElem el1 = tsBufGetElem(pTSBuf);
|
||||
|
||||
int32_t res = tVariantCompare(el1.tag, tag1);
|
||||
if (res != 0) { // it is a record with new tag
|
||||
return;
|
||||
}
|
||||
|
||||
while (tsBufNextPos(pTSBuf)) {
|
||||
STSElem el1 = tsBufGetElem(pTSBuf);
|
||||
|
||||
|
@ -118,123 +125,233 @@ static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
|
|||
|
||||
|
||||
|
||||
static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) {
|
||||
static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
|
||||
|
||||
STSBuf* output1 = tsBufCreate(true, pQueryInfo->order.order);
|
||||
STSBuf* output2 = tsBufCreate(true, pQueryInfo->order.order);
|
||||
|
||||
win->skey = INT64_MAX;
|
||||
win->ekey = INT64_MIN;
|
||||
|
||||
SLimitVal* pLimit = &pQueryInfo->limit;
|
||||
int32_t order = pQueryInfo->order.order;
|
||||
int32_t joinNum = pSql->subState.numOfSub;
|
||||
SMergeTsCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {{0}};
|
||||
SMergeTsCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0};
|
||||
int32_t slot = 0;
|
||||
size_t tableNum = 0;
|
||||
int16_t* tableMIdx = 0;
|
||||
int32_t equalNum = 0;
|
||||
int32_t stackidx = 0;
|
||||
SMergeTsCtx* ctx = NULL;
|
||||
SMergeTsCtx* pctx = NULL;
|
||||
SMergeTsCtx* mainCtx = NULL;
|
||||
STSElem cur;
|
||||
STSElem prev;
|
||||
SArray* tsCond = NULL;
|
||||
int32_t mergeDone = 0;
|
||||
|
||||
SQueryInfo* pSubQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[0]->cmd, 0);
|
||||
SQueryInfo* pSubQueryInfo2 = tscGetQueryInfoDetail(&pSql->pSubs[1]->cmd, 0);
|
||||
for (int32_t i = 0; i < joinNum; ++i) {
|
||||
STSBuf* output = tsBufCreate(true, pQueryInfo->order.order);
|
||||
SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0);
|
||||
|
||||
pSubQueryInfo1->tsBuf = output1;
|
||||
pSubQueryInfo2->tsBuf = output2;
|
||||
pSubQueryInfo->tsBuf = output;
|
||||
|
||||
SJoinSupporter* pSupporter = pSql->pSubs[i]->param;
|
||||
|
||||
if (pSupporter->pTSBuf == NULL) {
|
||||
tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql);
|
||||
return 0;
|
||||
}
|
||||
|
||||
tsBufResetPos(pSupporter->pTSBuf);
|
||||
|
||||
if (!tsBufNextPos(pSupporter->pTSBuf)) {
|
||||
tscDebug("%p input1 is empty, 0 for secondary query after ts blocks intersecting", pSql);
|
||||
return 0;
|
||||
}
|
||||
|
||||
tscDebug("%p sub:%p table idx:%d, input group number:%d", pSql, pSql->pSubs[i], i, pSupporter->pTSBuf->numOfGroups);
|
||||
|
||||
ctxlist[i].p = pSupporter;
|
||||
ctxlist[i].res = output;
|
||||
}
|
||||
|
||||
TSKEY st = taosGetTimestampUs();
|
||||
|
||||
// no result generated, return directly
|
||||
if (pSupporter1->pTSBuf == NULL || pSupporter2->pTSBuf == NULL) {
|
||||
tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql);
|
||||
return 0;
|
||||
}
|
||||
|
||||
tsBufResetPos(pSupporter1->pTSBuf);
|
||||
tsBufResetPos(pSupporter2->pTSBuf);
|
||||
|
||||
if (!tsBufNextPos(pSupporter1->pTSBuf)) {
|
||||
tsBufFlush(output1);
|
||||
tsBufFlush(output2);
|
||||
|
||||
tscDebug("%p input1 is empty, 0 for secondary query after ts blocks intersecting", pSql);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!tsBufNextPos(pSupporter2->pTSBuf)) {
|
||||
tsBufFlush(output1);
|
||||
tsBufFlush(output2);
|
||||
|
||||
tscDebug("%p input2 is empty, 0 for secondary query after ts blocks intersecting", pSql);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int64_t numOfInput1 = 1;
|
||||
int64_t numOfInput2 = 1;
|
||||
|
||||
while(1) {
|
||||
STSElem elem = tsBufGetElem(pSupporter1->pTSBuf);
|
||||
|
||||
// no data in pSupporter1 anymore, jump out of loop
|
||||
if (!tsBufIsValidElem(&elem)) {
|
||||
break;
|
||||
for (int16_t tidx = 0; tidx < joinNum; tidx++) {
|
||||
pctx = &ctxlist[tidx];
|
||||
if (pctx->compared) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// find the data in supporter2 with the same tag value
|
||||
STSElem e2 = tsBufFindElemStartPosByTag(pSupporter2->pTSBuf, elem.tag);
|
||||
assert(pctx->numOfInput == 0);
|
||||
|
||||
/**
|
||||
* there are elements in pSupporter2 with the same tag, continue
|
||||
*/
|
||||
tVariant tag1 = {0};
|
||||
tVariantAssign(&tag1, elem.tag);
|
||||
tsCond = pQueryInfo->tagCond.joinInfo.joinTables[tidx]->tsJoin;
|
||||
|
||||
tableNum = taosArrayGetSize(tsCond);
|
||||
assert(tableNum >= 2);
|
||||
|
||||
for (int32_t i = 0; i < tableNum; ++i) {
|
||||
tableMIdx = taosArrayGet(tsCond, i);
|
||||
SMergeTsCtx* tctx = &ctxlist[*tableMIdx];
|
||||
tctx->compared = 1;
|
||||
}
|
||||
|
||||
tableMIdx = taosArrayGet(tsCond, 0);
|
||||
pctx = &ctxlist[*tableMIdx];
|
||||
|
||||
mainCtx = pctx;
|
||||
|
||||
while (1) {
|
||||
pctx = mainCtx;
|
||||
|
||||
prev = tsBufGetElem(pctx->p->pTSBuf);
|
||||
|
||||
ctxStack[stackidx++] = pctx;
|
||||
|
||||
if (!tsBufIsValidElem(&prev)) {
|
||||
break;
|
||||
}
|
||||
|
||||
tVariant tag = {0};
|
||||
tVariantAssign(&tag, prev.tag);
|
||||
|
||||
int32_t skipped = 0;
|
||||
|
||||
for (int32_t i = 1; i < tableNum; ++i) {
|
||||
SMergeTsCtx* tctx = &ctxlist[i];
|
||||
|
||||
// find the data in supporter2 with the same tag value
|
||||
STSElem e2 = tsBufFindElemStartPosByTag(tctx->p->pTSBuf, &tag);
|
||||
|
||||
if (!tsBufIsValidElem(&e2)) {
|
||||
skipRemainValue(pctx->p->pTSBuf, &tag);
|
||||
skipped = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (skipped) {
|
||||
slot = 0;
|
||||
stackidx = 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
tableMIdx = taosArrayGet(tsCond, ++slot);
|
||||
equalNum = 1;
|
||||
|
||||
if (tsBufIsValidElem(&e2)) {
|
||||
while (1) {
|
||||
STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf);
|
||||
STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf);
|
||||
ctx = &ctxlist[*tableMIdx];
|
||||
|
||||
prev = tsBufGetElem(pctx->p->pTSBuf);
|
||||
cur = tsBufGetElem(ctx->p->pTSBuf);
|
||||
|
||||
// data with current are exhausted
|
||||
if (!tsBufIsValidElem(&elem1) || tVariantCompare(elem1.tag, &tag1) != 0) {
|
||||
if (!tsBufIsValidElem(&prev) || tVariantCompare(prev.tag, &tag) != 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!tsBufIsValidElem(&elem2) || tVariantCompare(elem2.tag, &tag1) != 0) { // ignore all records with the same tag
|
||||
skipRemainValue(pSupporter1->pTSBuf, &tag1);
|
||||
if (!tsBufIsValidElem(&cur) || tVariantCompare(cur.tag, &tag) != 0) { // ignore all records with the same tag
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
|
||||
* final results which is acquired after the secondary merge of in the client.
|
||||
*/
|
||||
int32_t re = tsCompare(order, elem1.ts, elem2.ts);
|
||||
if (re < 0) {
|
||||
tsBufNextPos(pSupporter1->pTSBuf);
|
||||
numOfInput1++;
|
||||
} else if (re > 0) {
|
||||
tsBufNextPos(pSupporter2->pTSBuf);
|
||||
numOfInput2++;
|
||||
} else {
|
||||
ctxStack[stackidx++] = ctx;
|
||||
|
||||
int32_t ret = tsCompare(order, prev.ts, cur.ts);
|
||||
if (ret == 0) {
|
||||
if (++equalNum < tableNum) {
|
||||
pctx = ctx;
|
||||
|
||||
if (++slot >= tableNum) {
|
||||
slot = 0;
|
||||
}
|
||||
|
||||
tableMIdx = taosArrayGet(tsCond, slot);
|
||||
continue;
|
||||
}
|
||||
|
||||
assert(stackidx == tableNum);
|
||||
|
||||
if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
|
||||
if (win->skey > elem1.ts) {
|
||||
win->skey = elem1.ts;
|
||||
if (win->skey > prev.ts) {
|
||||
win->skey = prev.ts;
|
||||
}
|
||||
|
||||
if (win->ekey < prev.ts) {
|
||||
win->ekey = prev.ts;
|
||||
}
|
||||
|
||||
if (win->ekey < elem1.ts) {
|
||||
win->ekey = elem1.ts;
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
SMergeTsCtx* tctx = ctxStack[i];
|
||||
prev = tsBufGetElem(tctx->p->pTSBuf);
|
||||
|
||||
tsBufAppend(tctx->res, prev.id, prev.tag, (const char*)&prev.ts, sizeof(prev.ts));
|
||||
}
|
||||
|
||||
tsBufAppend(output1, elem1.id, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
|
||||
tsBufAppend(output2, elem2.id, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
|
||||
} else {
|
||||
pLimit->offset -= 1;//offset apply to projection?
|
||||
}
|
||||
|
||||
tsBufNextPos(pSupporter1->pTSBuf);
|
||||
numOfInput1++;
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
SMergeTsCtx* tctx = ctxStack[i];
|
||||
|
||||
if (!tsBufNextPos(tctx->p->pTSBuf) && tctx == mainCtx) {
|
||||
mergeDone = 1;
|
||||
}
|
||||
tctx->numOfInput++;
|
||||
}
|
||||
|
||||
tsBufNextPos(pSupporter2->pTSBuf);
|
||||
numOfInput2++;
|
||||
if (mergeDone) {
|
||||
break;
|
||||
}
|
||||
|
||||
stackidx = 0;
|
||||
equalNum = 1;
|
||||
|
||||
ctxStack[stackidx++] = pctx;
|
||||
} else if (ret > 0) {
|
||||
if (!tsBufNextPos(ctx->p->pTSBuf) && ctx == mainCtx) {
|
||||
mergeDone = 1;
|
||||
break;
|
||||
}
|
||||
|
||||
ctx->numOfInput++;
|
||||
stackidx--;
|
||||
} else {
|
||||
stackidx--;
|
||||
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
SMergeTsCtx* tctx = ctxStack[i];
|
||||
|
||||
if (!tsBufNextPos(tctx->p->pTSBuf) && tctx == mainCtx) {
|
||||
mergeDone = 1;
|
||||
}
|
||||
tctx->numOfInput++;
|
||||
}
|
||||
|
||||
if (mergeDone) {
|
||||
break;
|
||||
}
|
||||
|
||||
stackidx = 0;
|
||||
equalNum = 1;
|
||||
|
||||
ctxStack[stackidx++] = pctx;
|
||||
}
|
||||
|
||||
}
|
||||
} else { // no data in pSupporter2, ignore current data in pSupporter2
|
||||
skipRemainValue(pSupporter1->pTSBuf, &tag1);
|
||||
|
||||
if (mergeDone) {
|
||||
break;
|
||||
}
|
||||
|
||||
slot = 0;
|
||||
stackidx = 0;
|
||||
|
||||
skipRemainValue(mainCtx->p->pTSBuf, &tag);
|
||||
}
|
||||
|
||||
stackidx = 0;
|
||||
slot = 0;
|
||||
mergeDone = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -242,28 +359,32 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
|
|||
* 1. only one element
|
||||
* 2. only one element for each tag.
|
||||
*/
|
||||
if (output1->tsOrder == -1) {
|
||||
output1->tsOrder = TSDB_ORDER_ASC;
|
||||
output2->tsOrder = TSDB_ORDER_ASC;
|
||||
if (ctxlist[0].res->tsOrder == -1) {
|
||||
for (int32_t i = 0; i < joinNum; ++i) {
|
||||
ctxlist[i].res->tsOrder = TSDB_ORDER_ASC;
|
||||
}
|
||||
}
|
||||
|
||||
tsBufFlush(output1);
|
||||
tsBufFlush(output2);
|
||||
|
||||
tsBufDestroy(pSupporter1->pTSBuf);
|
||||
pSupporter1->pTSBuf = NULL;
|
||||
tsBufDestroy(pSupporter2->pTSBuf);
|
||||
pSupporter2->pTSBuf = NULL;
|
||||
for (int32_t i = 0; i < joinNum; ++i) {
|
||||
tsBufFlush(ctxlist[i].res);
|
||||
|
||||
tsBufDestroy(ctxlist[i].p->pTSBuf);
|
||||
ctxlist[i].p->pTSBuf = NULL;
|
||||
}
|
||||
|
||||
TSKEY et = taosGetTimestampUs();
|
||||
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
|
||||
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
|
||||
pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfGroups, win->skey, win->ekey,
|
||||
tsBufGetNumOfGroup(output1), et - st);
|
||||
|
||||
return output1->numOfTotal;
|
||||
for (int32_t i = 0; i < joinNum; ++i) {
|
||||
tscDebug("%p sub:%p tblidx:%d, input:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
|
||||
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
|
||||
pSql, pSql->pSubs[i], i, ctxlist[i].numOfInput, ctxlist[i].res->numOfTotal, ctxlist[i].res->numOfGroups, win->skey, win->ekey,
|
||||
tsBufGetNumOfGroup(ctxlist[i].res), et - st);
|
||||
}
|
||||
|
||||
return ctxlist[0].res->numOfTotal;
|
||||
}
|
||||
|
||||
|
||||
// todo handle failed to create sub query
|
||||
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index) {
|
||||
SJoinSupporter* pSupporter = calloc(1, sizeof(SJoinSupporter));
|
||||
|
@ -768,76 +889,218 @@ static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSq
|
|||
return true;
|
||||
}
|
||||
|
||||
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) {
|
||||
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
|
||||
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
|
||||
|
||||
tscDebug("%p all subquery retrieve <tid, tags> complete, do tags match, %d, %d", pParentSql, p1->num, p2->num);
|
||||
|
||||
// sort according to the tag value
|
||||
qsort(p1->pIdTagList, p1->num, p1->tagSize, tagValCompar);
|
||||
qsort(p2->pIdTagList, p2->num, p2->tagSize, tagValCompar);
|
||||
|
||||
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray* resList) {
|
||||
int16_t joinNum = pParentSql->subState.numOfSub;
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
|
||||
|
||||
SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
|
||||
SJoinSupporter* p0 = pParentSql->pSubs[0]->param;
|
||||
SMergeCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {{0}};
|
||||
SMergeCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0};
|
||||
|
||||
// int16_t for padding
|
||||
int32_t size = p1->tagSize - sizeof(int16_t);
|
||||
*s1 = taosArrayInit(p1->num, size);
|
||||
*s2 = taosArrayInit(p2->num, size);
|
||||
int32_t size = p0->tagSize - sizeof(int16_t);
|
||||
|
||||
if (!(checkForDuplicateTagVal(pColSchema, p1, pParentSql) && checkForDuplicateTagVal(pColSchema, p2, pParentSql))) {
|
||||
return TSDB_CODE_QRY_DUP_JOIN_KEY;
|
||||
}
|
||||
SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
|
||||
|
||||
tscDebug("%p all subquery retrieve <tid, tags> complete, do tags match", pParentSql);
|
||||
|
||||
int32_t i = 0, j = 0;
|
||||
while(i < p1->num && j < p2->num) {
|
||||
STidTags* pp1 = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);
|
||||
STidTags* pp2 = (STidTags*) varDataVal(p2->pIdTagList + j * p2->tagSize);
|
||||
assert(pp1->tid != 0 && pp2->tid != 0);
|
||||
for (int32_t i = 0; i < joinNum; i++) {
|
||||
SJoinSupporter* p = pParentSql->pSubs[i]->param;
|
||||
|
||||
int32_t ret = doCompare(pp1->tag, pp2->tag, pColSchema->type, pColSchema->bytes);
|
||||
if (ret == 0) {
|
||||
tscDebug("%p tag matched, vgId:%d, val:%d, tid:%d, uid:%"PRIu64", tid:%d, uid:%"PRIu64, pParentSql, pp1->vgId,
|
||||
*(int*) pp1->tag, pp1->tid, pp1->uid, pp2->tid, pp2->uid);
|
||||
|
||||
taosArrayPush(*s1, pp1);
|
||||
taosArrayPush(*s2, pp2);
|
||||
j++;
|
||||
i++;
|
||||
} else if (ret > 0) {
|
||||
j++;
|
||||
} else {
|
||||
i++;
|
||||
ctxlist[i].p = p;
|
||||
ctxlist[i].res = taosArrayInit(p->num, size);
|
||||
|
||||
tscDebug("Join %d - num:%d", i, p->num);
|
||||
|
||||
// sort according to the tag valu
|
||||
qsort(p->pIdTagList, p->num, p->tagSize, tagValCompar);
|
||||
|
||||
if (!checkForDuplicateTagVal(pColSchema, p, pParentSql)) {
|
||||
for (int32_t j = 0; j <= i; j++) {
|
||||
taosArrayDestroy(ctxlist[j].res);
|
||||
}
|
||||
return TSDB_CODE_QRY_DUP_JOIN_KEY;
|
||||
}
|
||||
}
|
||||
|
||||
// reorganize the tid-tag value according to both the vgroup id and tag values
|
||||
// sort according to the tag value
|
||||
size_t t1 = taosArrayGetSize(*s1);
|
||||
size_t t2 = taosArrayGetSize(*s2);
|
||||
int32_t slot = 0;
|
||||
size_t tableNum = 0;
|
||||
int16_t* tableMIdx = 0;
|
||||
int32_t equalNum = 0;
|
||||
int32_t stackidx = 0;
|
||||
int32_t mergeDone = 0;
|
||||
SMergeCtx* ctx = NULL;
|
||||
SMergeCtx* pctx = NULL;
|
||||
STidTags* cur = NULL;
|
||||
STidTags* prev = NULL;
|
||||
SArray* tagCond = NULL;
|
||||
|
||||
qsort((*s1)->pData, t1, size, tidTagsCompar);
|
||||
qsort((*s2)->pData, t2, size, tidTagsCompar);
|
||||
for (int16_t tidx = 0; tidx < joinNum; tidx++) {
|
||||
pctx = &ctxlist[tidx];
|
||||
if (pctx->compared) {
|
||||
continue;
|
||||
}
|
||||
|
||||
#if 0
|
||||
for(int32_t k = 0; k < t1; ++k) {
|
||||
STidTags* p = (*s1)->pData + size * k;
|
||||
printf("%d, tag:%s\n", p->vgId, ((tstr*)(p->tag))->data);
|
||||
assert(pctx->idx == 0 && taosArrayGetSize(pctx->res) == 0);
|
||||
|
||||
tagCond = pQueryInfo->tagCond.joinInfo.joinTables[tidx]->tagJoin;
|
||||
|
||||
tableNum = taosArrayGetSize(tagCond);
|
||||
assert(tableNum >= 2);
|
||||
|
||||
for (int32_t i = 0; i < tableNum; ++i) {
|
||||
tableMIdx = taosArrayGet(tagCond, i);
|
||||
SMergeCtx* tctx = &ctxlist[*tableMIdx];
|
||||
tctx->compared = 1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tableNum; ++i) {
|
||||
tableMIdx = taosArrayGet(tagCond, i);
|
||||
SMergeCtx* tctx = &ctxlist[*tableMIdx];
|
||||
if (tctx->p->num <= 0 || tctx->p->pIdTagList == NULL) {
|
||||
mergeDone = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (mergeDone) {
|
||||
mergeDone = 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
tableMIdx = taosArrayGet(tagCond, slot);
|
||||
|
||||
pctx = &ctxlist[*tableMIdx];
|
||||
|
||||
prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);
|
||||
|
||||
ctxStack[stackidx++] = pctx;
|
||||
|
||||
tableMIdx = taosArrayGet(tagCond, ++slot);
|
||||
|
||||
equalNum = 1;
|
||||
|
||||
while (1) {
|
||||
ctx = &ctxlist[*tableMIdx];
|
||||
|
||||
cur = (STidTags*) varDataVal(ctx->p->pIdTagList + ctx->idx * ctx->p->tagSize);
|
||||
|
||||
assert(cur->tid != 0 && prev->tid != 0);
|
||||
|
||||
ctxStack[stackidx++] = ctx;
|
||||
|
||||
int32_t ret = doCompare(prev->tag, cur->tag, pColSchema->type, pColSchema->bytes);
|
||||
if (ret == 0) {
|
||||
if (++equalNum < tableNum) {
|
||||
prev = cur;
|
||||
pctx = ctx;
|
||||
|
||||
if (++slot >= tableNum) {
|
||||
slot = 0;
|
||||
}
|
||||
|
||||
tableMIdx = taosArrayGet(tagCond, slot);
|
||||
continue;
|
||||
}
|
||||
|
||||
tscDebug("%p tag matched, vgId:%d, val:%d, tid:%d, uid:%"PRIu64", tid:%d, uid:%"PRIu64, pParentSql, prev->vgId,
|
||||
*(int*) prev->tag, prev->tid, prev->uid, cur->tid, cur->uid);
|
||||
|
||||
assert(stackidx == tableNum);
|
||||
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
SMergeCtx* tctx = ctxStack[i];
|
||||
prev = (STidTags*) varDataVal(tctx->p->pIdTagList + tctx->idx * tctx->p->tagSize);
|
||||
|
||||
taosArrayPush(tctx->res, prev);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
SMergeCtx* tctx = ctxStack[i];
|
||||
|
||||
if (++tctx->idx >= tctx->p->num) {
|
||||
mergeDone = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (mergeDone) {
|
||||
break;
|
||||
}
|
||||
|
||||
stackidx = 0;
|
||||
equalNum = 1;
|
||||
|
||||
prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);
|
||||
|
||||
ctxStack[stackidx++] = pctx;
|
||||
} else if (ret > 0) {
|
||||
stackidx--;
|
||||
|
||||
if (++ctx->idx >= ctx->p->num) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
stackidx--;
|
||||
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
SMergeCtx* tctx = ctxStack[i];
|
||||
if (++tctx->idx >= tctx->p->num) {
|
||||
mergeDone = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (mergeDone) {
|
||||
break;
|
||||
}
|
||||
|
||||
stackidx = 0;
|
||||
equalNum = 1;
|
||||
|
||||
prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);
|
||||
ctxStack[stackidx++] = pctx;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
slot = 0;
|
||||
mergeDone = 0;
|
||||
stackidx = 0;
|
||||
}
|
||||
|
||||
for(int32_t k = 0; k < t1; ++k) {
|
||||
STidTags* p = (*s2)->pData + size * k;
|
||||
printf("%d, tag:%s\n", p->vgId, ((tstr*)(p->tag))->data);
|
||||
}
|
||||
#endif
|
||||
for (int32_t i = 0; i < joinNum; ++i) {
|
||||
// reorganize the tid-tag value according to both the vgroup id and tag values
|
||||
// sort according to the tag value
|
||||
size_t num = taosArrayGetSize(ctxlist[i].res);
|
||||
|
||||
qsort((ctxlist[i].res)->pData, num, size, tidTagsCompar);
|
||||
|
||||
tscDebug("%p tags match complete, result: %"PRIzu", %"PRIzu, pParentSql, t1, t2);
|
||||
taosArrayPush(resList, &ctxlist[i].res);
|
||||
|
||||
tscDebug("%p tags match complete, result num: %"PRIzu, pParentSql, num);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool emptyTagList(SArray* resList, int32_t size) {
|
||||
size_t rsize = taosArrayGetSize(resList);
|
||||
if (rsize != size) {
|
||||
return true;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SArray** s = taosArrayGet(resList, i);
|
||||
if (taosArrayGetSize(*s) <= 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
|
||||
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
|
||||
|
||||
|
@ -939,19 +1202,19 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
|
|||
return;
|
||||
}
|
||||
|
||||
SArray *s1 = NULL, *s2 = NULL;
|
||||
int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2);
|
||||
SArray* resList = taosArrayInit(pParentSql->subState.numOfSub, sizeof(SArray *));
|
||||
|
||||
int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, resList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
freeJoinSubqueryObj(pParentSql);
|
||||
pParentSql->res.code = code;
|
||||
tscAsyncResultOnError(pParentSql);
|
||||
|
||||
taosArrayDestroy(s1);
|
||||
taosArrayDestroy(s2);
|
||||
taosArrayDestroy(resList);
|
||||
return;
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return.
|
||||
if (emptyTagList(resList, pParentSql->subState.numOfSub)) { // no results,return.
|
||||
assert(pParentSql->fp != tscJoinQueryCallback);
|
||||
|
||||
tscDebug("%p tag intersect does not generated qualified tables for join, free all sub SqlObj and quit", pParentSql);
|
||||
|
@ -963,37 +1226,34 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
|
|||
|
||||
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
|
||||
} else {
|
||||
// proceed to for ts_comp query
|
||||
SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd;
|
||||
SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd;
|
||||
|
||||
SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0);
|
||||
STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0);
|
||||
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo1, s1);
|
||||
|
||||
SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0);
|
||||
STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0);
|
||||
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2);
|
||||
|
||||
SSqlObj* psub1 = pParentSql->pSubs[0];
|
||||
((SJoinSupporter*)psub1->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo1->pVgroupTables);
|
||||
|
||||
SSqlObj* psub2 = pParentSql->pSubs[1];
|
||||
((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo2->pVgroupTables);
|
||||
|
||||
pParentSql->subState.numOfSub = 2;
|
||||
|
||||
memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub);
|
||||
tscDebug("%p reset all sub states to 0", pParentSql);
|
||||
|
||||
for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) {
|
||||
SSqlObj* sub = pParentSql->pSubs[m];
|
||||
issueTsCompQuery(sub, sub->param, pParentSql);
|
||||
// proceed to for ts_comp query
|
||||
SSqlCmd* pSubCmd = &pParentSql->pSubs[m]->cmd;
|
||||
SArray** s = taosArrayGet(resList, m);
|
||||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pSubCmd, 0);
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo, *s);
|
||||
|
||||
SSqlObj* psub = pParentSql->pSubs[m];
|
||||
((SJoinSupporter*)psub->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo->pVgroupTables);
|
||||
|
||||
memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub);
|
||||
tscDebug("%p reset all sub states to 0", pParentSql);
|
||||
|
||||
issueTsCompQuery(psub, psub->param, pParentSql);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayDestroy(s1);
|
||||
taosArrayDestroy(s2);
|
||||
size_t rsize = taosArrayGetSize(resList);
|
||||
for (int32_t i = 0; i < rsize; ++i) {
|
||||
SArray** s = taosArrayGet(resList, i);
|
||||
if (*s) {
|
||||
taosArrayDestroy(*s);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayDestroy(resList);
|
||||
}
|
||||
|
||||
static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
|
||||
|
@ -1124,12 +1384,8 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
|
|||
|
||||
tscDebug("%p all subquery retrieve ts complete, do ts block intersect", pParentSql);
|
||||
|
||||
// proceeds to launched secondary query to retrieve final data
|
||||
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
|
||||
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
|
||||
|
||||
STimeWindow win = TSWINDOW_INITIALIZER;
|
||||
int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &win);
|
||||
int64_t num = doTSBlockIntersect(pParentSql, &win);
|
||||
if (num <= 0) { // no result during ts intersect
|
||||
tscDebug("%p no results generated in ts intersection, free all sub SqlObj and quit", pParentSql);
|
||||
freeJoinSubqueryObj(pParentSql);
|
||||
|
@ -1639,6 +1895,8 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
|
|||
pNewQueryInfo->limit.limit = -1;
|
||||
pNewQueryInfo->limit.offset = 0;
|
||||
|
||||
pNewQueryInfo->order.orderColId = INT32_MIN;
|
||||
|
||||
// backup the data and clear it in the sqlcmd object
|
||||
memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));
|
||||
|
||||
|
|
|
@ -1279,6 +1279,34 @@ int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepco
|
|||
return 0;
|
||||
}
|
||||
|
||||
bool tscColumnExists(SArray* pColumnList, SColumnIndex* pColIndex) {
|
||||
// ignore the tbname columnIndex to be inserted into source list
|
||||
if (pColIndex->columnIndex < 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
size_t numOfCols = taosArrayGetSize(pColumnList);
|
||||
int16_t col = pColIndex->columnIndex;
|
||||
|
||||
int32_t i = 0;
|
||||
while (i < numOfCols) {
|
||||
SColumn* pCol = taosArrayGetP(pColumnList, i);
|
||||
if ((pCol->colIndex.columnIndex != col) || (pCol->colIndex.tableIndex != pColIndex->tableIndex)) {
|
||||
++i;
|
||||
continue;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (i >= numOfCols || numOfCols == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) {
|
||||
// ignore the tbname columnIndex to be inserted into source list
|
||||
if (pColIndex->columnIndex < 0) {
|
||||
|
@ -1583,7 +1611,25 @@ int32_t tscTagCondCopy(STagCond* dest, const STagCond* src) {
|
|||
dest->tbnameCond.uid = src->tbnameCond.uid;
|
||||
dest->tbnameCond.len = src->tbnameCond.len;
|
||||
|
||||
memcpy(&dest->joinInfo, &src->joinInfo, sizeof(SJoinInfo));
|
||||
dest->joinInfo.hasJoin = src->joinInfo.hasJoin;
|
||||
|
||||
for (int32_t i = 0; i < TSDB_MAX_JOIN_TABLE_NUM; ++i) {
|
||||
if (src->joinInfo.joinTables[i]) {
|
||||
dest->joinInfo.joinTables[i] = calloc(1, sizeof(SJoinNode));
|
||||
|
||||
memcpy(dest->joinInfo.joinTables[i], src->joinInfo.joinTables[i], sizeof(SJoinNode));
|
||||
|
||||
if (src->joinInfo.joinTables[i]->tsJoin) {
|
||||
dest->joinInfo.joinTables[i]->tsJoin = taosArrayDup(src->joinInfo.joinTables[i]->tsJoin);
|
||||
}
|
||||
|
||||
if (src->joinInfo.joinTables[i]->tagJoin) {
|
||||
dest->joinInfo.joinTables[i]->tagJoin = taosArrayDup(src->joinInfo.joinTables[i]->tagJoin);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
dest->relType = src->relType;
|
||||
|
||||
if (src->pCond == NULL) {
|
||||
|
@ -1629,6 +1675,23 @@ void tscTagCondRelease(STagCond* pTagCond) {
|
|||
taosArrayDestroy(pTagCond->pCond);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < TSDB_MAX_JOIN_TABLE_NUM; ++i) {
|
||||
SJoinNode *node = pTagCond->joinInfo.joinTables[i];
|
||||
if (node == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (node->tsJoin != NULL) {
|
||||
taosArrayDestroy(node->tsJoin);
|
||||
}
|
||||
|
||||
if (node->tagJoin != NULL) {
|
||||
taosArrayDestroy(node->tagJoin);
|
||||
}
|
||||
|
||||
tfree(node);
|
||||
}
|
||||
|
||||
memset(pTagCond, 0, sizeof(STagCond));
|
||||
}
|
||||
|
||||
|
@ -2318,16 +2381,21 @@ void tscDoQuery(SSqlObj* pSql) {
|
|||
}
|
||||
|
||||
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid) {
|
||||
if (pTagCond->joinInfo.left.uid == uid) {
|
||||
return pTagCond->joinInfo.left.tagColId;
|
||||
} else if (pTagCond->joinInfo.right.uid == uid) {
|
||||
return pTagCond->joinInfo.right.tagColId;
|
||||
} else {
|
||||
assert(0);
|
||||
return -1;
|
||||
int32_t i = 0;
|
||||
while (i < TSDB_MAX_JOIN_TABLE_NUM) {
|
||||
SJoinNode* node = pTagCond->joinInfo.joinTables[i];
|
||||
if (node && node->uid == uid) {
|
||||
return node->tagColId;
|
||||
}
|
||||
|
||||
i++;
|
||||
}
|
||||
|
||||
assert(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId) {
|
||||
int32_t numOfTags = tscGetNumOfTags(pTableMeta);
|
||||
|
||||
|
|
|
@ -317,7 +317,7 @@ do { \
|
|||
#define TSDB_MAX_DB_QUORUM_OPTION 2
|
||||
#define TSDB_DEFAULT_DB_QUORUM_OPTION 1
|
||||
|
||||
#define TSDB_MAX_JOIN_TABLE_NUM 5
|
||||
#define TSDB_MAX_JOIN_TABLE_NUM 10
|
||||
#define TSDB_MAX_UNION_CLAUSE 5
|
||||
|
||||
#define TSDB_MAX_BINARY_LEN (TSDB_MAX_BYTES_PER_ROW-TSDB_KEYSIZE)
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue