[td-186]support query on tags/tbname
This commit is contained in:
parent
9c5e9268d0
commit
c03e84c1bd
|
@ -73,8 +73,8 @@ typedef struct STableMetaInfo {
|
|||
SVgroupsInfo *vgroupList;
|
||||
|
||||
/*
|
||||
* 1. keep the vnode index during the multi-vnode super table projection query
|
||||
* 2. keep the vnode index for multi-vnode insertion
|
||||
* 1. keep the vgroup index during the multi-vnode super table projection query
|
||||
* 2. keep the vgroup index for multi-vnode insertion
|
||||
*/
|
||||
int32_t vgroupIndex;
|
||||
char name[TSDB_TABLE_ID_LEN]; // (super) table name
|
||||
|
|
|
@ -304,118 +304,6 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
|
|||
return tscSetValueToResObj(pSql, rowLen);
|
||||
}
|
||||
|
||||
// todo add order support
|
||||
static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
|
||||
#if 0
|
||||
// the result structure has been completed in sql parse, so we
|
||||
// only need to reorganize the results in the column format
|
||||
SSqlCmd * pCmd = &pSql->cmd;
|
||||
SSqlRes * pRes = &pSql->res;
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
||||
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
|
||||
SSchema * pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
|
||||
|
||||
int32_t vOffset[TSDB_MAX_COLUMNS] = {0};
|
||||
|
||||
for (int32_t f = 1; f < pTableMetaInfo->numOfTags; ++f) {
|
||||
int16_t tagColumnIndex = pTableMetaInfo->tagColumnIndex[f - 1];
|
||||
if (tagColumnIndex == -1) {
|
||||
vOffset[f] = vOffset[f - 1] + TSDB_TABLE_NAME_LEN;
|
||||
} else {
|
||||
vOffset[f] = vOffset[f - 1] + pSchema[tagColumnIndex].bytes;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t totalNumOfResults = pMetricMeta->numOfTables;
|
||||
int32_t rowLen = tscGetResRowLength(pQueryInfo->exprList);
|
||||
|
||||
tscInitResObjForLocalQuery(pSql, totalNumOfResults, rowLen);
|
||||
|
||||
int32_t rowIdx = 0;
|
||||
for (int32_t i = 0; i < pMetricMeta->numOfVnodes; ++i) {
|
||||
SVnodeSidList *pSidList = (SVnodeSidList *)((char *)pMetricMeta + pMetricMeta->list[i]);
|
||||
|
||||
for (int32_t j = 0; j < pSidList->numOfSids; ++j) {
|
||||
STableIdInfo *pSidExt = tscGetMeterSidInfo(pSidList, j);
|
||||
|
||||
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutput; ++k) {
|
||||
SColIndex *pColIndex = &tscSqlExprGet(pQueryInfo, k)->colInfo;
|
||||
int16_t offsetId = pColIndex->colIdx;
|
||||
|
||||
assert((pColIndex->flag & TSDB_COL_TAG) != 0);
|
||||
assert(0);
|
||||
|
||||
char * val = NULL;//pSidExt->tags + vOffset[offsetId];
|
||||
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, k);
|
||||
|
||||
memcpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, k) * totalNumOfResults + pField->bytes * rowIdx, val,
|
||||
(size_t)pField->bytes);
|
||||
}
|
||||
rowIdx++;
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tscBuildMetricTagSqlFunctionResult(SSqlObj *pSql) {
|
||||
// SSqlCmd *pCmd = &pSql->cmd;
|
||||
// SSqlRes *pRes = &pSql->res;
|
||||
|
||||
// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
#if 0
|
||||
SSuperTableMeta *pMetricMeta = tscGetMetaInfo(pQueryInfo, 0)->pMetricMeta;
|
||||
int32_t totalNumOfResults = 1; // count function only produce one result
|
||||
int32_t rowLen = tscGetResRowLength(pQueryInfo->exprList);
|
||||
|
||||
tscInitResObjForLocalQuery(pSql, totalNumOfResults, rowLen);
|
||||
|
||||
int32_t rowIdx = 0;
|
||||
for (int32_t i = 0; i < totalNumOfResults; ++i) {
|
||||
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutput; ++k) {
|
||||
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
|
||||
|
||||
if (pExpr->colInfo.colIdx == -1 && pExpr->functionId == TSDB_FUNC_COUNT) {
|
||||
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, k);
|
||||
|
||||
memcpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, i) * totalNumOfResults + pField->bytes * rowIdx,
|
||||
&pMetricMeta->numOfTables, sizeof(pMetricMeta->numOfTables));
|
||||
} else {
|
||||
tscError("not support operations");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
rowIdx++;
|
||||
}
|
||||
#endif
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tscProcessQueryTags(SSqlObj *pSql) {
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
|
||||
STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
|
||||
if (pTableMeta == NULL || tscGetNumOfTags(pTableMeta) == 0 || tscGetNumOfColumns(pTableMeta) == 0) {
|
||||
strcpy(pCmd->payload, "invalid table");
|
||||
pSql->res.code = TSDB_CODE_INVALID_TABLE;
|
||||
return pSql->res.code;
|
||||
}
|
||||
|
||||
SSqlExpr *pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
|
||||
if (pExpr->functionId == TSDB_FUNC_COUNT) {
|
||||
return tscBuildMetricTagSqlFunctionResult(pSql);
|
||||
} else {
|
||||
return tscBuildMetricTagProjectionResult(pSql);
|
||||
}
|
||||
}
|
||||
|
||||
static void tscProcessCurrentUser(SSqlObj *pSql) {
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||
|
||||
|
@ -503,8 +391,6 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
|
|||
pSql->res.code = (uint8_t)taosCfgDynamicOptions(pCmd->payload);
|
||||
} else if (pCmd->command == TSDB_SQL_DESCRIBE_TABLE) {
|
||||
pSql->res.code = (uint8_t)tscProcessDescribeTable(pSql);
|
||||
} else if (pCmd->command == TSDB_SQL_RETRIEVE_TAGS) {
|
||||
pSql->res.code = (uint8_t)tscProcessQueryTags(pSql);
|
||||
} else if (pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
|
||||
/*
|
||||
* set the qhandle to be 1 in order to pass the qhandle check, and to call partial release function to
|
||||
|
|
|
@ -1264,14 +1264,6 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
|
|||
|
||||
if (isSTable) {
|
||||
pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_QUERY;
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
||||
|
||||
if (tscQueryTags(pQueryInfo)) { // local handle the metric tag query
|
||||
pCmd->count = numOfCols; // the number of meter schema, tricky.
|
||||
pQueryInfo->command = TSDB_SQL_RETRIEVE_TAGS;
|
||||
}
|
||||
|
||||
/*
|
||||
* transfer sql functions that need secondary merge into another format
|
||||
* in dealing with metric queries such as: count/first/last
|
||||
|
@ -1321,7 +1313,7 @@ SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t c
|
|||
if (functionId == TSDB_FUNC_TAGPRJ) {
|
||||
index.columnIndex = colIndex - tscGetNumOfColumns(pTableMeta);
|
||||
|
||||
addRequiredTagColumn(pTableMetaInfo, &index);
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
|
||||
pQueryInfo->type = TSDB_QUERY_TYPE_STABLE_QUERY;
|
||||
} else {
|
||||
index.columnIndex = colIndex;
|
||||
|
@ -1333,7 +1325,7 @@ SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t c
|
|||
}
|
||||
|
||||
void addRequiredTagColumn(STableMetaInfo* pTableMetaInfo, SColumnIndex* index) {
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, index);
|
||||
|
||||
}
|
||||
|
||||
static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumnIndex* pIndex, tSQLExprItem* pItem) {
|
||||
|
@ -1374,7 +1366,7 @@ void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex
|
|||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pIndex->tableIndex);
|
||||
|
||||
if (TSDB_COL_IS_TAG(flag)) {
|
||||
addRequiredTagColumn(pTableMetaInfo, pIndex);
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, pIndex);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2532,12 +2524,11 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
|
|||
relIndex -= numOfCols;
|
||||
}
|
||||
|
||||
SColIndex colIndex = {
|
||||
.colIndex = relIndex, .flag = TSDB_COL_TAG, .colId = pSchema->colId,
|
||||
};
|
||||
|
||||
SColIndex colIndex = { .colIndex = relIndex, .flag = TSDB_COL_TAG, .colId = pSchema->colId, };
|
||||
taosArrayPush(pGroupExpr->columnInfo, &colIndex);
|
||||
addRequiredTagColumn(pTableMetaInfo, &index);
|
||||
|
||||
index.columnIndex = relIndex;
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
|
||||
} else {
|
||||
// check if the column type is valid, here only support the bool/tinyint/smallint/bigint group by
|
||||
if (pSchema->type > TSDB_DATA_TYPE_BINARY) {
|
||||
|
@ -3724,13 +3715,13 @@ static void doAddJoinTagsColumnsIntoTagList(SQueryInfo* pQueryInfo, SCondExpr* p
|
|||
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
|
||||
|
||||
// int32_t columnInfo = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
||||
addRequiredTagColumn(pTableMetaInfo, &index);
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
|
||||
|
||||
getColumnIndexByName(&pCondExpr->pJoinExpr->pRight->colInfo, pQueryInfo, &index);
|
||||
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
|
||||
|
||||
// columnInfo = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
||||
addRequiredTagColumn(pTableMetaInfo, &index);
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3758,7 +3749,7 @@ static int32_t getTagQueryCondExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr,
|
|||
for(int32_t j = 0; j < num; ++j) {
|
||||
SColIndex* pIndex = taosArrayGet(colList, j);
|
||||
SColumnIndex index = {.tableIndex = i, .columnIndex = pIndex->colIndex - numOfCols};
|
||||
addRequiredTagColumn(pTableMetaInfo, &index);
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
|
||||
}
|
||||
|
||||
tsSetSTableQueryCond(&pQueryInfo->tagCond, uid, &buf);
|
||||
|
@ -4668,14 +4659,9 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
|
|||
}
|
||||
|
||||
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
|
||||
bool queryOnTags = false;
|
||||
// if (tscQueryOnlyMetricTags(pQueryInfo, &queryOnTags) != TSDB_CODE_SUCCESS) {
|
||||
// return TSDB_CODE_INVALID_SQL;
|
||||
// }
|
||||
bool queryOnTags = tscQueryTags(pQueryInfo);
|
||||
|
||||
if (queryOnTags == true) { // local handle the super table tag query
|
||||
pQueryInfo->command = TSDB_SQL_RETRIEVE_TAGS;
|
||||
} else {
|
||||
if (queryOnTags != true) { // local handle the super table tag query
|
||||
if (tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
|
||||
if (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0) {
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
|
||||
|
@ -4709,6 +4695,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
|
|||
if (pTableMetaInfo->vgroupList->numOfVgroups == 0) {
|
||||
tscTrace("%p no table in super table, no output result", pSql);
|
||||
pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// keep original limitation value in globalLimit
|
||||
|
@ -4888,7 +4875,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau
|
|||
pColIndex->colIndex = relIndex;
|
||||
|
||||
index = (SColumnIndex) {.tableIndex = tableIndex, .columnIndex = relIndex};
|
||||
addRequiredTagColumn(pTableMetaInfo, &index);
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5209,7 +5196,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
|||
const char* msg4 = "retrieve tags not compatible with group by or interval query";
|
||||
|
||||
// only retrieve tags, group by is not supportted
|
||||
if (pCmd->command == TSDB_SQL_RETRIEVE_TAGS) {
|
||||
if (tscQueryTags(pQueryInfo)) {
|
||||
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0 || pQueryInfo->intervalTime > 0) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||
} else {
|
||||
|
|
|
@ -622,7 +622,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
|
||||
if (taosArrayGetSize(pQueryInfo->colList) <= 0) {
|
||||
if (taosArrayGetSize(pQueryInfo->colList) <= 0 && !tscQueryTags(pQueryInfo)) {
|
||||
tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
|
||||
return -1;
|
||||
}
|
||||
|
@ -827,7 +827,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
pSchema = tscGetTableTagSchema(pTableMeta);
|
||||
|
||||
for (int32_t i = 0; i < numOfTags; ++i) {
|
||||
SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
|
||||
SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
|
||||
SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
|
||||
|
||||
if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
|
||||
|
@ -2642,7 +2642,6 @@ void tscInitMsgsFp() {
|
|||
tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode; // rsp handled by same function.
|
||||
tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
|
||||
|
||||
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_TAGS] = tscProcessTagRetrieveRsp;
|
||||
tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessTagRetrieveRsp;
|
||||
tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessTagRetrieveRsp;
|
||||
tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessTagRetrieveRsp;
|
||||
|
|
|
@ -994,7 +994,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
|||
|
||||
// pRes->code check only serves in launching metric sub-queries
|
||||
if (pRes->code == TSDB_CODE_QUERY_CANCELLED) {
|
||||
pCmd->command = TSDB_SQL_RETRIEVE_METRIC; // enable the abort of kill metric function.
|
||||
pCmd->command = TSDB_SQL_RETRIEVE_METRIC; // enable the abort of kill super table function.
|
||||
return pRes->code;
|
||||
}
|
||||
|
||||
|
@ -1015,10 +1015,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
|||
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
|
||||
if (ret != 0) {
|
||||
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||
if (pSql->fp) {
|
||||
tscQueueAsyncRes(pSql);
|
||||
}
|
||||
return pRes->code;
|
||||
return ret;
|
||||
}
|
||||
|
||||
pSql->pSubs = calloc(pSql->numOfSubs, POINTER_BYTES);
|
||||
|
@ -1129,9 +1127,7 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES
|
|||
tscError("sub:%p failed to flush data to disk:reason:%s", tres, lpMsgBuf);
|
||||
LocalFree(lpMsgBuf);
|
||||
#else
|
||||
char buf[256] = {0};
|
||||
strerror_r(errno, buf, 256);
|
||||
tscError("sub:%p failed to flush data to disk:reason:%s", tres, buf);
|
||||
tscError("sub:%p failed to flush data to disk:reason:%s", tres, strerror(errno));
|
||||
#endif
|
||||
|
||||
trsupport->pState->code = -errCode;
|
||||
|
@ -1198,8 +1194,8 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
|
|||
return;
|
||||
} else { // reach the maximum retry count, abort
|
||||
atomic_val_compare_exchange_32(&pState->code, TSDB_CODE_SUCCESS, numOfRows);
|
||||
tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d", pPObj, pSql,
|
||||
numOfRows, subqueryIndex, pState->code);
|
||||
tscError("%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%d", pPObj, pSql,
|
||||
numOfRows, subqueryIndex, tstrerror(pState->code));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -151,15 +151,11 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
|
|||
return false;
|
||||
}
|
||||
|
||||
// for select query super table, the metricmeta can not be null in any cases.
|
||||
// for select query super table, the super table vgroup list can not be null in any cases.
|
||||
if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
|
||||
// assert(pTableMetaInfo->pMetricMeta != NULL);
|
||||
assert(pTableMetaInfo->vgroupList != NULL);
|
||||
}
|
||||
|
||||
// if (pTableMetaInfo->pMetricMeta == NULL) {
|
||||
// return false;
|
||||
// }
|
||||
|
||||
if ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE) {
|
||||
return false;
|
||||
}
|
||||
|
@ -191,12 +187,11 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
|
|||
return false;
|
||||
}
|
||||
|
||||
// only query on tag, not a projection query
|
||||
// only query on tag, a project query
|
||||
if (tscQueryTags(pQueryInfo)) {
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
// for project query, only the following two function is allowed
|
||||
for (int32_t i = 0; i < numOfExprs; ++i) {
|
||||
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
|
||||
if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAG &&
|
||||
|
@ -1793,11 +1788,11 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
|||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
|
||||
pNewQueryInfo->command = pQueryInfo->command;
|
||||
pNewQueryInfo->type = pQueryInfo->type;
|
||||
pNewQueryInfo->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
|
||||
pNewQueryInfo->window = pQueryInfo->window;
|
||||
pNewQueryInfo->intervalTime = pQueryInfo->intervalTime;
|
||||
pNewQueryInfo->slidingTime = pQueryInfo->slidingTime;
|
||||
pNewQueryInfo->type = pQueryInfo->type;
|
||||
pNewQueryInfo->window = pQueryInfo->window;
|
||||
pNewQueryInfo->limit = pQueryInfo->limit;
|
||||
pNewQueryInfo->slimit = pQueryInfo->slimit;
|
||||
pNewQueryInfo->order = pQueryInfo->order;
|
||||
|
@ -1807,6 +1802,11 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
|||
pNewQueryInfo->numOfTables = 0;
|
||||
pNewQueryInfo->tsBuf = NULL;
|
||||
|
||||
pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
|
||||
if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
|
||||
pNewQueryInfo->groupbyExpr.columnInfo = taosArrayClone(pQueryInfo->groupbyExpr.columnInfo);
|
||||
}
|
||||
|
||||
tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);
|
||||
|
||||
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
|
||||
|
|
|
@ -97,8 +97,8 @@ int tsdbTableSetName(STableCfg *config, char *name, bool dup);
|
|||
int tsdbTableSetSName(STableCfg *config, char *sname, bool dup);
|
||||
void tsdbClearTableCfg(STableCfg *config);
|
||||
|
||||
int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId* id, int32_t col, int16_t *type, int16_t *bytes, char **val);
|
||||
int32_t tsdbTableGetName(TsdbRepoT *repo, STableId* id, char** name);
|
||||
int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId* id, int32_t colId, int16_t *type, int16_t *bytes, char **val);
|
||||
int32_t tsdbGetTableName(TsdbRepoT *repo, STableId* id, char** name);
|
||||
|
||||
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg);
|
||||
int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId);
|
||||
|
|
|
@ -48,7 +48,8 @@ typedef struct tQueryInfo {
|
|||
int32_t colIndex; // index of column in schema
|
||||
uint8_t optr; // expression operator
|
||||
SSchema sch; // schema of tags
|
||||
tVariant q; // query condition value on the specific schema, filter expression
|
||||
// tVariant q; // query condition value on the specific schema, filter expression
|
||||
char* q;
|
||||
__compar_fn_t compare; // filter function
|
||||
} tQueryInfo;
|
||||
|
||||
|
|
|
@ -62,7 +62,6 @@ enum _sql_type {
|
|||
TSDB_SQL_DESCRIBE_TABLE,
|
||||
TSDB_SQL_RETRIEVE_METRIC,
|
||||
TSDB_SQL_METRIC_JOIN_RETRIEVE,
|
||||
TSDB_SQL_RETRIEVE_TAGS,
|
||||
|
||||
/*
|
||||
* build empty result instead of accessing dnode to fetch result
|
||||
|
|
|
@ -36,7 +36,6 @@
|
|||
*
|
||||
* @date 2018-2-15
|
||||
* @version 0.2 operation for column filter
|
||||
* @author liaohj
|
||||
*
|
||||
* @Description parse tag query expression to build ast
|
||||
* ver 0.2, filter the result on first column with high priority to limit the candidate set
|
||||
|
@ -468,7 +467,7 @@ void tExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
|
|||
}
|
||||
|
||||
typedef struct {
|
||||
tVariant v;
|
||||
char* v;
|
||||
int32_t optr;
|
||||
} SEndPoint;
|
||||
|
||||
|
@ -521,21 +520,19 @@ static int32_t setQueryCond(tQueryInfo *queryColInfo, SQueryCond* pCond) {
|
|||
|
||||
if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL ||
|
||||
optr == TSDB_RELATION_EQUAL || optr == TSDB_RELATION_NOT_EQUAL) {
|
||||
pCond->start = calloc(1, sizeof(tVariant));
|
||||
tVariantAssign(&pCond->start->v, &queryColInfo->q);
|
||||
pCond->start = calloc(1, sizeof(SEndPoint));
|
||||
pCond->start->optr = queryColInfo->optr;
|
||||
|
||||
pCond->start->v = queryColInfo->q;
|
||||
} else if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) {
|
||||
pCond->end = calloc(1, sizeof(tVariant));
|
||||
tVariantAssign(&pCond->end->v, &queryColInfo->q);
|
||||
pCond->end = calloc(1, sizeof(SEndPoint));
|
||||
pCond->end->optr = queryColInfo->optr;
|
||||
|
||||
pCond->end->v = queryColInfo->q;
|
||||
} else if (optr == TSDB_RELATION_IN) {
|
||||
printf("relation is in\n");
|
||||
|
||||
assert(0);
|
||||
} else if (optr == TSDB_RELATION_LIKE) {
|
||||
printf("relation is like\n");
|
||||
|
||||
assert(0);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -543,18 +540,16 @@ static int32_t setQueryCond(tQueryInfo *queryColInfo, SQueryCond* pCond) {
|
|||
|
||||
static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* result) {
|
||||
SSkipListIterator* iter = NULL;
|
||||
int32_t type = pQueryInfo->q.nType;
|
||||
|
||||
SQueryCond cond = {0};
|
||||
setQueryCond(pQueryInfo, &cond);
|
||||
|
||||
if (cond.start != NULL) {
|
||||
iter = tSkipListCreateIterFromVal(pSkipList, (char*) &cond.start->v.i64Key, type, TSDB_ORDER_ASC);
|
||||
iter = tSkipListCreateIterFromVal(pSkipList, (char*) &cond.start->v, pSkipList->keyInfo.type, TSDB_ORDER_ASC);
|
||||
} else {
|
||||
iter = tSkipListCreateIterFromVal(pSkipList, (char*) &cond.end->v.i64Key, type, TSDB_ORDER_DESC);
|
||||
iter = tSkipListCreateIterFromVal(pSkipList, (char*) &cond.end->v, pSkipList->keyInfo.type, TSDB_ORDER_DESC);
|
||||
}
|
||||
|
||||
__compar_fn_t func = getComparFunc(pSkipList->keyInfo.type, type, 0);
|
||||
__compar_fn_t func = getKeyComparFunc(pSkipList->keyInfo.type);
|
||||
|
||||
if (cond.start != NULL) {
|
||||
int32_t optr = cond.start->optr;
|
||||
|
@ -563,7 +558,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
|
|||
while(tSkipListIterNext(iter)) {
|
||||
SSkipListNode* pNode = tSkipListIterGet(iter);
|
||||
|
||||
int32_t ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &cond.start->v.i64Key);
|
||||
int32_t ret = func(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
|
||||
if (ret == 0) {
|
||||
taosArrayPush(result, SL_GET_NODE_DATA(pNode));
|
||||
} else {
|
||||
|
@ -578,7 +573,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
|
|||
SSkipListNode* pNode = tSkipListIterGet(iter);
|
||||
|
||||
if (comp) {
|
||||
ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &cond.start->v.i64Key);
|
||||
ret = func(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
|
||||
assert(ret >= 0);
|
||||
}
|
||||
|
||||
|
@ -605,7 +600,7 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
|
|||
SSkipListNode* pNode = tSkipListIterGet(iter);
|
||||
|
||||
if (comp) {
|
||||
ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &cond.end->v.i64Key);
|
||||
ret = func(SL_GET_NODE_KEY(pSkipList, pNode), cond.end->v);
|
||||
assert(ret <= 0);
|
||||
}
|
||||
|
||||
|
@ -699,21 +694,18 @@ int32_t intersect(SArray *pLeft, SArray *pRight, SArray *pFinalRes) {
|
|||
/*
|
||||
* traverse the result and apply the function to each item to check if the item is qualified or not
|
||||
*/
|
||||
static UNUSED_FUNC void tSQLListTraverseOnResult(struct tExprNode *pExpr, __result_filter_fn_t fp, SArray *pResult) {
|
||||
// assert(pExpr->_node.pLeft->nodeType == TSQL_NODE_COL && pExpr->_node.pRight->nodeType == TSQL_NODE_VALUE);
|
||||
//
|
||||
// // brutal force scan the result list and check for each item in the list
|
||||
// int64_t num = pResult->num;
|
||||
// for (int32_t i = 0, j = 0; i < pResult->num; ++i) {
|
||||
// if (fp == NULL || (fp(pResult->pRes[i], pExpr->_node.info) == true)) {
|
||||
// pResult->pRes[j++] = pResult->pRes[i];
|
||||
// } else {
|
||||
// num--;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// pResult->num = num;
|
||||
assert(0);
|
||||
static void tArrayTraverse(tExprNode *pExpr, __result_filter_fn_t fp, SArray *pResult) {
|
||||
assert(pExpr->_node.pLeft->nodeType == TSQL_NODE_COL && pExpr->_node.pRight->nodeType == TSQL_NODE_VALUE && fp != NULL);
|
||||
|
||||
// scan the result array list and check for each item in the list
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pResult); ++i) {
|
||||
void* item = taosArrayGet(pResult, i);
|
||||
if (fp(item, pExpr->_node.info)) {
|
||||
i++;
|
||||
} else {
|
||||
taosArrayRemove(pResult, i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static bool filterItem(tExprNode *pExpr, const void *pItem, SBinaryFilterSupp *param) {
|
||||
|
@ -771,12 +763,7 @@ static void exprTreeTraverseImpl(tExprNode *pExpr, SArray *pResult, SBinaryFilte
|
|||
}
|
||||
|
||||
|
||||
static void tSQLBinaryTraverseOnSkipList(
|
||||
tExprNode *pExpr,
|
||||
SArray *pResult,
|
||||
SSkipList *pSkipList,
|
||||
SBinaryFilterSupp *param
|
||||
) {
|
||||
static void tSQLBinaryTraverseOnSkipList(tExprNode *pExpr, SArray *pResult, SSkipList *pSkipList, SBinaryFilterSupp *param ) {
|
||||
SSkipListIterator* iter = tSkipListCreateIter(pSkipList);
|
||||
|
||||
while (tSkipListIterNext(iter)) {
|
||||
|
@ -797,20 +784,26 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo,
|
|||
bool addToResult = false;
|
||||
|
||||
SSkipListNode *pNode = tSkipListIterGet(iter);
|
||||
STable* table = *(STable**) SL_GET_NODE_DATA(pNode);
|
||||
char* pTable = SL_GET_NODE_DATA(pNode);
|
||||
|
||||
//todo refactor:
|
||||
char* name = (*(STable**) pTable)->name;
|
||||
// char* name = NULL;
|
||||
// tsdbGetTableName(tsdb, pTable, &name);
|
||||
|
||||
// todo speed up by using hash
|
||||
if (pQueryInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
if (pQueryInfo->optr == TSDB_RELATION_IN) {
|
||||
addToResult = pQueryInfo->compare(table->name, pQueryInfo->q.arr);
|
||||
addToResult = pQueryInfo->compare(name, pQueryInfo->q);
|
||||
} else if(pQueryInfo->optr == TSDB_RELATION_LIKE) {
|
||||
addToResult = !pQueryInfo->compare(table->name, pQueryInfo->q.pz);
|
||||
addToResult = !pQueryInfo->compare(name, pQueryInfo->q);
|
||||
}
|
||||
} else {
|
||||
// TODO: other columns
|
||||
}
|
||||
|
||||
if (addToResult) {
|
||||
taosArrayPush(result, (void*)&table);
|
||||
taosArrayPush(result, pTable);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -834,7 +827,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
|
|||
|
||||
param->setupInfoFn(pExpr, param->pExtInfo);
|
||||
if (pSkipList == NULL) {
|
||||
tSQLListTraverseOnResult(pExpr, param->fp, result);
|
||||
tArrayTraverse(pExpr, param->fp, result);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -920,7 +913,6 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
|
|||
tExprTreeTraverse(pSecond, NULL, result, param);
|
||||
}
|
||||
|
||||
|
||||
void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
|
||||
char *(*getSourceDataBlock)(void *, const char*, int32_t)) {
|
||||
if (pExprs == NULL) {
|
||||
|
|
|
@ -1651,8 +1651,7 @@ static bool needReverseScan(SQuery *pQuery) {
|
|||
static bool onlyQueryTags(SQuery* pQuery) {
|
||||
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
|
||||
|
||||
if (functionId != TSDB_FUNC_TAG) {
|
||||
if (functionId != TSDB_FUNC_TAGPRJ) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -2548,7 +2547,7 @@ static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColI
|
|||
int16_t type = 0;
|
||||
|
||||
if (tagColId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
tsdbTableGetName(tsdb, pTableId, &val);
|
||||
tsdbGetTableName(tsdb, pTableId, &val);
|
||||
bytes = TSDB_TABLE_NAME_LEN;
|
||||
type = TSDB_DATA_TYPE_BINARY;
|
||||
} else {
|
||||
|
@ -4380,6 +4379,10 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pTableQueryInfo != NULL) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assert(pTableQueryInfo != NULL && pTableQueryInfo != NULL);
|
||||
|
@ -4852,8 +4855,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
|
|||
|
||||
// do check all qualified data blocks
|
||||
int64_t el = queryOnDataBlocks(pQInfo);
|
||||
qTrace("QInfo:%p forward scan completed, elapsed time: %lldms, reversed scan start, order:%d", pQInfo, el,
|
||||
pQuery->order.order ^ 1u);
|
||||
qTrace("QInfo:%p forward scan completed, elapsed time: %lldms, reversed scan start", pQInfo, el);
|
||||
|
||||
// query error occurred or query is killed, abort current execution
|
||||
if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) {
|
||||
|
@ -4883,8 +4885,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
|
|||
}
|
||||
|
||||
if (isIntervalQuery(pQuery) || isSumAvgRateQuery(pQuery)) {
|
||||
// assert(pSupporter->groupIndex == 0 && pSupporter->numOfGroupResultPages == 0);
|
||||
|
||||
if (mergeIntoGroupResult(pQInfo) == TSDB_CODE_SUCCESS) {
|
||||
copyResToQueryResultBuf(pQInfo, pQuery);
|
||||
|
||||
|
@ -5213,11 +5213,12 @@ bool validateExprColumnInfo(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SC
|
|||
|
||||
static int32_t validateQueryMsg(SQueryTableMsg *pQueryMsg) {
|
||||
if (pQueryMsg->intervalTime < 0) {
|
||||
qError("qmsg:%p illegal value of aggTimeInterval %" PRId64 "", pQueryMsg, pQueryMsg->intervalTime);
|
||||
qError("qmsg:%p illegal value of interval time %" PRId64 "", pQueryMsg, pQueryMsg->intervalTime);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pQueryMsg->numOfCols <= 0 || pQueryMsg->numOfCols > TSDB_MAX_COLUMNS) {
|
||||
if (pQueryMsg->numOfCols < 0 || pQueryMsg->numOfTags < 0 || (pQueryMsg->numOfCols + pQueryMsg->numOfTags <= 0) ||
|
||||
pQueryMsg->numOfCols > TSDB_MAX_COLUMNS) {
|
||||
qError("qmsg:%p illegal value of numOfCols %d", pQueryMsg, pQueryMsg->numOfCols);
|
||||
return -1;
|
||||
}
|
||||
|
@ -5513,7 +5514,7 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo
|
|||
bytes = TSDB_TABLE_NAME_LEN;
|
||||
} else{
|
||||
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
|
||||
assert(j < pQueryMsg->numOfCols);
|
||||
assert(j < pQueryMsg->numOfCols || j < pQueryMsg->numOfTags);
|
||||
|
||||
SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j];
|
||||
type = pCol->type;
|
||||
|
@ -6148,7 +6149,7 @@ void qTableQuery(qinfo_t qinfo) {
|
|||
qTrace("QInfo:%p query task is launched", pQInfo);
|
||||
|
||||
if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) {
|
||||
buildTagQueryResult(pQInfo);
|
||||
buildTagQueryResult(pQInfo); // todo support the limit/offset
|
||||
} else if (pQInfo->runtimeEnv.stableQuery) {
|
||||
stableQueryImpl(pQInfo);
|
||||
} else {
|
||||
|
@ -6258,16 +6259,26 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
|||
|
||||
for(int32_t i = 0; i < num; ++i) {
|
||||
SExprInfo* pExprInfo = pQuery->pSelectExpr;
|
||||
char* data = NULL;
|
||||
|
||||
SGroupItem* item = taosArrayGet(pa, i);
|
||||
|
||||
char* data = NULL;
|
||||
for(int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
||||
tsdbGetTableTagVal(pQInfo->tsdb, &item->id, j, &type, &bytes, &data);
|
||||
// todo check the return value
|
||||
if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
tsdbGetTableName(pQInfo->tsdb, &item->id, &data);
|
||||
strncpy(pQuery->sdata[j]->data + i * TSDB_TABLE_NAME_LEN, data, TSDB_TABLE_NAME_LEN);
|
||||
tfree(data);
|
||||
|
||||
} else {
|
||||
tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, &type, &bytes, &data);
|
||||
assert(bytes == pExprInfo[j].bytes && type == pExprInfo[j].type);
|
||||
|
||||
memcpy(pQuery->sdata[j]->data + num * bytes, data, bytes);
|
||||
memcpy(pQuery->sdata[j]->data + i * bytes, data, bytes);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
pQuery->rec.rows = num;
|
||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||
}
|
||||
|
||||
|
|
|
@ -239,6 +239,10 @@ int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t
|
|||
}
|
||||
}
|
||||
|
||||
if (pCol == NULL) {
|
||||
return -1; // No matched tags. Maybe the modification of tags has not been done yet.
|
||||
}
|
||||
|
||||
assert(pCol != NULL);
|
||||
|
||||
SDataRow row = (SDataRow)pTable->tagVal;
|
||||
|
@ -248,10 +252,10 @@ int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t
|
|||
*type = pCol->type;
|
||||
*bytes = pCol->bytes;
|
||||
|
||||
return 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tsdbTableGetName(TsdbRepoT *repo, STableId* id, char** name) {
|
||||
int32_t tsdbGetTableName(TsdbRepoT *repo, STableId* id, char** name) {
|
||||
STsdbMeta* pMeta = tsdbGetMeta(repo);
|
||||
STable* pTable = tsdbGetTableByUid(pMeta, id->uid);
|
||||
|
||||
|
|
|
@ -114,7 +114,6 @@ typedef struct STsdbQueryHandle {
|
|||
|
||||
SFileGroup* pFileGroup;
|
||||
SFileGroupIter fileIter;
|
||||
SCompIdx* compIndex;
|
||||
SRWHelper rhelper;
|
||||
} STsdbQueryHandle;
|
||||
|
||||
|
@ -138,7 +137,6 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
|
|||
pQueryHandle->order = pCond->order;
|
||||
pQueryHandle->window = pCond->twindow;
|
||||
pQueryHandle->pTsdb = tsdb;
|
||||
pQueryHandle->compIndex = calloc(10000, sizeof(SCompIdx));
|
||||
tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb);
|
||||
|
||||
pQueryHandle->cur.fid = -1;
|
||||
|
@ -163,7 +161,7 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
|
|||
.pTableObj = tsdbGetTableByUid(tsdbGetMeta(tsdb), id->uid),
|
||||
};
|
||||
|
||||
assert(info.pTableObj != NULL);
|
||||
assert(info.pTableObj != NULL && info.pTableObj->tableId.tid == id->tid);
|
||||
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
|
||||
}
|
||||
}
|
||||
|
@ -288,8 +286,6 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
|
|||
tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, fileGroup);
|
||||
|
||||
// load all the comp offset value for all tables in this file
|
||||
// tsdbLoadCompIdx(fileGroup, pQueryHandle->compIndex, 10000); // todo set dynamic max tables
|
||||
|
||||
*numOfBlocks = 0;
|
||||
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||
|
||||
|
@ -1117,16 +1113,13 @@ TsdbQueryHandleT* tsdbQueryFromTagConds(STsdbQueryCond* pCond, int16_t stableId,
|
|||
|
||||
SArray* tsdbGetTableList(TsdbQueryHandleT* pQueryHandle) { return NULL; }
|
||||
|
||||
static int32_t getAllTableIdList(STsdbRepo* tsdb, int64_t uid, SArray* list) {
|
||||
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
|
||||
assert(pTable != NULL); // assert pTable is a super table
|
||||
|
||||
SSkipListIterator* iter = tSkipListCreateIter(pTable->pIndex);
|
||||
static int32_t getAllTableIdList(STable* pSuperTable, SArray* list) {
|
||||
SSkipListIterator* iter = tSkipListCreateIter(pSuperTable->pIndex);
|
||||
while (tSkipListIterNext(iter)) {
|
||||
SSkipListNode* pNode = tSkipListIterGet(iter);
|
||||
|
||||
STable* t = *(STable**)SL_GET_NODE_DATA(pNode);
|
||||
taosArrayPush(list, &t);
|
||||
taosArrayPush(list, &t->tableId);
|
||||
}
|
||||
|
||||
tSkipListDestroyIter(iter);
|
||||
|
@ -1151,7 +1144,7 @@ static void convertQueryResult(SArray* pRes, SArray* pTableList) {
|
|||
size_t size = taosArrayGetSize(pTableList);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
STable* pTable = taosArrayGetP(pTableList, i);
|
||||
taosArrayPush(pRes, &pTable);
|
||||
taosArrayPush(pRes, &pTable->tableId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1160,27 +1153,31 @@ static void destroyHelper(void* param) {
|
|||
return;
|
||||
}
|
||||
|
||||
|
||||
tQueryInfo* pInfo = (tQueryInfo*)param;
|
||||
tVariantDestroy(&(pInfo->q));
|
||||
if (pInfo->optr != TSDB_RELATION_IN) {
|
||||
tfree(pInfo->q);
|
||||
}
|
||||
|
||||
// tVariantDestroy(&(pInfo->q));
|
||||
free(param);
|
||||
}
|
||||
|
||||
static void getTagColumnInfo(SExprTreeSupporter* pSupporter, SSchema* pSchema, int32_t* index) {
|
||||
*index = 0;
|
||||
|
||||
static int32_t getTagColumnInfo(SExprTreeSupporter* pSupporter, SSchema* pSchema) {
|
||||
// filter on table name(TBNAME)
|
||||
if (strcasecmp(pSchema->name, TSQL_TBNAME_L) == 0) {
|
||||
*index = TSDB_TBNAME_COLUMN_INDEX;
|
||||
return;
|
||||
return TSDB_TBNAME_COLUMN_INDEX;
|
||||
}
|
||||
|
||||
while ((*index) < pSupporter->numOfTags) {
|
||||
if (pSupporter->pTagSchema[*index].bytes == pSchema->bytes &&
|
||||
pSupporter->pTagSchema[*index].type == pSchema->type &&
|
||||
pSupporter->pTagSchema[*index].colId == pSchema->colId) {
|
||||
break;
|
||||
for(int32_t i = 0; i < pSupporter->numOfTags; ++i) {
|
||||
if (pSupporter->pTagSchema[i].bytes == pSchema->bytes &&
|
||||
pSupporter->pTagSchema[i].type == pSchema->type &&
|
||||
pSupporter->pTagSchema[i].colId == pSchema->colId) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
|
||||
return -2;
|
||||
}
|
||||
|
||||
void filterPrepare(void* expr, void* param) {
|
||||
|
@ -1189,28 +1186,29 @@ void filterPrepare(void* expr, void* param) {
|
|||
return;
|
||||
}
|
||||
|
||||
int32_t i = 0, offset = 0;
|
||||
int32_t i = 0;
|
||||
pExpr->_node.info = calloc(1, sizeof(tQueryInfo));
|
||||
|
||||
tQueryInfo* pInfo = pExpr->_node.info;
|
||||
|
||||
SExprTreeSupporter* pSupporter = (SExprTreeSupporter*)param;
|
||||
|
||||
tQueryInfo* pInfo = pExpr->_node.info;
|
||||
tVariant* pCond = pExpr->_node.pRight->pVal;
|
||||
SSchema* pSchema = pExpr->_node.pLeft->pSchema;
|
||||
|
||||
getTagColumnInfo(pSupporter, pSchema, &i);
|
||||
assert((i >= 0 && i < TSDB_MAX_TAGS) || (i == TSDB_TBNAME_COLUMN_INDEX));
|
||||
assert((offset >= 0 && offset < TSDB_MAX_TAGS_LEN) || (offset == TSDB_TBNAME_COLUMN_INDEX));
|
||||
// todo : if current super table does not change schema yet, this function may failed, add test case
|
||||
int32_t index = getTagColumnInfo(pSupporter, pSchema);
|
||||
assert((index >= 0 && i < TSDB_MAX_TAGS) || (index == TSDB_TBNAME_COLUMN_INDEX));
|
||||
|
||||
pInfo->sch = *pSchema;
|
||||
pInfo->colIndex = i;
|
||||
pInfo->colIndex = index;
|
||||
pInfo->optr = pExpr->_node.optr;
|
||||
pInfo->compare = getComparFunc(pSchema->type, pCond->nType, pInfo->optr);
|
||||
pInfo->compare = getComparFunc(pSchema->type, pInfo->optr);
|
||||
|
||||
tVariantAssign(&pInfo->q, pCond);
|
||||
if (pInfo->optr != TSDB_RELATION_IN) {
|
||||
tVariantTypeSetType(&pInfo->q, pInfo->sch.type);
|
||||
if (pInfo->optr == TSDB_RELATION_IN) {
|
||||
pInfo->q = (char*) pCond->arr;
|
||||
} else {
|
||||
pInfo->q = calloc(1, pSchema->bytes);
|
||||
tVariantDump(pCond, pInfo->q, pSchema->type);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1245,13 +1243,16 @@ typedef struct STableGroupSupporter {
|
|||
int32_t numOfCols;
|
||||
SColIndex* pCols;
|
||||
STSchema* pTagSchema;
|
||||
void* tsdbMeta;
|
||||
} STableGroupSupporter;
|
||||
|
||||
int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
|
||||
STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
|
||||
STableId* id1 = (STableId*) p1;
|
||||
STableId* id2 = (STableId*) p2;
|
||||
|
||||
STable *pTable1 = *(STable **) p1;
|
||||
STable *pTable2 = *(STable **) p2;
|
||||
STable *pTable1 = tsdbGetTableByUid(pTableGroupSupp->tsdbMeta, id1->uid);
|
||||
STable *pTable2 = tsdbGetTableByUid(pTableGroupSupp->tsdbMeta, id2->uid);
|
||||
|
||||
for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
|
||||
SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
|
||||
|
@ -1288,29 +1289,36 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void createTableGroupImpl(SArray* pGroups, STable** pTables, size_t numOfTables, STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) {
|
||||
void createTableGroupImpl(SArray* pGroups, SArray* pTableIdList, size_t numOfTables, STableGroupSupporter* pSupp,
|
||||
__ext_compar_fn_t compareFn) {
|
||||
STableId* pId = taosArrayGet(pTableIdList, 0);
|
||||
|
||||
SArray* g = taosArrayInit(16, sizeof(STableId));
|
||||
taosArrayPush(g, &pTables[0]->tableId);
|
||||
taosArrayPush(g, pId);
|
||||
|
||||
for (int32_t i = 1; i < numOfTables; ++i) {
|
||||
int32_t ret = compareFn(&pTables[i - 1], &pTables[i], pSupp);
|
||||
STableId* prev = taosArrayGet(pTableIdList, i - 1);
|
||||
STableId* p = taosArrayGet(pTableIdList, i);
|
||||
|
||||
int32_t ret = compareFn(prev, p, pSupp);
|
||||
assert(ret == 0 || ret == -1);
|
||||
|
||||
if (ret == 0) {
|
||||
taosArrayPush(g, &pTables[i]->tableId);
|
||||
taosArrayPush(g, p);
|
||||
} else {
|
||||
taosArrayPush(pGroups, &g); // current group is ended, start a new group
|
||||
g = taosArrayInit(16, POINTER_BYTES);
|
||||
taosArrayPush(g, &pTables[i]->tableId);
|
||||
g = taosArrayInit(16, sizeof(STableId));
|
||||
|
||||
taosArrayPush(g, p);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayPush(pGroups, &g);
|
||||
}
|
||||
|
||||
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) {
|
||||
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols,
|
||||
TsdbRepoT* tsdb) {
|
||||
assert(pTableList != NULL);
|
||||
|
||||
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
|
||||
|
||||
size_t size = taosArrayGetSize(pTableList);
|
||||
|
@ -1322,20 +1330,21 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
|
|||
if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
|
||||
SArray* sa = taosArrayInit(size, sizeof(STableId));
|
||||
for(int32_t i = 0; i < size; ++i) {
|
||||
STable* pTable = taosArrayGetP(pTableList, i);
|
||||
taosArrayPush(sa, &pTable->tableId);
|
||||
STableId* tableId = taosArrayGet(pTableList, i);
|
||||
taosArrayPush(sa, tableId);
|
||||
}
|
||||
|
||||
taosArrayPush(pTableGroup, &sa);
|
||||
uTrace("all %d tables belong to one group", size);
|
||||
} else {
|
||||
STableGroupSupporter *pSupp = (STableGroupSupporter *) calloc(1, sizeof(STableGroupSupporter));
|
||||
pSupp->tsdbMeta = tsdbGetMeta(tsdb);
|
||||
pSupp->numOfCols = numOfOrderCols;
|
||||
pSupp->pTagSchema = pTagSchema;
|
||||
pSupp->pCols = pCols;
|
||||
|
||||
taosqsort(pTableList->pData, size, POINTER_BYTES, pSupp, tableGroupComparFn);
|
||||
createTableGroupImpl(pTableGroup, pTableList->pData, size, pSupp, tableGroupComparFn);
|
||||
taosqsort(pTableList->pData, size, sizeof(STableId), pSupp, tableGroupComparFn);
|
||||
createTableGroupImpl(pTableGroup, pTableList, size, pSupp, tableGroupComparFn);
|
||||
tfree(pSupp);
|
||||
}
|
||||
|
||||
|
@ -1360,14 +1369,14 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
|
|||
int32_t ret = 0;
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||
if (pInfo->optr == TSDB_RELATION_IN) {
|
||||
ret = pInfo->compare(val, pInfo->q.arr);
|
||||
ret = pInfo->compare(val, pInfo->q);
|
||||
} else {
|
||||
ret = pInfo->compare(val, pInfo->q.pz);
|
||||
ret = pInfo->compare(val, pInfo->q);
|
||||
}
|
||||
} else {
|
||||
tVariant t = {0};
|
||||
tVariantCreateFromBinary(&t, val, (uint32_t)pInfo->sch.bytes, type);
|
||||
ret = pInfo->compare(&t.i64Key, &pInfo->q.i64Key);
|
||||
// tVariant t = {0};
|
||||
// tVariantCreateFromBinary(&t, val, (uint32_t)pInfo->sch.bytes, type);
|
||||
ret = pInfo->compare(val, pInfo->q);
|
||||
}
|
||||
|
||||
switch (pInfo->optr) {
|
||||
|
@ -1399,6 +1408,7 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
|
|||
default:
|
||||
assert(false);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -1433,21 +1443,21 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
|
|||
|
||||
int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagCond, size_t len, int16_t tagNameRelType,
|
||||
const char* tbnameCond, STableGroupInfo *pGroupInfo, SColIndex *pColIndex, int32_t numOfCols) {
|
||||
STable* pSTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
|
||||
if (pSTable == NULL) {
|
||||
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
|
||||
if (pTable == NULL) {
|
||||
uError("failed to get stable, uid:%" PRIu64, uid);
|
||||
return TSDB_CODE_INVALID_TABLE_ID;
|
||||
}
|
||||
|
||||
SArray* res = taosArrayInit(8, sizeof(STableId));
|
||||
STSchema* pTagSchema = tsdbGetTableTagSchema(tsdbGetMeta(tsdb), pSTable);
|
||||
STSchema* pTagSchema = tsdbGetTableTagSchema(tsdbGetMeta(tsdb), pTable);
|
||||
|
||||
// no tags and tbname condition, all child tables of this stable are involved
|
||||
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
|
||||
int32_t ret = getAllTableIdList(tsdb, uid, res);
|
||||
int32_t ret = getAllTableIdList(pTable, res);
|
||||
if (ret == TSDB_CODE_SUCCESS) {
|
||||
pGroupInfo->numOfTables = taosArrayGetSize(res);
|
||||
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
|
||||
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, tsdb);
|
||||
}
|
||||
taosArrayDestroy(res);
|
||||
return ret;
|
||||
|
@ -1470,9 +1480,9 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagC
|
|||
}
|
||||
}
|
||||
|
||||
doQueryTableList(pSTable, res, expr);
|
||||
doQueryTableList(pTable, res, expr);
|
||||
pGroupInfo->numOfTables = taosArrayGetSize(res);
|
||||
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
|
||||
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, tsdb);
|
||||
|
||||
taosArrayDestroy(res);
|
||||
return ret;
|
||||
|
@ -1512,12 +1522,10 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
|
|||
}
|
||||
|
||||
tfree(pTableCheckInfo->pDataCols);
|
||||
|
||||
tfree(pTableCheckInfo->pCompInfo);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pQueryHandle->pTableCheckInfo);
|
||||
tfree(pQueryHandle->compIndex);
|
||||
|
||||
size_t cols = taosArrayGetSize(pQueryHandle->pColumns);
|
||||
for (int32_t i = 0; i < cols; ++i) {
|
||||
|
|
|
@ -40,7 +40,7 @@ int WCSPatternMatch(const wchar_t *zPattern, const wchar_t *zString, size_t size
|
|||
|
||||
__compar_fn_t getKeyComparFunc(int32_t keyType);
|
||||
|
||||
__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType, int32_t optr);
|
||||
__compar_fn_t getComparFunc(int32_t type, int32_t optr);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -232,55 +232,42 @@ static UNUSED_FUNC int32_t compareWStrPatternComp(const void* pLeft, const void*
|
|||
}
|
||||
|
||||
// todo promote the type definition before the comparsion
|
||||
__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType, int32_t optr) {
|
||||
__compar_fn_t getComparFunc(int32_t type, int32_t optr) {
|
||||
__compar_fn_t comparFn = NULL;
|
||||
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
comparFn = compareInt16Val; break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
comparFn = compareInt32Val; break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
case TSDB_DATA_TYPE_TIMESTAMP: {
|
||||
// assert(type == filterDataType);
|
||||
if (filterDataType == TSDB_DATA_TYPE_BIGINT || filterDataType == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
comparFn = compareInt64Val;
|
||||
} else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
|
||||
comparFn = compareIntDoubleVal;
|
||||
comparFn = compareInt64Val; break;
|
||||
}
|
||||
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
case TSDB_DATA_TYPE_TINYINT:{
|
||||
comparFn = compareInt8Val; break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_BOOL: {
|
||||
if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
|
||||
comparFn = compareInt32Val;
|
||||
} else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
|
||||
comparFn = compareIntDoubleVal;
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
comparFn = compareDoubleVal; break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
|
||||
comparFn = compareDoubleIntVal;
|
||||
} else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
|
||||
comparFn = compareDoubleVal;
|
||||
}
|
||||
break;
|
||||
comparFn = compareDoubleVal; break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_BINARY: {
|
||||
if (optr == TSDB_RELATION_LIKE) { /* wildcard query using like operator */
|
||||
assert(filterDataType == TSDB_DATA_TYPE_BINARY);
|
||||
comparFn = compareStrPatternComp;
|
||||
|
||||
} else if (optr == TSDB_RELATION_IN) {
|
||||
assert(filterDataType == TSDB_DATA_TYPE_ARRAY);
|
||||
comparFn = compareFindStrInArray;
|
||||
|
||||
} else { /* normal relational comparFn */
|
||||
assert(filterDataType == TSDB_DATA_TYPE_BINARY);
|
||||
comparFn = compareStrVal;
|
||||
}
|
||||
|
||||
|
@ -288,8 +275,6 @@ __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType, int32_t optr)
|
|||
}
|
||||
|
||||
case TSDB_DATA_TYPE_NCHAR: {
|
||||
assert(filterDataType == TSDB_DATA_TYPE_NCHAR);
|
||||
|
||||
if (optr == TSDB_RELATION_LIKE) {
|
||||
comparFn = compareWStrPatternComp;
|
||||
} else {
|
||||
|
|
|
@ -325,7 +325,7 @@ SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType) {
|
|||
pSkipList->state.queryCount++;
|
||||
#endif
|
||||
|
||||
__compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, keyType, 0);
|
||||
__compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, 0);
|
||||
int32_t ret = -1;
|
||||
for (int32_t i = sLevel; i >= 0; --i) {
|
||||
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i);
|
||||
|
@ -389,7 +389,7 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char*
|
|||
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
|
||||
|
||||
int32_t ret = -1;
|
||||
__compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, type, 0);
|
||||
__compar_fn_t filterComparFn = getKeyComparFunc(pSkipList->keyInfo.type);
|
||||
SSkipListNode* pNode = pSkipList->pHead;
|
||||
|
||||
for (int32_t i = pSkipList->level - 1; i >= 0; --i) {
|
||||
|
|
Loading…
Reference in New Issue