[td-1151]
This commit is contained in:
parent
ff20d564ab
commit
39618becb7
|
@ -138,8 +138,8 @@ bool tscIsProjectionQuery(SQueryInfo* pQueryInfo);
|
|||
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
|
||||
bool tscQueryTags(SQueryInfo* pQueryInfo);
|
||||
|
||||
void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex,
|
||||
SSchema* pColSchema, int16_t colType);
|
||||
SSqlExpr* tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
|
||||
SColumnIndex* pIndex, SSchema* pColSchema, int16_t colType);
|
||||
|
||||
int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql);
|
||||
void tscClearInterpInfo(SQueryInfo* pQueryInfo);
|
||||
|
|
|
@ -2906,15 +2906,27 @@ static FORCE_INLINE void date_col_output_function_f(SQLFunctionCtx *pCtx, int32_
|
|||
}
|
||||
|
||||
static void col_project_function(SQLFunctionCtx *pCtx) {
|
||||
INC_INIT_VAL(pCtx, pCtx->size);
|
||||
if (pCtx->numOfParams == 1) { // the number of output rows should not affect the final number of rows, so set it to be 1
|
||||
INC_INIT_VAL(pCtx, 1);
|
||||
|
||||
char *pData = GET_INPUT_CHAR(pCtx);
|
||||
if (pCtx->order == TSDB_ORDER_ASC) {
|
||||
memcpy(pCtx->aOutputBuf, pData, (size_t)pCtx->size * pCtx->inputBytes);
|
||||
} else {
|
||||
char* output = pCtx->aOutputBuf;
|
||||
for(int32_t i = 0; i < pCtx->size; ++i) {
|
||||
memcpy(pCtx->aOutputBuf + (pCtx->size - 1 - i) * pCtx->inputBytes, pData + i * pCtx->inputBytes,
|
||||
pCtx->inputBytes);
|
||||
tVariantDump(&pCtx->param[0], output, pCtx->outputType, true);
|
||||
output += pCtx->outputBytes;
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
INC_INIT_VAL(pCtx, pCtx->size);
|
||||
|
||||
char *pData = GET_INPUT_CHAR(pCtx);
|
||||
if (pCtx->order == TSDB_ORDER_ASC) {
|
||||
memcpy(pCtx->aOutputBuf, pData, (size_t) pCtx->size * pCtx->inputBytes);
|
||||
} else {
|
||||
for(int32_t i = 0; i < pCtx->size; ++i) {
|
||||
memcpy(pCtx->aOutputBuf + (pCtx->size - 1 - i) * pCtx->inputBytes, pData + i * pCtx->inputBytes,
|
||||
pCtx->inputBytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -33,8 +33,8 @@
|
|||
|
||||
#define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0"
|
||||
|
||||
// -1 is tbname column index, so here use the -2 as the initial value
|
||||
#define COLUMN_INDEX_INITIAL_VAL (-2)
|
||||
// -1 is tbname column index, so here use the -3 as the initial value
|
||||
#define COLUMN_INDEX_INITIAL_VAL (-3)
|
||||
#define COLUMN_INDEX_INITIALIZER \
|
||||
{ COLUMN_INDEX_INITIAL_VAL, COLUMN_INDEX_INITIAL_VAL }
|
||||
#define COLUMN_INDEX_VALIDE(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_TBNAME_COLUMN_INDEX))
|
||||
|
@ -1248,7 +1248,9 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
|
|||
tSQLExprItem* pItem = &pSelection->a[i];
|
||||
|
||||
// project on all fields
|
||||
if (pItem->pNode->nSQLOptr == TK_ALL || pItem->pNode->nSQLOptr == TK_ID || pItem->pNode->nSQLOptr == TK_STRING) {
|
||||
int32_t optr = pItem->pNode->nSQLOptr;
|
||||
|
||||
if (optr == TK_ALL || optr == TK_ID || optr == TK_STRING || (optr == TK_INTEGER || optr == TK_FLOAT)) {
|
||||
// it is actually a function, but the function name is invalid
|
||||
if (pItem->pNode->nSQLOptr == TK_ID && (pItem->pNode->colInfo.z == NULL && pItem->pNode->colInfo.n == 0)) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
|
||||
|
@ -1256,7 +1258,6 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
|
|||
|
||||
// if the name of column is quoted, remove it and set the right information for later process
|
||||
extractColumnNameFromString(pItem);
|
||||
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY);
|
||||
|
||||
// select table_name1.field_name1, table_name2.field_name2 from table_name1, table_name2
|
||||
if (addProjectionExprAndResultField(pCmd, pQueryInfo, pItem) != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1372,10 +1373,10 @@ static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumn
|
|||
insertResultField(pQueryInfo, startPos, &ids, pExpr->resBytes, (int8_t)pExpr->resType, pExpr->aliasName, pExpr);
|
||||
}
|
||||
|
||||
void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
|
||||
SSqlExpr* tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
|
||||
SColumnIndex* pIndex, SSchema* pColSchema, int16_t flag) {
|
||||
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, outputColIndex, functionId, pIndex, pColSchema->type,
|
||||
pColSchema->bytes, pColSchema->bytes, flag);
|
||||
pColSchema->bytes, pColSchema->bytes, TSDB_COL_IS_TAG(flag));
|
||||
tstrncpy(pExpr->aliasName, pColSchema->name, sizeof(pExpr->aliasName));
|
||||
|
||||
SColumnList ids = getColumnList(1, pIndex->tableIndex, pIndex->columnIndex);
|
||||
|
@ -1391,6 +1392,8 @@ void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex
|
|||
if (TSDB_COL_IS_TAG(flag)) {
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, pIndex);
|
||||
}
|
||||
|
||||
return pExpr;
|
||||
}
|
||||
|
||||
static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColumnIndex* pIndex, int32_t startPos) {
|
||||
|
@ -1434,7 +1437,11 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
|
|||
|
||||
int32_t startPos = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
|
||||
|
||||
if (pItem->pNode->nSQLOptr == TK_ALL) { // project on all fields
|
||||
int32_t optr = pItem->pNode->nSQLOptr;
|
||||
|
||||
if (optr == TK_ALL) { // project on all fields
|
||||
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY);
|
||||
|
||||
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
|
||||
if (getTableIndexByName(&pItem->pNode->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0);
|
||||
|
@ -1450,28 +1457,43 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
|
|||
} else {
|
||||
doAddProjectionExprAndResultFields(pQueryInfo, &index, startPos);
|
||||
}
|
||||
} else if (pItem->pNode->nSQLOptr == TK_ID) { // simple column projection query
|
||||
|
||||
// add the primary timestamp column even though it is not required by user
|
||||
tscInsertPrimaryTSSourceColumn(pQueryInfo, &index);
|
||||
} else if (optr == TK_ID || optr == TK_INTEGER || optr == TK_FLOAT) { // simple column projection query
|
||||
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
|
||||
|
||||
if (getColumnIndexByName(pCmd, &pItem->pNode->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0);
|
||||
}
|
||||
// user-specified constant value as a new result column
|
||||
if ((optr == TK_INTEGER || optr == TK_FLOAT) || (getColumnIndexByName(pCmd, &pItem->pNode->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS)) {
|
||||
index.columnIndex = TSDB_UD_COLUMN_INDEX;
|
||||
index.tableIndex = 0;
|
||||
|
||||
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
SSchema colSchema = tGetTableNameColumnSchema();
|
||||
tscAddSpecialColumnForSelect(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, TSDB_COL_TAG);
|
||||
} else {
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
|
||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
SSchema colSchema = tGetUserSpecifiedColumnSchema(pItem->pNode->val.pz, pItem->pNode->val.nType, pItem->aliasName);
|
||||
SSqlExpr* pExpr = tscAddSpecialColumnForSelect(pQueryInfo, startPos, TSDB_FUNC_PRJ, &index, &colSchema, TSDB_COL_UDC);
|
||||
pExpr->numOfParams = 1;
|
||||
tVariantAssign(&pExpr->param[0], &pItem->pNode->val);
|
||||
|
||||
if (index.columnIndex >= tscGetNumOfColumns(pTableMeta) && UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
} else { // columns from the queried table
|
||||
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY);
|
||||
|
||||
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
SSchema colSchema = tGetTableNameColumnSchema();
|
||||
tscAddSpecialColumnForSelect(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, TSDB_COL_TAG);
|
||||
} else {
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
|
||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
|
||||
if (index.columnIndex >= tscGetNumOfColumns(pTableMeta) && UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
}
|
||||
|
||||
addProjectQueryCol(pQueryInfo, startPos, &index, pItem);
|
||||
}
|
||||
|
||||
addProjectQueryCol(pQueryInfo, startPos, &index, pItem);
|
||||
// add the primary timestamp column even though it is not required by user
|
||||
tscInsertPrimaryTSSourceColumn(pQueryInfo, &index);
|
||||
}
|
||||
|
||||
tscInsertPrimaryTSSourceColumn(pQueryInfo, &index);
|
||||
} else {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
@ -2037,7 +2059,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
|
||||
// todo refactor
|
||||
static SColumnList getColumnList(int32_t num, int16_t tableIndex, int32_t columnIndex) {
|
||||
assert(num == 1 && columnIndex >= -1 && tableIndex >= 0);
|
||||
assert(num == 1 && columnIndex >= -2 && tableIndex >= 0);
|
||||
|
||||
SColumnList columnList = {0};
|
||||
columnList.num = num;
|
||||
|
|
|
@ -622,7 +622,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
|
||||
tscError("%p failed to malloc for query msg", pSql);
|
||||
return -1; // todo add test for this
|
||||
return TSDB_CODE_TSC_INVALID_SQL; // todo add test for this
|
||||
}
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
|
@ -631,17 +631,17 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
if (taosArrayGetSize(pQueryInfo->colList) <= 0 && !tscQueryTags(pQueryInfo)) {
|
||||
tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
|
||||
return -1;
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
if (pQueryInfo->intervalTime < 0) {
|
||||
tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
|
||||
return -1;
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
|
||||
tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
|
||||
return -1;
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
|
||||
|
@ -719,7 +719,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
|
||||
tscError("invalid filter info");
|
||||
return -1;
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -731,7 +731,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
|
||||
/* column id is not valid according to the cached table meta, the table meta is expired */
|
||||
tscError("%p table schema is not matched with parsed sql", pSql);
|
||||
return -1;
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
pSqlFuncExpr->colInfo.colId = htons(pExpr->colInfo.colId);
|
||||
|
@ -1279,7 +1279,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
int size = tscEstimateAlterTableMsgLength(pCmd);
|
||||
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
|
||||
tscError("%p failed to malloc for alter table msg", pSql);
|
||||
return -1;
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SCMAlterTableMsg *pAlterTableMsg = (SCMAlterTableMsg *)pCmd->payload;
|
||||
|
@ -1631,7 +1631,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
|
||||
pthread_mutex_unlock(&pObj->mutex);
|
||||
tscError("%p failed to malloc for heartbeat msg", pSql);
|
||||
return -1;
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
|
||||
|
|
|
@ -926,7 +926,7 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
|
|||
}
|
||||
|
||||
static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
|
||||
int16_t size, int16_t interSize, bool isTagCol) {
|
||||
int16_t size, int16_t interSize, int32_t colType) {
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex);
|
||||
|
||||
SSqlExpr* pExpr = calloc(1, sizeof(SSqlExpr));
|
||||
|
@ -935,8 +935,10 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol
|
|||
// set the correct columnIndex index
|
||||
if (pColIndex->columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
pExpr->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX;
|
||||
} else if (pColIndex->columnIndex == TSDB_UD_COLUMN_INDEX) {
|
||||
pExpr->colInfo.colId = TSDB_UD_COLUMN_INDEX;
|
||||
} else {
|
||||
if (isTagCol) {
|
||||
if (TSDB_COL_IS_TAG(colType)) {
|
||||
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
|
||||
pExpr->colInfo.colId = pSchema[pColIndex->columnIndex].colId;
|
||||
tstrncpy(pExpr->colInfo.name, pSchema[pColIndex->columnIndex].name, sizeof(pExpr->colInfo.name));
|
||||
|
@ -948,9 +950,9 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol
|
|||
}
|
||||
}
|
||||
|
||||
pExpr->colInfo.flag = isTagCol? TSDB_COL_TAG:TSDB_COL_NORMAL;
|
||||
|
||||
pExpr->colInfo.flag = colType;
|
||||
pExpr->colInfo.colIndex = pColIndex->columnIndex;
|
||||
|
||||
pExpr->resType = type;
|
||||
pExpr->resBytes = size;
|
||||
pExpr->interBytes = interSize;
|
||||
|
@ -1291,7 +1293,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId) {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
if (colId == TSDB_TBNAME_COLUMN_INDEX || colId == TSDB_UD_COLUMN_INDEX) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -28,6 +28,8 @@ void extractTableNameFromToken(SSQLToken *pToken, SSQLToken* pTable);
|
|||
|
||||
SSchema tGetTableNameColumnSchema();
|
||||
|
||||
SSchema tGetUserSpecifiedColumnSchema(const char* v, int16_t type, const char* name);
|
||||
|
||||
bool tscValidateTableNameLength(size_t len);
|
||||
|
||||
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters);
|
||||
|
|
|
@ -43,7 +43,24 @@ SSchema tGetTableNameColumnSchema() {
|
|||
s.bytes = TSDB_TABLE_NAME_LEN - 1 + VARSTR_HEADER_SIZE;
|
||||
s.type = TSDB_DATA_TYPE_BINARY;
|
||||
s.colId = TSDB_TBNAME_COLUMN_INDEX;
|
||||
strncpy(s.name, TSQL_TBNAME_L, TSDB_COL_NAME_LEN);
|
||||
tstrncpy(s.name, TSQL_TBNAME_L, TSDB_COL_NAME_LEN);
|
||||
return s;
|
||||
}
|
||||
|
||||
SSchema tGetUserSpecifiedColumnSchema(const char* v, int16_t type, const char* name) {
|
||||
SSchema s = {0};
|
||||
|
||||
s.type = type;
|
||||
if (s.type == TSDB_DATA_TYPE_BINARY || s.type == TSDB_DATA_TYPE_NCHAR) {
|
||||
size_t len = strlen(v);
|
||||
s.bytes = len + VARSTR_HEADER_SIZE;
|
||||
} else {
|
||||
s.bytes = tDataTypeDesc[type].nSize;
|
||||
}
|
||||
|
||||
s.colId = TSDB_UD_COLUMN_INDEX;
|
||||
tstrncpy(s.name, name, sizeof(s.name));
|
||||
|
||||
return s;
|
||||
}
|
||||
|
||||
|
|
|
@ -167,7 +167,11 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
|
|||
char* n = strdup(p);
|
||||
taosArrayPush(pDst->arr, &n);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
pDst->nLen = tDataTypeDesc[pDst->nType].nSize;
|
||||
}
|
||||
|
||||
int32_t tVariantCompare(const tVariant* p1, const tVariant* p2) {
|
||||
|
|
|
@ -286,7 +286,8 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
|
|||
|
||||
#define TSDB_MAX_REPLICA 5
|
||||
|
||||
#define TSDB_TBNAME_COLUMN_INDEX (-1)
|
||||
#define TSDB_TBNAME_COLUMN_INDEX (-1)
|
||||
#define TSDB_UD_COLUMN_INDEX (-2)
|
||||
#define TSDB_MULTI_METERMETA_MAX_NUM 100000 // maximum batch size allowed to load metermeta
|
||||
|
||||
#define TSDB_MIN_CACHE_BLOCK_SIZE 1
|
||||
|
|
|
@ -167,9 +167,9 @@ enum _mgmt_table {
|
|||
#define TSDB_VN_WRITE_ACCCESS ((char)0x2)
|
||||
#define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS)
|
||||
|
||||
#define TSDB_COL_NORMAL 0x0u
|
||||
#define TSDB_COL_TAG 0x1u
|
||||
#define TSDB_COL_JOIN 0x2u
|
||||
#define TSDB_COL_NORMAL 0x0u // the normal column of the table
|
||||
#define TSDB_COL_TAG 0x1u // the tag column type
|
||||
#define TSDB_COL_UDC 0x2u // the user specified normal string column, it is a dummy column
|
||||
|
||||
extern char *taosMsg[];
|
||||
|
||||
|
|
|
@ -35,6 +35,9 @@
|
|||
*/
|
||||
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0)
|
||||
#define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0)
|
||||
#define TSDB_COL_IS_NORMAL_COL(f) ((f) == TSDB_COL_NORMAL)
|
||||
#define TSDB_COL_IS_UD_COL(f) ((f) == TSDB_COL_UDC)
|
||||
|
||||
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
|
||||
|
||||
#define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN)
|
||||
|
@ -371,14 +374,14 @@ static bool hasTagValOutput(SQuery* pQuery) {
|
|||
* @return
|
||||
*/
|
||||
static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis **pColStatis) {
|
||||
if (pStatis != NULL && !TSDB_COL_IS_TAG(pColIndex->flag)) {
|
||||
if (pStatis != NULL && TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) {
|
||||
*pColStatis = &pStatis[pColIndex->colIndex];
|
||||
assert((*pColStatis)->colId == pColIndex->colId);
|
||||
} else {
|
||||
*pColStatis = NULL;
|
||||
}
|
||||
|
||||
if (TSDB_COL_IS_TAG(pColIndex->flag) || pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
if (TSDB_COL_IS_TAG(pColIndex->flag) || TSDB_COL_IS_UD_COL(pColIndex->flag) || pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -884,14 +887,14 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
|
|||
|
||||
} else { // other type of query function
|
||||
SColIndex *pCol = &pQuery->pSelectExpr[col].base.colInfo;
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)) {
|
||||
dataBlock = NULL;
|
||||
} else {
|
||||
if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) {
|
||||
SColIndex* pColIndex = &pQuery->pSelectExpr[col].base.colInfo;
|
||||
SColumnInfoData *p = taosArrayGet(pDataBlock, pColIndex->colIndex);
|
||||
assert(p->info.colId == pColIndex->colId);
|
||||
|
||||
dataBlock = p->pData;
|
||||
} else {
|
||||
dataBlock = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1536,7 +1539,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
|
|||
pCtx->inputBytes = pQuery->tagColList[index].bytes;
|
||||
pCtx->inputType = pQuery->tagColList[index].type;
|
||||
}
|
||||
|
||||
} else if (TSDB_COL_IS_UD_COL(pIndex->flag)) {
|
||||
pCtx->inputBytes = pSqlFuncMsg->arg[0].argBytes;
|
||||
pCtx->inputType = pSqlFuncMsg->arg[0].argType;
|
||||
} else {
|
||||
pCtx->inputBytes = pQuery->colList[index].bytes;
|
||||
pCtx->inputType = pQuery->colList[index].type;
|
||||
|
@ -5231,6 +5236,8 @@ static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pE
|
|||
j += 1;
|
||||
}
|
||||
|
||||
} else if (pExprMsg->colInfo.flag == TSDB_COL_UDC) { // user specified column data
|
||||
return TSDB_UD_COLUMN_INDEX;
|
||||
} else {
|
||||
while (j < pQueryMsg->numOfCols) {
|
||||
if (pExprMsg->colInfo.colId == pQueryMsg->colList[j].colId) {
|
||||
|
@ -5590,9 +5597,18 @@ static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo *
|
|||
bytes = tDataTypeDesc[type].nSize;
|
||||
} else if (pExprs[i].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX && pExprs[i].base.functionId == TSDB_FUNC_TAGPRJ) { // parse the normal column
|
||||
SSchema s = tGetTableNameColumnSchema();
|
||||
type = s.type;
|
||||
type = s.type;
|
||||
bytes = s.bytes;
|
||||
} else{
|
||||
} else if (pExprs[i].base.colInfo.colId == TSDB_UD_COLUMN_INDEX) {
|
||||
assert(pExprs[i].base.functionId == TSDB_FUNC_PRJ);
|
||||
|
||||
type = pExprs[i].base.arg[0].argType;
|
||||
bytes = pExprs[i].base.arg[0].argBytes;
|
||||
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||
bytes += VARSTR_HEADER_SIZE;
|
||||
}
|
||||
} else {
|
||||
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
|
||||
assert(j < pQueryMsg->numOfCols || j < pQueryMsg->numOfTags);
|
||||
|
||||
|
@ -5765,7 +5781,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
|
|||
|
||||
// todo opt performance
|
||||
SColIndex *pColIndex = &pSqlExprMsg->colInfo;
|
||||
if (!TSDB_COL_IS_TAG(pColIndex->flag)) {
|
||||
if (TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) {
|
||||
int32_t f = 0;
|
||||
for (f = 0; f < pQuery->numOfCols; ++f) {
|
||||
if (pColIndex->colId == pQuery->colList[f].colId) {
|
||||
|
@ -5774,7 +5790,9 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
|
|||
}
|
||||
}
|
||||
|
||||
assert (f < pQuery->numOfCols);
|
||||
assert(f < pQuery->numOfCols);
|
||||
} else if (pColIndex->colId == TSDB_UD_COLUMN_INDEX) {
|
||||
// do nothing
|
||||
} else {
|
||||
int32_t f = 0;
|
||||
for (f = 0; f < pQuery->numOfTags; ++f) {
|
||||
|
|
Loading…
Reference in New Issue