[td-1844]
This commit is contained in:
parent
ea0fd672d0
commit
1620cc5240
|
@ -75,6 +75,7 @@ typedef struct SJoinSupporter {
|
|||
SArray* exprList;
|
||||
SFieldInfo fieldsInfo;
|
||||
STagCond tagCond;
|
||||
SSqlGroupbyExpr groupInfo; // group by info
|
||||
struct STSBuf* pTSBuf; // the TSBuf struct that holds the compressed timestamp array
|
||||
FILE* f; // temporary file in order to create TSBuf
|
||||
char path[PATH_MAX]; // temporary file path, todo dynamic allocate memory
|
||||
|
@ -265,6 +266,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t sub
|
|||
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex);
|
||||
|
||||
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
|
||||
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId);
|
||||
|
||||
void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
|
||||
|
||||
|
|
|
@ -77,7 +77,7 @@ SSchema *tscGetTableColumnSchema(const STableMeta *pMeta, int32_t colIndex);
|
|||
* @param colId
|
||||
* @return
|
||||
*/
|
||||
SSchema* tscGetTableColumnSchemaById(STableMeta* pTableMeta, int16_t colId);
|
||||
SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId);
|
||||
|
||||
/**
|
||||
* check if the schema is valid or not, including following aspects:
|
||||
|
|
|
@ -5286,14 +5286,17 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau
|
|||
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, (int32_t)size - 1);
|
||||
|
||||
if (pExpr->functionId != TSDB_FUNC_TAG) {
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
|
||||
int16_t columnInfo = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
|
||||
SColumnIndex index = {.tableIndex = 0, .columnIndex = columnInfo};
|
||||
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pParentQueryInfo, tableIndex);
|
||||
|
||||
int16_t type = pSchema[index.columnIndex].type;
|
||||
int16_t bytes = pSchema[index.columnIndex].bytes;
|
||||
char* name = pSchema[index.columnIndex].name;
|
||||
int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
|
||||
|
||||
SSchema* pTagSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, colId);
|
||||
int16_t colIndex = tscGetTagColIndexById(pTableMetaInfo->pTableMeta, colId);
|
||||
SColumnIndex index = {.tableIndex = 0, .columnIndex = colIndex};
|
||||
|
||||
char* name = pTagSchema->name;
|
||||
int16_t type = pTagSchema->type;
|
||||
int16_t bytes = pTagSchema->bytes;
|
||||
|
||||
pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, bytes, true);
|
||||
pExpr->colInfo.flag = TSDB_COL_TAG;
|
||||
|
|
|
@ -118,7 +118,7 @@ SSchema* tscGetTableColumnSchema(const STableMeta* pTableMeta, int32_t colIndex)
|
|||
}
|
||||
|
||||
// TODO for large number of columns, employ the binary search method
|
||||
SSchema* tscGetTableColumnSchemaById(STableMeta* pTableMeta, int16_t colId) {
|
||||
SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId) {
|
||||
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
|
||||
|
||||
for(int32_t i = 0; i < tinfo.numOfColumns + tinfo.numOfTags; ++i) {
|
||||
|
|
|
@ -320,10 +320,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
|
|||
pQueryInfo->colList = pSupporter->colList;
|
||||
pQueryInfo->exprList = pSupporter->exprList;
|
||||
pQueryInfo->fieldsInfo = pSupporter->fieldsInfo;
|
||||
|
||||
pSupporter->exprList = NULL;
|
||||
pSupporter->colList = NULL;
|
||||
memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo));
|
||||
pQueryInfo->groupbyExpr = pSupporter->groupInfo;
|
||||
|
||||
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
|
||||
assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
|
||||
|
@ -332,7 +329,12 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
|
|||
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
|
||||
pTableMetaInfo->pVgroupTables = pSupporter->pVgroupTables;
|
||||
|
||||
pSupporter->exprList = NULL;
|
||||
pSupporter->colList = NULL;
|
||||
pSupporter->pVgroupTables = NULL;
|
||||
memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo));
|
||||
memset(&pSupporter->groupInfo, 0, sizeof(SSqlGroupbyExpr));
|
||||
|
||||
/*
|
||||
* When handling the projection query, the offset value will be modified for table-table join, which is changed
|
||||
|
@ -612,7 +614,7 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
|
|||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
|
||||
|
||||
SSchema* pColSchema = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
|
||||
SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
|
||||
|
||||
// int16_t for padding
|
||||
int32_t size = p1->tagSize - sizeof(int16_t);
|
||||
|
@ -1341,6 +1343,9 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
|
|||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pSupporter->groupInfo = pNewQueryInfo->groupbyExpr;
|
||||
memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));
|
||||
|
||||
pNew->cmd.numOfCols = 0;
|
||||
pNewQueryInfo->interval.interval = 0;
|
||||
pSupporter->limit = pNewQueryInfo->limit;
|
||||
|
@ -1361,17 +1366,9 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
|
|||
assert(pTagCond->joinInfo.hasJoin);
|
||||
|
||||
int32_t tagColId = tscGetJoinTagColIdByUid(pTagCond, pTableMetaInfo->pTableMeta->id.uid);
|
||||
SSchema* s = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
|
||||
SSchema* s = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
|
||||
|
||||
// get the tag colId column index
|
||||
int32_t numOfTags = tscGetNumOfTags(pTableMetaInfo->pTableMeta);
|
||||
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
|
||||
for(int32_t i = 0; i < numOfTags; ++i) {
|
||||
if (pSchema[i].colId == tagColId) {
|
||||
colIndex.columnIndex = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
colIndex.columnIndex = tscGetTagColIndexById(pTableMetaInfo->pTableMeta, tagColId);
|
||||
|
||||
int16_t bytes = 0;
|
||||
int16_t type = 0;
|
||||
|
@ -2193,7 +2190,8 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
|
|||
numOfRes = (int32_t)(MIN(numOfRes, remain));
|
||||
}
|
||||
|
||||
if (numOfRes == 0) {
|
||||
if (numOfRes == 0) { // no result any more, free all subquery objects
|
||||
freeJoinSubqueryObj(pSql);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -1665,6 +1665,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
|
|||
if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
|
||||
taosArrayDestroy(pQueryInfo->groupbyExpr.columnInfo);
|
||||
pQueryInfo->groupbyExpr.columnInfo = NULL;
|
||||
pQueryInfo->groupbyExpr.numOfGroupCols = 0;
|
||||
}
|
||||
|
||||
pQueryInfo->tsBuf = tsBufDestroy(pQueryInfo->tsBuf);
|
||||
|
@ -2156,6 +2157,19 @@ int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid) {
|
|||
}
|
||||
}
|
||||
|
||||
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId) {
|
||||
int32_t numOfTags = tscGetNumOfColumns(pTableMeta);
|
||||
|
||||
SSchema* pSchema = tscGetTableTagSchema(pTableMeta);
|
||||
for(int32_t i = 0; i < numOfTags; ++i) {
|
||||
if (pSchema[i].colId == colId) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
bool tscIsUpdateQuery(SSqlObj* pSql) {
|
||||
if (pSql == NULL || pSql->signature != pSql) {
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
|
|
|
@ -132,4 +132,9 @@ sql select join_mt0.ts, join_mt1.t1, join_mt0.t1, join_mt1.tbname, join_mt0.tbna
|
|||
|
||||
sql select join_mt0.ts, join_mt1.t1, join_mt0.t1, join_mt1.tbname, join_mt0.tbname from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t1=join_mt1.t1 limit 1
|
||||
|
||||
sql select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10a) group by join_mt0.t1 order by join_mt0.ts desc;
|
||||
if $rows != 100 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue