[tbase-1282]

This commit is contained in:
hjxilinx 2019-12-07 15:24:52 +08:00
parent 1a5ae0d1c3
commit 4246517c9f
6 changed files with 329 additions and 182 deletions

View File

@ -52,7 +52,6 @@ typedef struct SParsedDataColInfo {
typedef struct SJoinSubquerySupporter { typedef struct SJoinSubquerySupporter {
SSubqueryState* pState; SSubqueryState* pState;
SSqlObj* pObj; // parent SqlObj SSqlObj* pObj; // parent SqlObj
bool hasMore; // has data from vnode to fetch
int32_t subqueryIndex; // index of sub query int32_t subqueryIndex; // index of sub query
int64_t interval; // interval time int64_t interval; // interval time
SLimitVal limit; // limit info SLimitVal limit; // limit info
@ -166,7 +165,6 @@ void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str);
void tscTagCondCopy(STagCond* dest, const STagCond* src); void tscTagCondCopy(STagCond* dest, const STagCond* src);
void tscTagCondRelease(STagCond* pCond); void tscTagCondRelease(STagCond* pCond);
void tscTagCondSetQueryCondType(STagCond* pCond, int16_t type);
void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SSqlCmd* pCmd); void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SSqlCmd* pCmd);
@ -222,6 +220,9 @@ TAOS* taos_connect_a(char* ip, char* user, char* pass, char* db, uint16_t port,
void* param, void** taos); void* param, void** taos);
void sortRemoveDuplicates(STableDataBlocks* dataBuf); void sortRemoveDuplicates(STableDataBlocks* dataBuf);
void tscPrintSelectClause(SSqlCmd* pCmd);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -164,8 +164,6 @@ SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pS
} }
pSupporter->pObj = pSql; pSupporter->pObj = pSql;
pSupporter->hasMore = true;
pSupporter->pState = pState; pSupporter->pState = pState;
pSupporter->subqueryIndex = index; pSupporter->subqueryIndex = index;
@ -226,12 +224,6 @@ bool needSecondaryQuery(SSqlObj* pSql) {
* launch secondary stage query to fetch the result that contains timestamp in set * launch secondary stage query to fetch the result that contains timestamp in set
*/ */
int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
// TODO not launch secondary stage query
// if (!needSecondaryQuery(pSql)) {
// return;
// }
// sub query may not be necessary
int32_t numOfSub = 0; int32_t numOfSub = 0;
SJoinSubquerySupporter* pSupporter = NULL; SJoinSubquerySupporter* pSupporter = NULL;
@ -286,7 +278,6 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
pNew->cmd.type |= TSDB_QUERY_TYPE_JOIN_SEC_STAGE; pNew->cmd.type |= TSDB_QUERY_TYPE_JOIN_SEC_STAGE;
pNew->cmd.nAggTimeInterval = pSupporter->interval; pNew->cmd.nAggTimeInterval = pSupporter->interval;
pNew->cmd.limit = pSupporter->limit;
pNew->cmd.groupbyExpr = pSupporter->groupbyExpr; pNew->cmd.groupbyExpr = pSupporter->groupbyExpr;
tscColumnBaseInfoCopy(&pNew->cmd.colList, &pSupporter->colList, 0); tscColumnBaseInfoCopy(&pNew->cmd.colList, &pSupporter->colList, 0);
@ -305,7 +296,14 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
tscFieldInfoCalOffset(&pNew->cmd); tscFieldInfoCalOffset(&pNew->cmd);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pNew->cmd, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pNew->cmd, 0);
/*
* When handling the projection query, the offset value will be modified for table-table join, which is changed
* during the timestamp intersection.
*/
pSupporter->limit = pSql->cmd.limit;
pNew->cmd.limit = pSupporter->limit;
// fetch the join tag column // fetch the join tag column
if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) { if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) {
SSqlExpr* pExpr = tscSqlExprGet(&pNew->cmd, 0); SSqlExpr* pExpr = tscSqlExprGet(&pNew->cmd, 0);
@ -314,10 +312,12 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pNew->cmd.tagCond, pMeterMetaInfo->pMeterMeta->uid); int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pNew->cmd.tagCond, pMeterMetaInfo->pMeterMeta->uid);
pExpr->param[0].i64Key = tagColIndex; pExpr->param[0].i64Key = tagColIndex;
pExpr->numOfParams = 1; pExpr->numOfParams = 1;
addRequiredTagColumn(&pNew->cmd, tagColIndex, 0);
} }
#ifdef _DEBUG_VIEW
tscPrintSelectClause(&pNew->cmd);
#endif
tscProcessSql(pNew); tscProcessSql(pNew);
} }
@ -471,9 +471,31 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
pSupporter->pState->code = numOfRows; pSupporter->pState->code = numOfRows;
tscError("%p retrieve failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex); tscError("%p retrieve failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex);
} }
if (tscProjectionQueryOnMetric(&pSql->cmd) && numOfRows == 0) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0);
assert(pSql->cmd.numOfTables == 1);
// for projection query, need to try next vnode if current vnode is exhausted
if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
pSupporter->pState->numOfCompleted = 0;
pSupporter->pState->numOfTotal = 1;
pSql->cmd.command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql);
return;
}
}
if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) {
tscTrace("%p secondary retrieve completed, global code:%d", tres, pParentSql->res.code); assert(pSupporter->pState->numOfCompleted == pSupporter->pState->numOfTotal);
tscTrace("%p all %d secondary retrieves are completed, global code:%d", tres, pSupporter->pState->numOfTotal,
pParentSql->res.code);
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = abs(pSupporter->pState->code); pParentSql->res.code = abs(pSupporter->pState->code);
freeSubqueryObj(pParentSql); freeSubqueryObj(pParentSql);
@ -490,11 +512,17 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
assert(pSql->numOfSubs >= 1); assert(pSql->numOfSubs >= 1);
for (int32_t i = 0; i < pSql->numOfSubs; ++i) { for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[i]->param;
SSqlRes* pRes = &pSql->pSubs[i]->res; SSqlRes* pRes = &pSql->pSubs[i]->res;
if (pRes->row >= pRes->numOfRows && pSupporter->hasMore) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->pSubs[i]->cmd, 0);
numOfFetch++;
if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) {
if (pRes->row >= pRes->numOfRows && pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
numOfFetch++;
}
} else {
if (pRes->row >= pRes->numOfRows) {
numOfFetch++;
}
} }
} }
@ -515,8 +543,13 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
// wait for all subqueries completed // wait for all subqueries completed
pSupporter->pState->numOfTotal = numOfFetch; pSupporter->pState->numOfTotal = numOfFetch;
if (pRes1->row >= pRes1->numOfRows && pSupporter->hasMore) {
tscTrace("%p subquery:%p retrieve data from vnode, index:%d", pSql, pSql1, pSupporter->subqueryIndex); assert(pRes1->numOfRows >= 0 && pCmd1->numOfTables == 1);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd1, 0);
if (pRes1->row >= pRes1->numOfRows) {
tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, vnodeIndex:%d", pSql, pSql1,
pSupporter->subqueryIndex, pMeterMetaInfo->vnodeIndex);
tscResetForNextRetrieve(pRes1); tscResetForNextRetrieve(pRes1);
@ -541,7 +574,11 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
tscTrace("%p all subquery response, retrieve data", pSql); tscTrace("%p all subquery response, retrieve data", pSql);
if (pRes->pColumnIndex != NULL) {
return; // the column transfer support struct has been built
}
pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pCmd->fieldsInfo.numOfOutputCols); pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pCmd->fieldsInfo.numOfOutputCols);
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
@ -631,20 +668,34 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) {
tscSetupOutputColumnIndex(pParentSql); tscSetupOutputColumnIndex(pParentSql);
if (pParentSql->fp == NULL) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0);
tsem_wait(&pParentSql->emptyRspSem);
tsem_wait(&pParentSql->emptyRspSem); /**
* if the query is a continue query (vnodeIndex > 0 for projection query) for next vnode, do the retrieval of data instead of returning to its invoker
tsem_post(&pParentSql->rspSem); */
} else { if (pMeterMetaInfo->vnodeIndex > 0 && tscProjectionQueryOnMetric(&pSql->cmd)) {
// set the command flag must be after the semaphore been correctly set. assert(pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes);
// pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC; pSupporter->pState->numOfCompleted = 0; // reset the record value
// if (pPObj->res.code == TSDB_CODE_SUCCESS) {
// (*pPObj->fp)(pPObj->param, pPObj, 0); pSql->fp = joinRetrieveCallback; // continue retrieve data
// } else { pSql->cmd.command = TSDB_SQL_FETCH;
// tscQueueAsyncRes(pPObj); tscProcessSql(pSql);
// } } else { // first retrieve from vnode during the secondary stage sub-query
assert(0); if (pParentSql->fp == NULL) {
tsem_wait(&pParentSql->emptyRspSem);
tsem_wait(&pParentSql->emptyRspSem);
tsem_post(&pParentSql->rspSem);
} else {
// set the command flag must be after the semaphore been correctly set.
// pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
// if (pPObj->res.code == TSDB_CODE_SUCCESS) {
// (*pPObj->fp)(pPObj->param, pPObj, 0);
// } else {
// tscQueueAsyncRes(pPObj);
// }
assert(0);
}
} }
} }
} }
@ -1440,7 +1491,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
assert(pDestBuf->fileSize == oldSize + size); assert(pDestBuf->fileSize == oldSize + size);
tscTrace("tsBuf merge success, %p, path:%s, fd:%d, file size:%d, vnode:%d, autoDelete:%d", pDestBuf, pDestBuf->path, tscTrace("tsBuf merge success, %p, path:%s, fd:%d, file size:%d, numOfVnode:%d, autoDelete:%d", pDestBuf, pDestBuf->path,
fileno(pDestBuf->f), pDestBuf->fileSize, pDestBuf->numOfVnodes, pDestBuf->autoDelete); fileno(pDestBuf->f), pDestBuf->fileSize, pDestBuf->numOfVnodes, pDestBuf->autoDelete);
return 0; return 0;

View File

@ -1020,7 +1020,10 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} }
setColumnOffsetValueInResultset(pCmd); setColumnOffsetValueInResultset(pCmd);
updateTagColumnIndex(pCmd, 0);
for(int32_t i = 0; i < pCmd->numOfTables; ++i) {
updateTagColumnIndex(pCmd, i);
}
break; break;
} }
@ -1796,12 +1799,11 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, tSQLExprItem* pItem) {
} }
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
SColumnIndex index1 = {0, TSDB_TBNAME_COLUMN_INDEX};
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_METER_NAME_LEN}; SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_METER_NAME_LEN};
strcpy(colSchema.name, TSQL_TBNAME_L); strcpy(colSchema.name, TSQL_TBNAME_L);
pCmd->type = TSDB_QUERY_TYPE_STABLE_QUERY; pCmd->type = TSDB_QUERY_TYPE_STABLE_QUERY;
tscAddSpecialColumnForSelect(pCmd, startPos, TSDB_FUNC_TAGPRJ, &index1, &colSchema, true); tscAddSpecialColumnForSelect(pCmd, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, true);
} else { } else {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, index.tableIndex); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, index.tableIndex);
SMeterMeta* pMeterMeta = pMeterMetaInfo->pMeterMeta; SMeterMeta* pMeterMeta = pMeterMetaInfo->pMeterMeta;
@ -2739,15 +2741,20 @@ static bool functionCompatibleCheck(SSqlCmd* pCmd) {
void updateTagColumnIndex(SSqlCmd* pCmd, int32_t tableIndex) { void updateTagColumnIndex(SSqlCmd* pCmd, int32_t tableIndex) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex);
// update tags column index for group by tags /*
for (int32_t i = 0; i < pCmd->groupbyExpr.numOfGroupCols; ++i) { * update tags column index for group by tags
int32_t index = pCmd->groupbyExpr.columnInfo[i].colIdx; * group by columns belong to this table
*/
for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) { if (pCmd->groupbyExpr.numOfGroupCols > 0 && pCmd->groupbyExpr.tableIndex == tableIndex) {
int32_t tagColIndex = pMeterMetaInfo->tagColumnIndex[j]; for (int32_t i = 0; i < pCmd->groupbyExpr.numOfGroupCols; ++i) {
if (tagColIndex == index) { int32_t index = pCmd->groupbyExpr.columnInfo[i].colIdx;
pCmd->groupbyExpr.columnInfo[i].colIdx = j;
break; for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) {
int32_t tagColIndex = pMeterMetaInfo->tagColumnIndex[j];
if (tagColIndex == index) {
pCmd->groupbyExpr.columnInfo[i].colIdx = j;
break;
}
} }
} }
} }
@ -2755,9 +2762,15 @@ void updateTagColumnIndex(SSqlCmd* pCmd, int32_t tableIndex) {
// update tags column index for expression // update tags column index for expression
for (int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) { for (int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pCmd, i); SSqlExpr* pExpr = tscSqlExprGet(pCmd, i);
if (!TSDB_COL_IS_TAG(pExpr->colInfo.flag)) { // not tags, continue if (!TSDB_COL_IS_TAG(pExpr->colInfo.flag)) { // not tags, continue
continue; continue;
} }
// not belongs to this table
if (pExpr->uid != pMeterMetaInfo->pMeterMeta->uid) {
continue;
}
for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) { for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) {
if (pExpr->colInfo.colIdx == pMeterMetaInfo->tagColumnIndex[j]) { if (pExpr->colInfo.colIdx == pMeterMetaInfo->tagColumnIndex[j]) {
@ -2766,6 +2779,32 @@ void updateTagColumnIndex(SSqlCmd* pCmd, int32_t tableIndex) {
} }
} }
} }
// update join condition tag column index
SJoinInfo* pJoinInfo = &pCmd->tagCond.joinInfo;
if (!pJoinInfo->hasJoin) { // not join query
return;
}
assert(pJoinInfo->left.uid != pJoinInfo->right.uid);
// the join condition expression node belongs to this table(super table)
if (pMeterMetaInfo->pMeterMeta->uid == pJoinInfo->left.uid) {
for(int32_t i = 0; i < pMeterMetaInfo->numOfTags; ++i) {
if (pJoinInfo->left.tagCol == pMeterMetaInfo->tagColumnIndex[i]) {
pJoinInfo->left.tagCol = i;
}
}
}
if (pMeterMetaInfo->pMeterMeta->uid == pJoinInfo->right.uid) {
for(int32_t i = 0; i < pMeterMetaInfo->numOfTags; ++i) {
if (pJoinInfo->right.tagCol == pMeterMetaInfo->tagColumnIndex[i]) {
pJoinInfo->right.tagCol = i;
}
}
}
} }
int32_t parseGroupbyClause(SSqlCmd* pCmd, tVariantList* pList) { int32_t parseGroupbyClause(SSqlCmd* pCmd, tVariantList* pList) {
@ -2987,8 +3026,6 @@ typedef struct SCondExpr {
static int32_t getTimeRange(int64_t* stime, int64_t* etime, tSQLExpr* pRight, int32_t optr, int16_t timePrecision); static int32_t getTimeRange(int64_t* stime, int64_t* etime, tSQLExpr* pRight, int32_t optr, int16_t timePrecision);
static int32_t doParseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr, SCondExpr* condExpr);
static int32_t tSQLExprNodeToString(tSQLExpr* pExpr, char** str) { static int32_t tSQLExprNodeToString(tSQLExpr* pExpr, char** str) {
if (pExpr->nSQLOptr == TK_ID) { // column name if (pExpr->nSQLOptr == TK_ID) { // column name
strncpy(*str, pExpr->colInfo.z, pExpr->colInfo.n); strncpy(*str, pExpr->colInfo.z, pExpr->colInfo.n);
@ -4018,129 +4055,128 @@ static void cleanQueryExpr(SCondExpr* pCondExpr) {
} }
} }
int32_t parseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr) {
SSqlCmd* pCmd = &pSql->cmd;
static void doAddJoinTagsColumnsIntoTagList(SSqlCmd* pCmd, SCondExpr* pCondExpr) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
if (QUERY_IS_JOIN_QUERY(pCmd->type) && UTIL_METER_IS_METRIC(pMeterMetaInfo)) {
SColumnIndex index = {0};
getColumnIndexByNameEx(&pCondExpr->pJoinExpr->pLeft->colInfo, pCmd, &index);
pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, index.tableIndex);
int32_t columnInfo = index.columnIndex - pMeterMetaInfo->pMeterMeta->numOfColumns;
addRequiredTagColumn(pCmd, columnInfo, index.tableIndex);
getColumnIndexByNameEx(&pCondExpr->pJoinExpr->pRight->colInfo, pCmd, &index);
pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, index.tableIndex);
columnInfo = index.columnIndex - pMeterMetaInfo->pMeterMeta->numOfColumns;
addRequiredTagColumn(pCmd, columnInfo, index.tableIndex);
}
}
static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SCondExpr* pCondExpr, tSQLExpr** pExpr) {
int32_t ret = TSDB_CODE_SUCCESS;
if (pCondExpr->pTagCond != NULL) {
for (int32_t i = 0; i < pCmd->numOfTables; ++i) {
tSQLExpr* p1 = extractExprForSTable(pExpr, pCmd, i);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, i);
char c[TSDB_MAX_TAGS_LEN] = {0};
char* str = c;
if ((ret = getTagCondString(pCmd, p1, &str)) != TSDB_CODE_SUCCESS) {
return ret;
}
tsSetMetricQueryCond(&pCmd->tagCond, pMeterMetaInfo->pMeterMeta->uid, c);
doCompactQueryExpr(pExpr);
tSQLExprDestroy(p1);
}
pCondExpr->pTagCond = NULL;
}
return ret;
}
int32_t parseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr) {
if (pExpr == NULL) { if (pExpr == NULL) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
const char* msg = "invalid filter expression";
const char* msg1 = "invalid expression";
int32_t ret = TSDB_CODE_SUCCESS;
SSqlCmd* pCmd = &pSql->cmd;
pCmd->stime = 0; pCmd->stime = 0;
pCmd->etime = INT64_MAX; pCmd->etime = INT64_MAX;
int32_t ret = TSDB_CODE_SUCCESS; //tags query condition may be larger than 512bytes, therefore, we need to prepare enough large space
SStringBuilder sb = {0};
const char* msg1 = "invalid expression";
SCondExpr condExpr = {0}; SCondExpr condExpr = {0};
if ((*pExpr)->pLeft == NULL || (*pExpr)->pRight == NULL) { if ((*pExpr)->pLeft == NULL || (*pExpr)->pRight == NULL) {
return invalidSqlErrMsg(pCmd, msg1); return invalidSqlErrMsg(pCmd, msg1);
} }
ret = doParseWhereClause(pSql, pExpr, &condExpr); int32_t type = 0;
if (ret != TSDB_CODE_SUCCESS) { if ((ret = getQueryCondExpr(pCmd, pExpr, &condExpr, &type, (*pExpr)->nSQLOptr)) != TSDB_CODE_SUCCESS) {
return ret; return ret;
} }
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
if (QUERY_IS_JOIN_QUERY(pCmd->type) && UTIL_METER_IS_METRIC(pMeterMetaInfo)) {
SColumnIndex index = {0};
getColumnIndexByNameEx(&condExpr.pJoinExpr->pLeft->colInfo, pCmd, &index);
pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, index.tableIndex);
int32_t columnInfo = index.columnIndex - pMeterMetaInfo->pMeterMeta->numOfColumns;
addRequiredTagColumn(pCmd, columnInfo, index.tableIndex);
getColumnIndexByNameEx(&condExpr.pJoinExpr->pRight->colInfo, pCmd, &index);
pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, index.tableIndex);
columnInfo = index.columnIndex - pMeterMetaInfo->pMeterMeta->numOfColumns;
addRequiredTagColumn(pCmd, columnInfo, index.tableIndex);
}
cleanQueryExpr(&condExpr);
return ret;
}
int32_t doParseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr, SCondExpr* condExpr) {
const char* msg = "invalid filter expression";
int32_t type = 0;
SSqlCmd* pCmd = &pSql->cmd;
/*
* tags query condition may be larger than 512bytes, therefore, we need to prepare enough large space
*/
SStringBuilder sb = {0};
int32_t ret = TSDB_CODE_SUCCESS;
if ((ret = getQueryCondExpr(pCmd, pExpr, condExpr, &type, (*pExpr)->nSQLOptr)) != TSDB_CODE_SUCCESS) {
return ret;
}
doCompactQueryExpr(pExpr);
// after expression compact, the expression tree is only include tag query condition
condExpr->pTagCond = (*pExpr);
// 1. check if it is a join query
if ((ret = validateJoinExpr(pCmd, condExpr)) != TSDB_CODE_SUCCESS) {
return ret;
}
// 2. get the query time range
if ((ret = getTimeRangeFromExpr(pCmd, condExpr->pTimewindow)) != TSDB_CODE_SUCCESS) {
return ret;
}
// 3. get the tag query condition
if (condExpr->pTagCond != NULL) {
for (int32_t i = 0; i < pCmd->numOfTables; ++i) {
tSQLExpr* p1 = extractExprForSTable(pExpr, pCmd, i);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, i);
char c[TSDB_MAX_TAGS_LEN] = {0};
char* str = c;
if ((ret = getTagCondString(pCmd, p1, &str)) != TSDB_CODE_SUCCESS) {
return ret;
}
tsSetMetricQueryCond(&pCmd->tagCond, pMeterMetaInfo->pMeterMeta->uid, c);
doCompactQueryExpr(pExpr);
tSQLExprDestroy(p1);
}
condExpr->pTagCond = NULL;
}
// 4. get the table name query condition
if ((ret = getTablenameCond(pCmd, condExpr->pTableCond, &sb)) != TSDB_CODE_SUCCESS) {
return ret;
}
// 5. other column query condition
if ((ret = getColumnQueryCondInfo(pCmd, condExpr->pColumnCond, TK_AND)) != TSDB_CODE_SUCCESS) {
return ret;
}
// 6. join condition
if ((ret = getJoinCondInfo(pSql, condExpr->pJoinExpr)) != TSDB_CODE_SUCCESS) {
return ret;
}
// 7. query condition for table name
pCmd->tagCond.relType = (condExpr->relType == TK_AND) ? TSDB_RELATION_AND : TSDB_RELATION_OR;
ret = setTableCondForMetricQuery(pSql, condExpr->pTableCond, condExpr->tableCondIndex, &sb); doCompactQueryExpr(pExpr);
// after expression compact, the expression tree is only include tag query condition
condExpr.pTagCond = (*pExpr);
// 1. check if it is a join query
if ((ret = validateJoinExpr(pCmd, &condExpr)) != TSDB_CODE_SUCCESS) {
return ret;
}
// 2. get the query time range
if ((ret = getTimeRangeFromExpr(pCmd, condExpr.pTimewindow)) != TSDB_CODE_SUCCESS) {
return ret;
}
// 3. get the tag query condition
if ((ret = getTagQueryCondExpr(pCmd, &condExpr, pExpr)) != TSDB_CODE_SUCCESS) {
return ret;
}
// 4. get the table name query condition
if ((ret = getTablenameCond(pCmd, condExpr.pTableCond, &sb)) != TSDB_CODE_SUCCESS) {
return ret;
}
// 5. other column query condition
if ((ret = getColumnQueryCondInfo(pCmd, condExpr.pColumnCond, TK_AND)) != TSDB_CODE_SUCCESS) {
return ret;
}
// 6. join condition
if ((ret = getJoinCondInfo(pSql, condExpr.pJoinExpr)) != TSDB_CODE_SUCCESS) {
return ret;
}
// 7. query condition for table name
pCmd->tagCond.relType = (condExpr.relType == TK_AND) ? TSDB_RELATION_AND : TSDB_RELATION_OR;
ret = setTableCondForMetricQuery(pSql, condExpr.pTableCond, condExpr.tableCondIndex, &sb);
taosStringBuilderDestroy(&sb); taosStringBuilderDestroy(&sb);
if (!validateFilterExpr(pCmd)) { if (!validateFilterExpr(pCmd)) {
return invalidSqlErrMsg(pCmd, msg); return invalidSqlErrMsg(pCmd, msg);
} }
doAddJoinTagsColumnsIntoTagList(pCmd, &condExpr);
cleanQueryExpr(&condExpr);
return ret; return ret;
} }
@ -5684,3 +5720,30 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg *pCreate) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// for debug purpose
void tscPrintSelectClause(SSqlCmd* pCmd) {
if (pCmd == NULL || pCmd->exprsInfo.numOfExprs == 0) {
return;
}
char* str = calloc(1, 10240);
int32_t offset = 0;
offset += sprintf(str, "%d [", pCmd->exprsInfo.numOfExprs);
for(int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pCmd, i);
int32_t size = sprintf(str + offset, "%s(%d)", aAggs[pExpr->functionId].aName, pExpr->colInfo.colId);
offset += size;
if (i < pCmd->exprsInfo.numOfExprs - 1) {
str[offset++] = ',';
}
}
str[offset] = ']';
printf("%s\n", str);
free(str);
}

View File

@ -695,8 +695,6 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
pExpr->param->i64Key = tagColIndex; pExpr->param->i64Key = tagColIndex;
pExpr->numOfParams = 1; pExpr->numOfParams = 1;
addRequiredTagColumn(pCmd, tagColIndex, 0);
// add the filter tag column // add the filter tag column
for (int32_t i = 0; i < pSupporter->colList.numOfCols; ++i) { for (int32_t i = 0; i < pSupporter->colList.numOfCols; ++i) {
SColumnBase *pColBase = &pSupporter->colList.pColList[i]; SColumnBase *pColBase = &pSupporter->colList.pColList[i];
@ -708,7 +706,11 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
} else { } else {
pNew->cmd.type |= TSDB_QUERY_TYPE_SUBQUERY; pNew->cmd.type |= TSDB_QUERY_TYPE_SUBQUERY;
} }
#ifdef _DEBUG_VIEW
tscPrintSelectClause(&pNew->cmd);
#endif
return tscProcessSql(pNew); return tscProcessSql(pNew);
} }

View File

@ -458,14 +458,48 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) {
while (1) { while (1) {
bool hasData = true; bool hasData = true;
if (tscProjectionQueryOnMetric(pCmd)) {
bool allSubqueryExhausted = true;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->pSubs[i]->cmd, 0);
if (pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
allSubqueryExhausted = false;
break;
}
}
hasData = !allSubqueryExhausted;
} else { //otherwise, in case inner join, if any subquery exhausted, query completed.
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlRes *pRes1 = &pSql->pSubs[i]->res;
if (pRes1->numOfRows == 0) {
hasData = false;
break;
}
}
}
for (int32_t i = 0; i < pSql->numOfSubs; ++i) { for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlRes *pRes1 = &pSql->pSubs[i]->res; SSqlRes *pRes1 = &pSql->pSubs[i]->res;
SMeterMetaInfo* pMeterMeta = tscGetMeterMetaInfo(&pSql->pSubs[i]->cmd, 0);
// in case inner join, if any subquery exhausted, query completed
if (pRes1->numOfRows == 0) { if (tscProjectionQueryOnMetric(pCmd)) {
hasData = false; //For multi-vnode projection query, the results may locate in following vnode, so we needs to go on
break; if (pMeterMeta->vnodeIndex < pMeterMeta->pMetricMeta->numOfVnodes) {
break;
}
} else { //otherwise, in case inner join, if any subquery exhausted, query completed.
if (pRes1->numOfRows == 0) {
hasData = false;
break;
}
} }
// if (pRes1->numOfRows == 0 && !tscProjectionQueryOnMetric(pCmd) ||
// (pMeterMeta->vnodeIndex >= pMeterMeta->pMetricMeta->numOfVnodes && )) {
// hasData = false;
// break;
// }
} }
if (!hasData) { // free all sub sqlobj if (!hasData) { // free all sub sqlobj
@ -487,34 +521,26 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) {
} }
if (pRes->tsrow == NULL) { if (pRes->tsrow == NULL) {
pRes->tsrow = malloc(sizeof(void *) * pCmd->exprsInfo.numOfExprs); pRes->tsrow = malloc(POINTER_BYTES * pCmd->exprsInfo.numOfExprs);
} }
bool success = false; bool success = false;
if (pSql->numOfSubs >= 2) { if (pSql->numOfSubs >= 2) { // do merge result
// do merge result
SSqlRes *pRes1 = &pSql->pSubs[0]->res; SSqlRes *pRes1 = &pSql->pSubs[0]->res;
SSqlRes *pRes2 = &pSql->pSubs[1]->res; SSqlRes *pRes2 = &pSql->pSubs[1]->res;
while (pRes1->row < pRes1->numOfRows && pRes2->row < pRes2->numOfRows) { if (pRes1->row < pRes1->numOfRows && pRes2->row < pRes2->numOfRows) {
doSetResultRowData(pSql->pSubs[0]); doSetResultRowData(pSql->pSubs[0]);
doSetResultRowData(pSql->pSubs[1]); doSetResultRowData(pSql->pSubs[1]);
// TSKEY key1 = *(TSKEY *)pRes1->tsrow[0];
TSKEY key1 = *(TSKEY *)pRes1->tsrow[0]; // TSKEY key2 = *(TSKEY *)pRes2->tsrow[0];
TSKEY key2 = *(TSKEY *)pRes2->tsrow[0]; // printf("first:%lld, second:%lld\n", key1, key2);
success = true;
if (key1 == key2) { pRes1->row++;
success = true; pRes2->row++;
pRes1->row++;
pRes2->row++;
break;
} else if (key1 < key2) {
pRes1->row++;
} else if (key1 > key2) {
pRes2->row++;
}
} }
} else {
} else { // only one subquery
SSqlRes *pRes1 = &pSql->pSubs[0]->res; SSqlRes *pRes1 = &pSql->pSubs[0]->res;
doSetResultRowData(pSql->pSubs[0]); doSetResultRowData(pSql->pSubs[0]);
@ -553,9 +579,12 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) { if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql); tscFetchDatablockFromSubquery(pSql);
if (pRes->code == TSDB_CODE_SUCCESS) { if (pRes->code == TSDB_CODE_SUCCESS) {
tscTrace("%p data from all subqueries have been retrieved to client", pSql);
return tscJoinResultsetFromBuf(pSql); return tscJoinResultsetFromBuf(pSql);
} else { } else {
tscTrace("%p retrieve data from subquery failed, code:%d", pSql, pRes->code);
return NULL; return NULL;
} }

View File

@ -1538,7 +1538,7 @@ SMeterMetaInfo* tscAddMeterMetaInfo(SSqlCmd* pCmd, const char* name, SMeterMeta*
pMeterMetaInfo->numOfTags = numOfTags; pMeterMetaInfo->numOfTags = numOfTags;
if (tags != NULL) { if (tags != NULL) {
memcpy(pMeterMetaInfo->tagColumnIndex, tags, sizeof(int16_t) * numOfTags); memcpy(pMeterMetaInfo->tagColumnIndex, tags, sizeof(pMeterMetaInfo->tagColumnIndex[0]) * numOfTags);
} }
pCmd->numOfTables += 1; pCmd->numOfTables += 1;
@ -1673,6 +1673,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
char key[TSDB_MAX_TAGS_LEN + 1] = {0}; char key[TSDB_MAX_TAGS_LEN + 1] = {0};
tscGetMetricMetaCacheKey(pCmd, key, pMetermetaInfo->pMeterMeta->uid); tscGetMetricMetaCacheKey(pCmd, key, pMetermetaInfo->pMeterMeta->uid);
printf("-----%s\n", key);
char* name = pMeterMetaInfo->name; char* name = pMeterMetaInfo->name;
SMeterMetaInfo* pFinalInfo = NULL; SMeterMetaInfo* pFinalInfo = NULL;