[td-171] add filter support, refactor the client side parse functions
This commit is contained in:
parent
773432c119
commit
64545fb9a3
|
@ -57,7 +57,7 @@ typedef struct SJoinSubquerySupporter {
|
|||
int64_t interval; // interval time
|
||||
SLimitVal limit; // limit info
|
||||
uint64_t uid; // query meter uid
|
||||
SColumnBaseInfo colList; // previous query information
|
||||
SArray* colList; // previous query information
|
||||
SSqlExprInfo exprsInfo;
|
||||
SFieldInfo fieldsInfo;
|
||||
STagCond tagCond;
|
||||
|
@ -106,7 +106,7 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
|
|||
bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo);
|
||||
|
||||
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
|
||||
bool tscQueryOnMetric(SSqlCmd* pCmd);
|
||||
bool tscQueryOnSTable(SSqlCmd* pCmd);
|
||||
bool tscQueryTags(SQueryInfo* pQueryInfo);
|
||||
bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd);
|
||||
|
||||
|
@ -159,16 +159,12 @@ void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t ui
|
|||
void* tscSqlExprDestroy(SSqlExpr* pExpr);
|
||||
void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo);
|
||||
|
||||
SColumnBase* tscColumnBaseInfoInsert(SQueryInfo* pQueryInfo, SColumnIndex* colIndex);
|
||||
void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* src);
|
||||
void tscColumnBaseCopy(SColumnBase* dst, const SColumnBase* src);
|
||||
SColumn* tscColumnClone(const SColumn* src);
|
||||
SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex);
|
||||
void tscColumnListAssign(SArray* dst, const SArray* src, int16_t tableIndex);
|
||||
void tscColumnListDestroy(SArray* pColList);
|
||||
|
||||
void tscColumnBaseInfoCopy(SColumnBaseInfo* dst, const SColumnBaseInfo* src, int16_t tableIndex);
|
||||
SColumnBase* tscColumnBaseInfoGet(SColumnBaseInfo* pColumnBaseInfo, int32_t index);
|
||||
void tscColumnBaseInfoUpdateTableIndex(SColumnBaseInfo* pColList, int16_t tableIndex);
|
||||
|
||||
void tscColumnBaseInfoReserve(SColumnBaseInfo* pColumnBaseInfo, int32_t size);
|
||||
void tscColumnBaseInfoDestroy(SColumnBaseInfo* pColumnBaseInfo);
|
||||
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters);
|
||||
|
||||
int32_t tscValidateName(SSQLToken* pToken);
|
||||
|
||||
|
|
|
@ -30,9 +30,9 @@ extern "C" {
|
|||
#include "taosmsg.h"
|
||||
#include "tarray.h"
|
||||
#include "tglobal.h"
|
||||
#include "trpc.h"
|
||||
#include "tsqlfunction.h"
|
||||
#include "tutil.h"
|
||||
#include "tsqlfunction.h"
|
||||
#include "queryExecutor.h"
|
||||
|
||||
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col) (res->data + ((_queryinfo)->fieldsInfo.pSqlExpr[col]->offset) * res->numOfRows)
|
||||
|
||||
|
@ -41,14 +41,6 @@ struct SSqlInfo;
|
|||
|
||||
typedef SCMSTableVgroupRspMsg SVgroupsInfo;
|
||||
|
||||
typedef struct SSqlGroupbyExpr {
|
||||
int16_t tableIndex;
|
||||
int16_t numOfGroupCols;
|
||||
SColIndex columnInfo[TSDB_MAX_TAGS]; // group by columns information
|
||||
int16_t orderIndex; // order by column index
|
||||
int16_t orderType; // order by type: asc/desc
|
||||
} SSqlGroupbyExpr;
|
||||
|
||||
typedef struct STableComInfo {
|
||||
uint8_t numOfTags;
|
||||
uint8_t precision;
|
||||
|
@ -123,17 +115,11 @@ typedef struct SSqlExprInfo {
|
|||
SSqlExpr** pExprs;
|
||||
} SSqlExprInfo;
|
||||
|
||||
typedef struct SColumnBase {
|
||||
typedef struct SColumn {
|
||||
SColumnIndex colIndex;
|
||||
int32_t numOfFilters;
|
||||
SColumnFilterInfo *filterInfo;
|
||||
} SColumnBase;
|
||||
|
||||
typedef struct SColumnBaseInfo {
|
||||
int16_t numOfAlloc;
|
||||
int16_t numOfCols;
|
||||
SColumnBase *pColList;
|
||||
} SColumnBaseInfo;
|
||||
} SColumn;
|
||||
|
||||
struct SLocalReducer;
|
||||
|
||||
|
@ -223,7 +209,7 @@ typedef struct SQueryInfo {
|
|||
int64_t slidingTime; // sliding window in mseconds
|
||||
SSqlGroupbyExpr groupbyExpr; // group by tags info
|
||||
|
||||
SColumnBaseInfo colList;
|
||||
SArray* colList;
|
||||
SFieldInfo fieldsInfo;
|
||||
SSqlExprInfo exprsInfo;
|
||||
SLimitVal limit;
|
||||
|
|
|
@ -2940,7 +2940,7 @@ static void col_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
|||
char *pData = GET_INPUT_CHAR_INDEX(pCtx, index);
|
||||
memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes);
|
||||
|
||||
pCtx->aOutputBuf += pCtx->inputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
|
||||
pCtx->aOutputBuf += pCtx->inputBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1123,7 +1123,11 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
|
|||
const char* msg5 = "invalid function name";
|
||||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
|
||||
|
||||
|
||||
if (pQueryInfo->colList == NULL) {
|
||||
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pSelection->nExpr; ++i) {
|
||||
int32_t outputIndex = pQueryInfo->exprsInfo.numOfExprs;
|
||||
tSQLExprItem* pItem = &pSelection->a[i];
|
||||
|
@ -1278,7 +1282,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
|
|||
int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes,
|
||||
int8_t type, char* fieldName, SSqlExpr* pSqlExpr) {
|
||||
for (int32_t i = 0; i < pIdList->num; ++i) {
|
||||
tscColumnBaseInfoInsert(pQueryInfo, &(pIdList->ids[i]));
|
||||
tscColumnListInsert(pQueryInfo->colList, &(pIdList->ids[i]));
|
||||
}
|
||||
|
||||
tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, outputIndex, type, fieldName, bytes);
|
||||
|
@ -1494,7 +1498,7 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema,
|
|||
|
||||
// for all querie, the timestamp column meeds to be loaded
|
||||
SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
|
||||
tscColumnBaseInfoInsert(pQueryInfo, &index);
|
||||
tscColumnListInsert(pQueryInfo->colList, &index);
|
||||
|
||||
SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex);
|
||||
insertResultField(pQueryInfo, resColIdx, &ids, bytes, type, columnName, pExpr);
|
||||
|
@ -1579,12 +1583,12 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
|
|||
insertResultField(pQueryInfo, numOfOutput, &ids, sizeof(int64_t), TSDB_DATA_TYPE_BIGINT, pExpr->aliasName, pExpr);
|
||||
} else {
|
||||
for (int32_t i = 0; i < ids.num; ++i) {
|
||||
tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i]));
|
||||
tscColumnListInsert(pQueryInfo->colList, &(ids.ids[i]));
|
||||
}
|
||||
}
|
||||
|
||||
SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
|
||||
tscColumnBaseInfoInsert(pQueryInfo, &tsCol);
|
||||
tscColumnListInsert(pQueryInfo->colList, &tsCol);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1690,12 +1694,12 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
|
|||
insertResultField(pQueryInfo, numOfOutput, &ids, pExpr->resBytes, pExpr->resType, pExpr->aliasName, pExpr);
|
||||
} else {
|
||||
for (int32_t i = 0; i < ids.num; ++i) {
|
||||
tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i]));
|
||||
tscColumnListInsert(pQueryInfo->colList, &(ids.ids[i]));
|
||||
}
|
||||
}
|
||||
|
||||
SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
|
||||
tscColumnBaseInfoInsert(pQueryInfo, &tsCol);
|
||||
tscColumnListInsert(pQueryInfo->colList, &tsCol);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1889,7 +1893,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
|
|||
insertResultField(pQueryInfo, colIndex, &ids, resultSize, resultType, pExpr->aliasName, pExpr);
|
||||
} else {
|
||||
for (int32_t i = 0; i < ids.num; ++i) {
|
||||
tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i]));
|
||||
tscColumnListInsert(pQueryInfo->colList, &(ids.ids[i]));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2535,7 +2539,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
|
|||
return invalidSqlErrMsg(pQueryInfo->msg, msg8);
|
||||
}
|
||||
|
||||
tscColumnBaseInfoInsert(pQueryInfo, &index);
|
||||
tscColumnListInsert(pQueryInfo->colList, &index);
|
||||
pQueryInfo->groupbyExpr.columnInfo[i] =
|
||||
(SColIndex){.colIndex = index.columnIndex, .flag = TSDB_COL_NORMAL, .colId = pSchema->colId}; // relIndex;
|
||||
pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC;
|
||||
|
@ -2559,7 +2563,7 @@ void setColumnOffsetValueInResultset(SQueryInfo* pQueryInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
static SColumnFilterInfo* addColumnFilterInfo(SColumnBase* pColumn) {
|
||||
static SColumnFilterInfo* addColumnFilterInfo(SColumn* pColumn) {
|
||||
if (pColumn == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -2836,7 +2840,7 @@ static int32_t extractColumnFilterInfo(SQueryInfo* pQueryInfo, SColumnIndex* pIn
|
|||
const char* msg1 = "non binary column not support like operator";
|
||||
const char* msg2 = "binary column not support this operator";
|
||||
|
||||
SColumnBase* pColumn = tscColumnBaseInfoInsert(pQueryInfo, pIndex);
|
||||
SColumn* pColumn = tscColumnListInsert(pQueryInfo->colList, pIndex);
|
||||
SColumnFilterInfo* pColFilter = NULL;
|
||||
|
||||
/*
|
||||
|
@ -2857,10 +2861,10 @@ static int32_t extractColumnFilterInfo(SQueryInfo* pQueryInfo, SColumnIndex* pIn
|
|||
return TSDB_CODE_INVALID_SQL;
|
||||
}
|
||||
|
||||
pColFilter->filterOnBinary =
|
||||
pColFilter->filterstr =
|
||||
((pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) ? 1 : 0);
|
||||
|
||||
if (pColFilter->filterOnBinary) {
|
||||
if (pColFilter->filterstr) {
|
||||
if (pExpr->nSQLOptr != TK_EQ && pExpr->nSQLOptr != TK_NE && pExpr->nSQLOptr != TK_LIKE) {
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
|
||||
}
|
||||
|
@ -3584,11 +3588,15 @@ static int32_t setTableCondForSTableQuery(SQueryInfo* pQueryInfo, const char* ac
|
|||
}
|
||||
|
||||
static bool validateFilterExpr(SQueryInfo* pQueryInfo) {
|
||||
for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
|
||||
SColumnBase* pColBase = &pQueryInfo->colList.pColList[i];
|
||||
SArray* pColList = pQueryInfo->colList;
|
||||
|
||||
size_t num = taosArrayGetSize(pColList);
|
||||
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SColumn* pCol = taosArrayGetP(pColList, i);
|
||||
|
||||
for (int32_t j = 0; j < pColBase->numOfFilters; ++j) {
|
||||
SColumnFilterInfo* pColFilter = &pColBase->filterInfo[j];
|
||||
for (int32_t j = 0; j < pCol->numOfFilters; ++j) {
|
||||
SColumnFilterInfo* pColFilter = &pCol->filterInfo[j];
|
||||
int32_t lowerOptr = pColFilter->lowerRelOptr;
|
||||
int32_t upperOptr = pColFilter->upperRelOptr;
|
||||
|
||||
|
|
|
@ -567,7 +567,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
|
|||
const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
|
||||
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
|
||||
|
||||
int32_t srcColListSize = pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
|
||||
int32_t srcColListSize = taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo);
|
||||
|
||||
int32_t exprSize = sizeof(SSqlFuncExprMsg) * pQueryInfo->exprsInfo.numOfExprs;
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
@ -624,7 +624,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
|
||||
if (pQueryInfo->colList.numOfCols <= 0) {
|
||||
if (taosArrayGetSize(pQueryInfo->colList) <= 0) {
|
||||
tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
|
||||
return -1;
|
||||
}
|
||||
|
@ -700,7 +700,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
pQueryMsg->interpoType = htons(pQueryInfo->interpoType);
|
||||
pQueryMsg->limit = htobe64(pQueryInfo->limit.limit);
|
||||
pQueryMsg->offset = htobe64(pQueryInfo->limit.offset);
|
||||
pQueryMsg->numOfCols = htons(pQueryInfo->colList.numOfCols);
|
||||
pQueryMsg->numOfCols = htons(taosArrayGetSize(pQueryInfo->colList));
|
||||
pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime);
|
||||
pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime);
|
||||
pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
|
||||
|
@ -716,21 +716,22 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
}
|
||||
|
||||
// set column list ids
|
||||
char *pMsg = (char *)(pQueryMsg->colList) + pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
|
||||
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
|
||||
char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
|
||||
SSchema *pSchema = tscGetTableSchema(pTableMeta);
|
||||
|
||||
for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
|
||||
SColumnBase *pCol = tscColumnBaseInfoGet(&pQueryInfo->colList, i);
|
||||
SSchema * pColSchema = &pSchema[pCol->colIndex.columnIndex];
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
|
||||
SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
|
||||
|
||||
// if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
|
||||
// pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
|
||||
// tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql,
|
||||
// htons(pQueryMsg->vnode), pTableMeta->sid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex,
|
||||
// pColSchema->name);
|
||||
//
|
||||
// return -1; // 0 means build msg failed
|
||||
// }
|
||||
if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
|
||||
pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
|
||||
tscError("%p sid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
|
||||
pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex,
|
||||
pColSchema->name);
|
||||
|
||||
return -1; // 0 means build msg failed
|
||||
}
|
||||
|
||||
pQueryMsg->colList[i].colId = htons(pColSchema->colId);
|
||||
pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
|
||||
|
@ -742,11 +743,11 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
|
||||
|
||||
SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
|
||||
pFilterMsg->filterOnBinary = htons(pColFilter->filterOnBinary);
|
||||
pFilterMsg->filterstr = htons(pColFilter->filterstr);
|
||||
|
||||
pMsg += sizeof(SColumnFilterInfo);
|
||||
|
||||
if (pColFilter->filterOnBinary) {
|
||||
if (pColFilter->filterstr) {
|
||||
pFilterMsg->len = htobe64(pColFilter->len);
|
||||
memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1);
|
||||
pMsg += (pColFilter->len + 1); // append the additional filter binary info
|
||||
|
@ -808,8 +809,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
int32_t len = 0;
|
||||
if (hasArithmeticFunction) {
|
||||
SColumnBase *pColBase = pQueryInfo->colList.pColList;
|
||||
for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumn* pColBase = taosArrayGetP(pQueryInfo->colList, i);
|
||||
|
||||
char * name = pSchema[pColBase[i].colIndex.columnIndex].name;
|
||||
int32_t lenx = strlen(name);
|
||||
memcpy(pMsg, name, lenx);
|
||||
|
@ -2194,12 +2196,15 @@ int tscProcessShowRsp(SSqlObj *pSql) {
|
|||
pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
|
||||
SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
|
||||
|
||||
tscColumnBaseInfoReserve(&pQueryInfo->colList, pMetaMsg->numOfColumns);
|
||||
if (pQueryInfo->colList == NULL) {
|
||||
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
|
||||
}
|
||||
|
||||
SColumnIndex index = {0};
|
||||
|
||||
for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i) {
|
||||
index.columnIndex = i;
|
||||
tscColumnBaseInfoInsert(pQueryInfo, &index);
|
||||
tscColumnListInsert(pQueryInfo->colList, &index);
|
||||
|
||||
tscFieldInfoSetValFromSchema(&pQueryInfo->fieldsInfo, i, &pTableSchema[i]);
|
||||
|
||||
pQueryInfo->fieldsInfo.pSqlExpr[i] = tscSqlExprInsert(pQueryInfo, i, TSDB_FUNC_TS_DUMMY, &index,
|
||||
|
@ -2477,7 +2482,7 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
|
|||
* 1. only update the metermeta in force model metricmeta is not updated
|
||||
* 2. if get metermeta failed, still get the metermeta
|
||||
*/
|
||||
if (pTableMetaInfo->pTableMeta == NULL || !tscQueryOnMetric(pCmd)) {
|
||||
if (pTableMetaInfo->pTableMeta == NULL || !tscQueryOnSTable(pCmd)) {
|
||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
if (pTableMetaInfo->pTableMeta) {
|
||||
tscTrace("%p update table meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
|
||||
|
|
|
@ -191,7 +191,7 @@ void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter) {
|
|||
}
|
||||
|
||||
tscSqlExprInfoDestroy(&pSupporter->exprsInfo);
|
||||
tscColumnBaseInfoDestroy(&pSupporter->colList);
|
||||
tscColumnListDestroy(pSupporter->colList);
|
||||
|
||||
tscClearFieldInfo(&pSupporter->fieldsInfo);
|
||||
|
||||
|
@ -211,8 +211,10 @@ void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter) {
|
|||
*
|
||||
*/
|
||||
bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
|
||||
for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
|
||||
SColumnBase* pBase = tscColumnBaseInfoGet(&pQueryInfo->colList, i);
|
||||
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumn* pBase = taosArrayGet(pQueryInfo->colList, i);
|
||||
if (pBase->colIndex.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
return true;
|
||||
}
|
||||
|
@ -299,7 +301,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
|
|||
pQueryInfo->intervalTime = pSupporter->interval;
|
||||
pQueryInfo->groupbyExpr = pSupporter->groupbyExpr;
|
||||
|
||||
tscColumnBaseInfoCopy(&pQueryInfo->colList, &pSupporter->colList, 0);
|
||||
tscColumnListAssign(pQueryInfo->colList, pSupporter->colList, 0);
|
||||
tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
|
||||
|
||||
tscSqlExprCopy(&pQueryInfo->exprsInfo, &pSupporter->exprsInfo, pSupporter->uid, false);
|
||||
|
@ -342,9 +344,10 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
|
|||
|
||||
tscPrintSelectClause(pNew, 0);
|
||||
|
||||
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
|
||||
tscTrace("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
|
||||
pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type,
|
||||
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
|
||||
pNewQueryInfo->exprsInfo.numOfExprs, numOfCols,
|
||||
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name);
|
||||
}
|
||||
|
||||
|
@ -850,8 +853,14 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
|
|||
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
|
||||
assert(pNewQueryInfo != NULL);
|
||||
|
||||
tscColumnBaseInfoUpdateTableIndex(&pNewQueryInfo->colList, 0);
|
||||
tscColumnBaseInfoCopy(&pSupporter->colList, &pNewQueryInfo->colList, 0);
|
||||
// update the table index
|
||||
size_t num = taosArrayGetSize(pNewQueryInfo->colList);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SColumn* pCol = taosArrayGetP(pNewQueryInfo->colList, i);
|
||||
pCol->colIndex.tableIndex = 0;
|
||||
}
|
||||
|
||||
tscColumnListAssign(pSupporter->colList, pNewQueryInfo->colList, 0);
|
||||
|
||||
tscSqlExprCopy(&pSupporter->exprsInfo, &pNewQueryInfo->exprsInfo, pSupporter->uid, false);
|
||||
tscFieldInfoCopyAll(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo);
|
||||
|
@ -888,27 +897,26 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
|
|||
pExpr->numOfParams = 1;
|
||||
|
||||
// add the filter tag column
|
||||
for (int32_t i = 0; i < pSupporter->colList.numOfCols; ++i) {
|
||||
SColumnBase *pColBase = &pSupporter->colList.pColList[i];
|
||||
if (pColBase->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered.
|
||||
tscColumnBaseCopy(&pNewQueryInfo->colList.pColList[pNewQueryInfo->colList.numOfCols], pColBase);
|
||||
pNewQueryInfo->colList.numOfCols++;
|
||||
size_t s = taosArrayGetSize(pSupporter->colList);
|
||||
|
||||
for (int32_t i = 0; i < s; ++i) {
|
||||
SColumn *pCol = taosArrayGetP(pSupporter->colList, i);
|
||||
|
||||
if (pCol->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered.
|
||||
SColumn* p = tscColumnClone(pCol);
|
||||
taosArrayPush(pNewQueryInfo->colList, &p);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
|
||||
|
||||
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
|
||||
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
|
||||
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type,
|
||||
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
|
||||
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name);
|
||||
tscPrintSelectClause(pNew, 0);
|
||||
|
||||
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
|
||||
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
|
||||
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type,
|
||||
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
|
||||
pNewQueryInfo->exprsInfo.numOfExprs, numOfCols,
|
||||
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name);
|
||||
tscPrintSelectClause(pNew, 0);
|
||||
|
||||
} else {
|
||||
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
|
||||
pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
|
||||
|
|
|
@ -130,7 +130,7 @@ void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBuffer* pBuf) {
|
|||
taosArrayPush(pTagCond->pCond, &cond);
|
||||
}
|
||||
|
||||
bool tscQueryOnMetric(SSqlCmd* pCmd) {
|
||||
bool tscQueryOnSTable(SSqlCmd* pCmd) {
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
|
||||
return ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_QUERY) == TSDB_QUERY_TYPE_STABLE_QUERY) &&
|
||||
|
@ -1289,7 +1289,6 @@ void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo) {
|
|||
pExprInfo->numOfExprs = 0;
|
||||
}
|
||||
|
||||
|
||||
void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t tableuid, bool deepcopy) {
|
||||
if (src == NULL || src->numOfExprs == 0) {
|
||||
return;
|
||||
|
@ -1323,186 +1322,130 @@ void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t tableui
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static void clearVal(SColumnBase* pBase) {
|
||||
memset(pBase, 0, sizeof(SColumnBase));
|
||||
|
||||
pBase->colIndex.tableIndex = -2;
|
||||
pBase->colIndex.columnIndex = -2;
|
||||
}
|
||||
|
||||
static void _cf_ensureSpace(SColumnBaseInfo* pcolList, int32_t size) {
|
||||
if (pcolList->numOfAlloc < size) {
|
||||
int32_t oldSize = pcolList->numOfAlloc;
|
||||
|
||||
int32_t newSize = (oldSize <= 0) ? 8 : (oldSize << 1);
|
||||
while (newSize < size) {
|
||||
newSize = (newSize << 1);
|
||||
}
|
||||
|
||||
if (newSize > TSDB_MAX_COLUMNS) {
|
||||
newSize = TSDB_MAX_COLUMNS;
|
||||
}
|
||||
|
||||
int32_t inc = newSize - oldSize;
|
||||
|
||||
pcolList->pColList = realloc(pcolList->pColList, newSize * sizeof(SColumnBase));
|
||||
memset(&pcolList->pColList[oldSize], 0, inc * sizeof(SColumnBase));
|
||||
|
||||
pcolList->numOfAlloc = newSize;
|
||||
}
|
||||
}
|
||||
|
||||
static void _cf_evic(SColumnBaseInfo* pcolList, int32_t index) {
|
||||
if (index < pcolList->numOfCols) {
|
||||
memmove(&pcolList->pColList[index + 1], &pcolList->pColList[index],
|
||||
sizeof(SColumnBase) * (pcolList->numOfCols - index));
|
||||
|
||||
clearVal(&pcolList->pColList[index]);
|
||||
}
|
||||
}
|
||||
|
||||
SColumnBase* tscColumnBaseInfoGet(SColumnBaseInfo* pColumnBaseInfo, int32_t index) {
|
||||
if (pColumnBaseInfo == NULL || pColumnBaseInfo->numOfCols < index) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return &pColumnBaseInfo->pColList[index];
|
||||
}
|
||||
|
||||
void tscColumnBaseInfoUpdateTableIndex(SColumnBaseInfo* pColList, int16_t tableIndex) {
|
||||
for (int32_t i = 0; i < pColList->numOfCols; ++i) {
|
||||
pColList->pColList[i].colIndex.tableIndex = tableIndex;
|
||||
}
|
||||
}
|
||||
|
||||
// todo refactor
|
||||
SColumnBase* tscColumnBaseInfoInsert(SQueryInfo* pQueryInfo, SColumnIndex* pColIndex) {
|
||||
SColumnBaseInfo* pcolList = &pQueryInfo->colList;
|
||||
|
||||
SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) {
|
||||
// ignore the tbname column to be inserted into source list
|
||||
if (pColIndex->columnIndex < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
size_t numOfCols = taosArrayGetSize(pColumnList);
|
||||
int16_t col = pColIndex->columnIndex;
|
||||
|
||||
int32_t i = 0;
|
||||
while (i < pcolList->numOfCols) {
|
||||
if (pcolList->pColList[i].colIndex.columnIndex < col) {
|
||||
while (i < numOfCols) {
|
||||
SColumn* pCol = taosArrayGetP(pColumnList, i);
|
||||
if (pCol->colIndex.columnIndex < col) {
|
||||
i++;
|
||||
} else if (pcolList->pColList[i].colIndex.tableIndex < pColIndex->tableIndex) {
|
||||
} else if (pCol->colIndex.tableIndex < pColIndex->tableIndex) {
|
||||
i++;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
SColumnIndex* pIndex = &pcolList->pColList[i].colIndex;
|
||||
if ((i < pcolList->numOfCols && (pIndex->columnIndex > col || pIndex->tableIndex != pColIndex->tableIndex)) ||
|
||||
(i >= pcolList->numOfCols)) {
|
||||
_cf_ensureSpace(pcolList, pcolList->numOfCols + 1);
|
||||
_cf_evic(pcolList, i);
|
||||
|
||||
pcolList->pColList[i].colIndex = *pColIndex;
|
||||
pcolList->numOfCols++;
|
||||
}
|
||||
|
||||
return &pcolList->pColList[i];
|
||||
}
|
||||
|
||||
void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* src) {
|
||||
assert(src != NULL && dst != NULL);
|
||||
|
||||
assert(src->filterOnBinary == 0 || src->filterOnBinary == 1);
|
||||
if (src->lowerRelOptr == TSDB_RELATION_INVALID && src->upperRelOptr == TSDB_RELATION_INVALID) {
|
||||
assert(0);
|
||||
}
|
||||
|
||||
*dst = *src;
|
||||
if (dst->filterOnBinary) {
|
||||
size_t len = (size_t)dst->len + 1;
|
||||
char* pTmp = calloc(1, len);
|
||||
dst->pz = (int64_t)pTmp;
|
||||
memcpy((char*)dst->pz, (char*)src->pz, (size_t)len);
|
||||
}
|
||||
}
|
||||
|
||||
void tscColumnBaseCopy(SColumnBase* dst, const SColumnBase* src) {
|
||||
assert(src != NULL && dst != NULL);
|
||||
|
||||
*dst = *src;
|
||||
|
||||
if (src->numOfFilters > 0) {
|
||||
dst->filterInfo = calloc(1, src->numOfFilters * sizeof(SColumnFilterInfo));
|
||||
|
||||
for (int32_t j = 0; j < src->numOfFilters; ++j) {
|
||||
tscColumnFilterInfoCopy(&dst->filterInfo[j], &src->filterInfo[j]);
|
||||
}
|
||||
if (i >= numOfCols || numOfCols == 0) {
|
||||
SColumn* b = calloc(1, sizeof(SColumn));
|
||||
b->colIndex = *pColIndex;
|
||||
|
||||
taosArrayInsert(pColumnList, i, &b);
|
||||
} else {
|
||||
assert(src->filterInfo == NULL);
|
||||
SColumn* pCol = taosArrayGetP(pColumnList, i);
|
||||
|
||||
if (i < numOfCols && (pCol->colIndex.columnIndex > col || pCol->colIndex.tableIndex != pColIndex->tableIndex)) {
|
||||
SColumn* b = calloc(1, sizeof(SColumn));
|
||||
b->colIndex = *pColIndex;
|
||||
|
||||
taosArrayInsert(pColumnList, i, &b);
|
||||
}
|
||||
}
|
||||
|
||||
return taosArrayGetP(pColumnList, i);
|
||||
}
|
||||
|
||||
void tscColumnBaseInfoCopy(SColumnBaseInfo* dst, const SColumnBaseInfo* src, int16_t tableIndex) {
|
||||
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters) {
|
||||
SColumnFilterInfo* pFilter = NULL;
|
||||
if (numOfFilters > 0) {
|
||||
pFilter = calloc(1, numOfFilters * sizeof(SColumnFilterInfo));
|
||||
} else {
|
||||
assert(src == NULL);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters);
|
||||
for (int32_t j = 0; j < numOfFilters; ++j) {
|
||||
if (pFilter[j].filterstr) {
|
||||
size_t len = (size_t) pFilter[j].len + 1;
|
||||
|
||||
char* pTmp = calloc(1, len);
|
||||
pFilter[j].pz = (int64_t) pTmp;
|
||||
|
||||
memcpy((char*)pFilter[j].pz, (char*)src->pz, (size_t)len);
|
||||
}
|
||||
}
|
||||
|
||||
assert(src->filterstr == 0 || src->filterstr == 1);
|
||||
assert(!(src->lowerRelOptr == TSDB_RELATION_INVALID && src->upperRelOptr == TSDB_RELATION_INVALID));
|
||||
|
||||
return pFilter;
|
||||
}
|
||||
|
||||
static void destroyFilterInfo(SColumnFilterInfo* pFilterInfo, int32_t numOfFilters) {
|
||||
for(int32_t i = 0; i < numOfFilters; ++i) {
|
||||
if (pFilterInfo[i].filterstr) {
|
||||
tfree(pFilterInfo[i].pz);
|
||||
}
|
||||
}
|
||||
|
||||
tfree(pFilterInfo);
|
||||
}
|
||||
|
||||
SColumn* tscColumnClone(const SColumn* src) {
|
||||
assert(src != NULL);
|
||||
|
||||
SColumn* dst = calloc(1, sizeof(SColumn));
|
||||
|
||||
dst->colIndex = src->colIndex;
|
||||
dst->numOfFilters = src->numOfFilters;
|
||||
dst->filterInfo = tscFilterInfoClone(src->filterInfo, src->numOfFilters);
|
||||
|
||||
return dst;
|
||||
}
|
||||
|
||||
static void tscColumnDestroy(SColumn* pCol) {
|
||||
destroyFilterInfo(pCol->filterInfo, pCol->numOfFilters);
|
||||
free(pCol);
|
||||
}
|
||||
|
||||
void tscColumnListAssign(SArray* dst, const SArray* src, int16_t tableIndex) {
|
||||
if (src == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
size_t num = taosArrayGetSize(src);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SColumn* pCol = taosArrayGetP(src, i);
|
||||
|
||||
*dst = *src;
|
||||
dst->pColList = calloc(1, sizeof(SColumnBase) * dst->numOfAlloc);
|
||||
|
||||
int16_t num = 0;
|
||||
for (int32_t i = 0; i < src->numOfCols; ++i) {
|
||||
if (src->pColList[i].colIndex.tableIndex == tableIndex || tableIndex < 0) {
|
||||
dst->pColList[num] = src->pColList[i];
|
||||
|
||||
if (dst->pColList[num].numOfFilters > 0) {
|
||||
dst->pColList[num].filterInfo = calloc(1, dst->pColList[num].numOfFilters * sizeof(SColumnFilterInfo));
|
||||
|
||||
for (int32_t j = 0; j < dst->pColList[num].numOfFilters; ++j) {
|
||||
tscColumnFilterInfoCopy(&dst->pColList[num].filterInfo[j], &src->pColList[i].filterInfo[j]);
|
||||
}
|
||||
}
|
||||
|
||||
num += 1;
|
||||
if (pCol->colIndex.tableIndex == tableIndex || tableIndex < 0) {
|
||||
SColumn* p = tscColumnClone(pCol);
|
||||
taosArrayPush(dst, &p);
|
||||
}
|
||||
}
|
||||
|
||||
dst->numOfCols = num;
|
||||
}
|
||||
|
||||
void tscColumnBaseInfoDestroy(SColumnBaseInfo* pColumnBaseInfo) {
|
||||
void tscColumnListDestroy(SArray* pColumnBaseInfo) {
|
||||
if (pColumnBaseInfo == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
assert(pColumnBaseInfo->numOfCols <= TSDB_MAX_COLUMNS);
|
||||
|
||||
for (int32_t i = 0; i < pColumnBaseInfo->numOfCols; ++i) {
|
||||
SColumnBase* pColBase = &(pColumnBaseInfo->pColList[i]);
|
||||
|
||||
if (pColBase->numOfFilters > 0) {
|
||||
for (int32_t j = 0; j < pColBase->numOfFilters; ++j) {
|
||||
assert(pColBase->filterInfo[j].filterOnBinary == 0 || pColBase->filterInfo[j].filterOnBinary == 1);
|
||||
|
||||
if (pColBase->filterInfo[j].filterOnBinary) {
|
||||
free((char*)pColBase->filterInfo[j].pz);
|
||||
pColBase->filterInfo[j].pz = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tfree(pColBase->filterInfo);
|
||||
size_t num = taosArrayGetSize(pColumnBaseInfo);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SColumn* pCol = taosArrayGetP(pColumnBaseInfo, i);
|
||||
tscColumnDestroy(pCol);
|
||||
}
|
||||
|
||||
tfree(pColumnBaseInfo->pColList);
|
||||
}
|
||||
|
||||
void tscColumnBaseInfoReserve(SColumnBaseInfo* pColumnBaseInfo, int32_t size) {
|
||||
_cf_ensureSpace(pColumnBaseInfo, size);
|
||||
taosArrayDestroy(pColumnBaseInfo);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -1883,7 +1826,7 @@ static void doClearSubqueryInfo(SQueryInfo* pQueryInfo) {
|
|||
tscSqlExprInfoDestroy(&pQueryInfo->exprsInfo);
|
||||
memset(&pQueryInfo->exprsInfo, 0, sizeof(pQueryInfo->exprsInfo));
|
||||
|
||||
tscColumnBaseInfoDestroy(&pQueryInfo->colList);
|
||||
tscColumnListDestroy(pQueryInfo->colList);
|
||||
memset(&pQueryInfo->colList, 0, sizeof(pQueryInfo->colList));
|
||||
|
||||
pQueryInfo->tsBuf = tsBufDestory(pQueryInfo->tsBuf);
|
||||
|
@ -2070,7 +2013,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
|||
return NULL;
|
||||
}
|
||||
|
||||
tscColumnBaseInfoCopy(&pNewQueryInfo->colList, &pQueryInfo->colList, (int16_t)tableIndex);
|
||||
tscColumnListAssign(pNewQueryInfo->colList, pQueryInfo->colList, (int16_t)tableIndex);
|
||||
|
||||
// set the correct query type
|
||||
if (pPrevSql != NULL) {
|
||||
|
@ -2149,11 +2092,13 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
|||
}
|
||||
|
||||
if (cmd == TSDB_SQL_SELECT) {
|
||||
size_t size = taosArrayGetSize(pNewQueryInfo->colList);
|
||||
|
||||
tscTrace(
|
||||
"%p new subquery: %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d,"
|
||||
"fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64,
|
||||
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs,
|
||||
pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name, pNewQueryInfo->stime,
|
||||
size, pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name, pNewQueryInfo->stime,
|
||||
pNewQueryInfo->etime, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit);
|
||||
|
||||
tscPrintSelectClause(pNew, 0);
|
||||
|
|
|
@ -413,7 +413,7 @@ typedef struct SSqlFunctionExpr {
|
|||
typedef struct SColumnFilterInfo {
|
||||
int16_t lowerRelOptr;
|
||||
int16_t upperRelOptr;
|
||||
int16_t filterOnBinary; /* denote if current column is binary */
|
||||
int16_t filterstr; // denote if current column is char(binary/nchar)
|
||||
|
||||
union {
|
||||
struct {
|
||||
|
|
|
@ -146,7 +146,7 @@ typedef struct STsdbQueryCond {
|
|||
STimeWindow twindow;
|
||||
int32_t order; // desc/asc order to iterate the data block
|
||||
int32_t numOfCols;
|
||||
SColumnInfoData *colList;
|
||||
SColumnInfo *colList;
|
||||
} STsdbQueryCond;
|
||||
|
||||
typedef struct SBlockInfo {
|
||||
|
|
|
@ -90,7 +90,7 @@ typedef struct SColumnFilterElem {
|
|||
} SColumnFilterElem;
|
||||
|
||||
typedef struct SSingleColumnFilterInfo {
|
||||
SColumnInfoData info;
|
||||
SColumnInfo info;
|
||||
int32_t numOfFilters;
|
||||
SColumnFilterElem* pFilters;
|
||||
void* pData;
|
||||
|
@ -129,14 +129,13 @@ typedef struct SQuery {
|
|||
int32_t rowSize;
|
||||
SSqlGroupbyExpr* pGroupbyExpr;
|
||||
SSqlFunctionExpr* pSelectExpr;
|
||||
SColumnInfoData* colList;
|
||||
SColumnInfo* colList;
|
||||
int32_t numOfFilterCols;
|
||||
int64_t* defaultVal;
|
||||
TSKEY lastKey;
|
||||
uint32_t status; // query status
|
||||
SResultRec rec;
|
||||
int32_t pos;
|
||||
int64_t pointsOffset; // the number of points offset to save read data
|
||||
SData** sdata;
|
||||
SSingleColumnFilterInfo* pFilterInfo;
|
||||
} SQuery;
|
||||
|
|
|
@ -38,4 +38,9 @@ void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTa
|
|||
|
||||
char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult);
|
||||
|
||||
__filter_func_t *getRangeFilterFuncArray(int32_t type);
|
||||
__filter_func_t *getValueFilterFuncArray(int32_t type);
|
||||
|
||||
bool supportPrefilter(int32_t type);
|
||||
|
||||
#endif // TDENGINE_QUERYUTIL_H
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
#include "tscompression.h"
|
||||
#include "tsdbMain.h" //todo use TableId instead of STable object
|
||||
#include "ttime.h"
|
||||
#include "tscUtil.h" // todo move the function to common module
|
||||
|
||||
#define DEFAULT_INTERN_BUF_SIZE 16384L
|
||||
|
||||
|
@ -52,8 +53,8 @@
|
|||
|
||||
/* get the qinfo struct address from the query struct address */
|
||||
#define GET_COLUMN_BYTES(query, colidx) \
|
||||
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIndex].info.bytes)
|
||||
#define GET_COLUMN_TYPE(query, colidx) ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIndex].info.type)
|
||||
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIndex].bytes)
|
||||
#define GET_COLUMN_TYPE(query, colidx) ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIndex].type)
|
||||
|
||||
typedef struct SPointInterpoSupporter {
|
||||
int32_t numOfCols;
|
||||
|
@ -223,19 +224,19 @@ bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter *
|
|||
return true;
|
||||
}
|
||||
|
||||
bool vnodeDoFilterData(SQuery *pQuery, int32_t elemPos) {
|
||||
bool doFilterData(SQuery *pQuery, int32_t elemPos) {
|
||||
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
|
||||
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
|
||||
char * pElem = pFilterInfo->pData + pFilterInfo->info.info.bytes * elemPos;
|
||||
|
||||
if (isNull(pElem, pFilterInfo->info.info.type)) {
|
||||
|
||||
char *pElem = pFilterInfo->pData + pFilterInfo->info.bytes * elemPos;
|
||||
if (isNull(pElem, pFilterInfo->info.type)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t num = pFilterInfo->numOfFilters;
|
||||
bool qualified = false;
|
||||
for (int32_t j = 0; j < num; ++j) {
|
||||
bool qualified = false;
|
||||
for (int32_t j = 0; j < pFilterInfo->numOfFilters; ++j) {
|
||||
SColumnFilterElem *pFilterElem = &pFilterInfo->pFilters[j];
|
||||
|
||||
if (pFilterElem->fp(pFilterElem, pElem, pElem)) {
|
||||
qualified = true;
|
||||
break;
|
||||
|
@ -252,7 +253,7 @@ bool vnodeDoFilterData(SQuery *pQuery, int32_t elemPos) {
|
|||
|
||||
bool vnodeFilterData(SQuery *pQuery, int32_t *numOfActualRead, int32_t index) {
|
||||
(*numOfActualRead)++;
|
||||
if (!vnodeDoFilterData(pQuery, index)) {
|
||||
if (!doFilterData(pQuery, index)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -333,8 +334,8 @@ int16_t getGroupbyColumnType(SQuery *pQuery, SSqlGroupbyExpr *pGroupbyExpr) {
|
|||
}
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
|
||||
if (colId == pQuery->colList[i].info.colId) {
|
||||
type = pQuery->colList[i].info.type;
|
||||
if (colId == pQuery->colList[i].colId) {
|
||||
type = pQuery->colList[i].type;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -367,7 +368,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) {
|
|||
|
||||
bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].pBase.functionId == TSDB_FUNC_TS_COMP; }
|
||||
|
||||
bool doRevisedResultsByLimit(SQInfo *pQInfo) {
|
||||
bool limitResults(SQInfo *pQInfo) {
|
||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
|
||||
if ((pQuery->limit.limit > 0) && (pQuery->rec.total + pQuery->rec.rows > pQuery->limit.limit)) {
|
||||
|
@ -848,10 +849,9 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3
|
|||
}
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
|
||||
SColumnInfo *pColMsg = &pQuery->colList[i].info;
|
||||
SColumnInfo *pColMsg = &pQuery->colList[i];
|
||||
assert(0);
|
||||
// char * pData = doGetDataBlocks(pQuery, pRuntimeEnv->colDataBuffer, pQuery->colList[i].colIdxInBuf);
|
||||
|
||||
// char * pData = doGetDataBlocks(pQuery, pRuntimeEnv->colDataBuffer, pQuery->colList[i].colIdxInBuf);
|
||||
sas->elemSize[i] = pColMsg->bytes;
|
||||
// sas->data[i] = pData + pCtx->startOffset * sas->elemSize[i]; // start from the offset
|
||||
}
|
||||
|
@ -860,7 +860,7 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3
|
|||
sas->offset = 0;
|
||||
} else { // other type of query function
|
||||
SColIndex *pCol = &pQuery->pSelectExpr[col].pBase.colInfo;
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)) {
|
||||
if (TSDB_COL_IS_TAG(pCol->flag) || pDataBlock == NULL) {
|
||||
dataBlock = NULL;
|
||||
} else {
|
||||
/*
|
||||
|
@ -868,11 +868,8 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3
|
|||
* stage, the remain meter may not have the required column in cache actually. So, the validation of required
|
||||
* column in cache with the corresponding meter schema is reinforced.
|
||||
*/
|
||||
if (pDataBlock == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t numOfCols = taosArrayGetSize(pDataBlock);
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData *p = taosArrayGet(pDataBlock, i);
|
||||
if (pCol->colId == p->info.colId) {
|
||||
|
@ -896,7 +893,7 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3
|
|||
* @return the incremental number of output value, so it maybe 0 for fixed number of query,
|
||||
* such as count/min/max etc.
|
||||
*/
|
||||
static void blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis,
|
||||
static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis,
|
||||
SDataBlockInfo *pDataBlockInfo, SWindowResInfo *pWindowResInfo,
|
||||
__block_search_fn_t searchFn, SArray *pDataBlock) {
|
||||
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
||||
|
@ -1021,7 +1018,7 @@ static UNUSED_FUNC char *getGroupbyColumnData(SQuery *pQuery, SData **data, int1
|
|||
int32_t colId = pGroupbyExpr->columnInfo[k].colId;
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
|
||||
if (pQuery->colList[i].info.colId == colId) {
|
||||
if (pQuery->colList[i].colId == colId) {
|
||||
colIndex = i;
|
||||
break;
|
||||
}
|
||||
|
@ -1029,8 +1026,8 @@ static UNUSED_FUNC char *getGroupbyColumnData(SQuery *pQuery, SData **data, int1
|
|||
|
||||
assert(colIndex >= 0 && colIndex < pQuery->numOfCols);
|
||||
|
||||
*type = pQuery->colList[colIndex].info.type;
|
||||
*bytes = pQuery->colList[colIndex].info.bytes;
|
||||
*type = pQuery->colList[colIndex].type;
|
||||
*bytes = pQuery->colList[colIndex].bytes;
|
||||
|
||||
// groupbyColumnData = doGetDataBlocks(pQuery, data, pQuery->colList[colIndex].inf);
|
||||
break;
|
||||
|
@ -1093,22 +1090,14 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
|
|||
return true;
|
||||
}
|
||||
|
||||
static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis,
|
||||
SDataBlockInfo *pDataBlockInfo, SWindowResInfo *pWindowResInfo,
|
||||
SArray *pDataBlock) {
|
||||
static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
|
||||
SWindowResInfo *pWindowResInfo, SArray *pDataBlock) {
|
||||
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
TSKEY * primaryKeyCol = (TSKEY *)taosArrayGet(pDataBlock, 0);
|
||||
|
||||
// SData **data = pRuntimeEnv->colDataBuffer;
|
||||
|
||||
int64_t prevNumOfRes = 0;
|
||||
bool groupbyStateValue = isGroupbyNormalCol(pQuery->pGroupbyExpr);
|
||||
|
||||
if (!groupbyStateValue) {
|
||||
prevNumOfRes = getNumOfResult(pRuntimeEnv);
|
||||
}
|
||||
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
TSKEY *primaryKeyCol = (TSKEY*) ((SColumnInfoData *)taosArrayGet(pDataBlock, 0))->pData;
|
||||
|
||||
bool groupbyStateValue = isGroupbyNormalCol(pQuery->pGroupbyExpr);
|
||||
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutputCols, sizeof(SArithmeticSupport));
|
||||
|
||||
int16_t type = 0;
|
||||
|
@ -1134,16 +1123,10 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat
|
|||
|
||||
// set the input column data
|
||||
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
|
||||
// SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
|
||||
assert(0);
|
||||
/*
|
||||
* NOTE: here the tbname/tags column cannot reach here, since it will never be a filter column,
|
||||
* so we do NOT check if is a tag or not
|
||||
*/
|
||||
// pFilterInfo->pData = doGetDataBlocks(pQuery, data, pFilterInfo->info.colIdxInBuf);
|
||||
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
|
||||
pFilterInfo->pData = getDataBlocks(pRuntimeEnv, &sasArray[k], pFilterInfo->info.colId, pDataBlockInfo->rows, pDataBlock);
|
||||
}
|
||||
|
||||
int32_t numOfRes = 0;
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
|
||||
// from top to bottom in desc
|
||||
|
@ -1171,7 +1154,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat
|
|||
}
|
||||
}
|
||||
|
||||
if (pQuery->numOfFilterCols > 0 && (!vnodeDoFilterData(pQuery, offset))) {
|
||||
if (pQuery->numOfFilterCols > 0 && (!doFilterData(pQuery, offset))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -1181,9 +1164,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat
|
|||
int64_t ts = primaryKeyCol[offset];
|
||||
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
|
||||
|
||||
assert(0);
|
||||
int32_t ret = 0;
|
||||
// int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pTabObj->sid, &win);
|
||||
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->sid, &win);
|
||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||
continue;
|
||||
}
|
||||
|
@ -1197,8 +1178,6 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat
|
|||
lastKey = ts;
|
||||
STimeWindow nextWin = win;
|
||||
int32_t index = pWindowResInfo->curIndex;
|
||||
assert(0);
|
||||
int32_t sid = 0; // pRuntimeEnv->pTabObj->sid;
|
||||
|
||||
while (1) {
|
||||
getNextTimeWindow(pQuery, &nextWin);
|
||||
|
@ -1213,7 +1192,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat
|
|||
}
|
||||
|
||||
// null data, failed to allocate more memory buffer
|
||||
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, sid, &nextWin) != TSDB_CODE_SUCCESS) {
|
||||
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->sid, &nextWin) != TSDB_CODE_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -1254,54 +1233,10 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* pointsOffset is the maximum available space in result buffer update the actual forward step for query that
|
||||
* requires checking buffer during loop
|
||||
*/
|
||||
if ((pQuery->checkBuffer == 1) && (++numOfRes) >= pQuery->pointsOffset) {
|
||||
pQuery->lastKey = lastKey + step;
|
||||
assert(0);
|
||||
// *forwardStep = j + 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pQuery->lastKey = lastKey + step;
|
||||
free(sasArray);
|
||||
|
||||
/*
|
||||
* No need to calculate the number of output results for group-by normal columns, interval query
|
||||
* because the results of group by normal column is put into intermediate buffer.
|
||||
*/
|
||||
int32_t num = 0;
|
||||
if (!groupbyStateValue && !isIntervalQuery(pQuery)) {
|
||||
num = getNumOfResult(pRuntimeEnv) - prevNumOfRes;
|
||||
}
|
||||
|
||||
return num;
|
||||
}
|
||||
|
||||
static UNUSED_FUNC int32_t reviseForwardSteps(SQueryRuntimeEnv *pRuntimeEnv, int32_t forwardStep) {
|
||||
/*
|
||||
* 1. If value filter exists, we try all data in current block, and do not set the QUERY_RESBUF_FULL flag.
|
||||
*
|
||||
* 2. In case of top/bottom/ts_comp query, the checkBuffer == 1 and pQuery->numOfFilterCols
|
||||
* may be 0 or not. We do not check the capacity of output buffer, since the filter function will do it.
|
||||
*
|
||||
* 3. In handling the query of secondary query of join, tsBuf servers as a ts filter.
|
||||
*/
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
if (isTopBottomQuery(pQuery) || isTSCompQuery(pQuery) || pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) {
|
||||
return forwardStep;
|
||||
}
|
||||
|
||||
// current buffer does not have enough space, try in the next loop
|
||||
if ((pQuery->checkBuffer == 1) && (pQuery->pointsOffset <= forwardStep)) {
|
||||
forwardStep = pQuery->pointsOffset;
|
||||
}
|
||||
|
||||
return forwardStep;
|
||||
}
|
||||
|
||||
static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo,
|
||||
|
@ -1310,9 +1245,9 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
|
|||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
|
||||
/*numOfRes = */ rowwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
|
||||
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
|
||||
} else {
|
||||
blockwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
|
||||
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
|
||||
}
|
||||
|
||||
TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey;
|
||||
|
@ -1854,7 +1789,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) {
|
|||
bool vnodeParametersSafetyCheck(SQuery *pQuery) {
|
||||
// load data column information is incorrect
|
||||
for (int32_t i = 0; i < pQuery->numOfCols - 1; ++i) {
|
||||
if (pQuery->colList[i].info.colId == pQuery->colList[i + 1].info.colId) {
|
||||
if (pQuery->colList[i].colId == pQuery->colList[i + 1].colId) {
|
||||
qError("QInfo:%p invalid data load column for query", GET_QINFO_ADDR(pQuery));
|
||||
return false;
|
||||
}
|
||||
|
@ -2133,7 +2068,7 @@ void pointInterpSupporterInit(SQuery *pQuery, SPointInterpoSupporter *pInterpoSu
|
|||
/* get appropriated size for one row data source*/
|
||||
int32_t len = 0;
|
||||
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
|
||||
len += pQuery->colList[i].info.bytes;
|
||||
len += pQuery->colList[i].bytes;
|
||||
}
|
||||
|
||||
// assert(PRIMARY_TSCOL_LOADED(pQuery));
|
||||
|
@ -2147,7 +2082,7 @@ void pointInterpSupporterInit(SQuery *pQuery, SPointInterpoSupporter *pInterpoSu
|
|||
pInterpoSupport->pPrevPoint[i] = prev + offset;
|
||||
pInterpoSupport->pNextPoint[i] = next + offset;
|
||||
|
||||
offset += pQuery->colList[i].info.bytes;
|
||||
offset += pQuery->colList[i].bytes;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3252,7 +3187,7 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
}
|
||||
}
|
||||
|
||||
void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||
void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
if (pQuery->rec.rows == 0 || pQuery->limit.offset == 0) {
|
||||
return;
|
||||
|
@ -3260,23 +3195,21 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
|
||||
if (pQuery->rec.rows <= pQuery->limit.offset) {
|
||||
pQuery->limit.offset -= pQuery->rec.rows;
|
||||
|
||||
pQuery->rec.rows = 0;
|
||||
// pQuery->pointsOffset = pQuery->rec.pointsToRead; // clear all data in result buffer
|
||||
|
||||
resetCtxOutputBuf(pRuntimeEnv);
|
||||
|
||||
// clear the buffer is full flag if exists
|
||||
pQuery->status &= (~QUERY_RESBUF_FULL);
|
||||
} else {
|
||||
int32_t numOfSkip = (int32_t)pQuery->limit.offset;
|
||||
int32_t numOfSkip = (int32_t) pQuery->limit.offset;
|
||||
pQuery->rec.rows -= numOfSkip;
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
|
||||
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
|
||||
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
|
||||
assert(0);
|
||||
// memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->size * bytes);
|
||||
|
||||
memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->rec.rows * bytes);
|
||||
pRuntimeEnv->pCtx[i].aOutputBuf += bytes * numOfSkip;
|
||||
|
||||
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
|
||||
|
@ -3864,9 +3797,9 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableDataInfo *
|
|||
SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
|
||||
|
||||
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) {
|
||||
// numOfRes = rowwiseApplyAllFunctions(pRuntimeEnv, &forwardStep, pFields, pDataBlockInfo, pWindowResInfo);
|
||||
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
|
||||
} else {
|
||||
blockwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
|
||||
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
|
||||
}
|
||||
|
||||
updateWindowResNumOfRes(pRuntimeEnv, pTableDataInfo);
|
||||
|
@ -4667,10 +4600,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|||
scanAllDataBlocks(pRuntimeEnv);
|
||||
|
||||
pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
|
||||
doSkipResults(pRuntimeEnv);
|
||||
skipResults(pRuntimeEnv);
|
||||
|
||||
// the limitation of output result is reached, set the query completed
|
||||
if (doRevisedResultsByLimit(pQInfo)) {
|
||||
if (limitResults(pQInfo)) {
|
||||
pQInfo->tableIndex = pQInfo->groupInfo.numOfTables;
|
||||
break;
|
||||
}
|
||||
|
@ -4963,8 +4896,8 @@ static void tableFixedOutputProcess(SQInfo *pQInfo) {
|
|||
assert(isTopBottomQuery(pQuery));
|
||||
}
|
||||
|
||||
doSkipResults(pRuntimeEnv);
|
||||
doRevisedResultsByLimit(pQInfo);
|
||||
skipResults(pRuntimeEnv);
|
||||
limitResults(pQInfo);
|
||||
}
|
||||
|
||||
static void tableMultiOutputProcess(SQInfo *pQInfo) {
|
||||
|
@ -4993,7 +4926,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo) {
|
|||
|
||||
pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
|
||||
if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->rec.rows > 0) {
|
||||
doSkipResults(pRuntimeEnv);
|
||||
skipResults(pRuntimeEnv);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -5010,7 +4943,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo) {
|
|||
resetCtxOutputBuf(pRuntimeEnv);
|
||||
}
|
||||
|
||||
doRevisedResultsByLimit(pQInfo);
|
||||
limitResults(pQInfo);
|
||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
|
||||
qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo, pQuery->lastKey,
|
||||
pQuery->window.ekey);
|
||||
|
@ -5079,7 +5012,7 @@ static void tableIntervalProcess(SQInfo *pQInfo) {
|
|||
|
||||
// the offset is handled at prepare stage if no interpolation involved
|
||||
if (pQuery->interpoType == TSDB_INTERPO_NONE) {
|
||||
doRevisedResultsByLimit(pQInfo);
|
||||
limitResults(pQInfo);
|
||||
break;
|
||||
} else {
|
||||
taosInterpoSetStartInfo(&pRuntimeEnv->interpoInfo, pQuery->rec.rows, pQuery->interpoType);
|
||||
|
@ -5095,7 +5028,7 @@ static void tableIntervalProcess(SQInfo *pQInfo) {
|
|||
|
||||
qTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.rows);
|
||||
if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||
doRevisedResultsByLimit(pQInfo);
|
||||
limitResults(pQInfo);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -5129,7 +5062,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
|
|||
pQuery->rec.rows = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata,
|
||||
(tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo);
|
||||
|
||||
doRevisedResultsByLimit(pQInfo);
|
||||
limitResults(pQInfo);
|
||||
|
||||
pQInfo->pointsInterpo += numOfInterpo;
|
||||
qTrace("QInfo:%p current:%d returned, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total);
|
||||
|
@ -5361,11 +5294,11 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
|
|||
SColumnFilterInfo *pFilterInfo = (SColumnFilterInfo *)pMsg;
|
||||
SColumnFilterInfo *pDestFilterInfo = &pColInfo->filters[f];
|
||||
|
||||
pDestFilterInfo->filterOnBinary = htons(pFilterInfo->filterOnBinary);
|
||||
pDestFilterInfo->filterstr = htons(pFilterInfo->filterstr);
|
||||
|
||||
pMsg += sizeof(SColumnFilterInfo);
|
||||
|
||||
if (pDestFilterInfo->filterOnBinary) {
|
||||
if (pDestFilterInfo->filterstr) {
|
||||
pDestFilterInfo->len = htobe64(pFilterInfo->len);
|
||||
|
||||
pDestFilterInfo->pz = (int64_t)calloc(1, pDestFilterInfo->len + 1);
|
||||
|
@ -5635,9 +5568,9 @@ static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SCol
|
|||
return pGroupbyExpr;
|
||||
}
|
||||
|
||||
static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) {
|
||||
static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) {
|
||||
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
|
||||
if (pQuery->colList[i].info.numOfFilters > 0) {
|
||||
if (pQuery->colList[i].numOfFilters > 0) {
|
||||
pQuery->numOfFilterCols++;
|
||||
}
|
||||
}
|
||||
|
@ -5649,18 +5582,18 @@ static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) {
|
|||
pQuery->pFilterInfo = calloc(1, sizeof(SSingleColumnFilterInfo) * pQuery->numOfFilterCols);
|
||||
|
||||
for (int32_t i = 0, j = 0; i < pQuery->numOfCols; ++i) {
|
||||
if (pQuery->colList[i].info.numOfFilters > 0) {
|
||||
if (pQuery->colList[i].numOfFilters > 0) {
|
||||
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[j];
|
||||
|
||||
memcpy(&pFilterInfo->info, &pQuery->colList[i], sizeof(SColumnInfoData));
|
||||
pFilterInfo->info.info.filters = NULL;
|
||||
|
||||
pFilterInfo->numOfFilters = pQuery->colList[i].info.numOfFilters;
|
||||
pFilterInfo->info = pQuery->colList[i];
|
||||
|
||||
pFilterInfo->numOfFilters = pQuery->colList[i].numOfFilters;
|
||||
pFilterInfo->pFilters = calloc(pFilterInfo->numOfFilters, sizeof(SColumnFilterElem));
|
||||
|
||||
for (int32_t f = 0; f < pFilterInfo->numOfFilters; ++f) {
|
||||
SColumnFilterElem *pSingleColFilter = &pFilterInfo->pFilters[f];
|
||||
pSingleColFilter->filterInfo = pQuery->colList[i].info.filters[f];
|
||||
pSingleColFilter->filterInfo = pQuery->colList[i].filters[f];
|
||||
|
||||
int32_t lower = pSingleColFilter->filterInfo.lowerRelOptr;
|
||||
int32_t upper = pSingleColFilter->filterInfo.upperRelOptr;
|
||||
|
@ -5670,11 +5603,12 @@ static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) {
|
|||
return TSDB_CODE_INVALID_QUERY_MSG;
|
||||
}
|
||||
|
||||
int16_t type = pQuery->colList[i].info.type;
|
||||
int16_t bytes = pQuery->colList[i].info.bytes;
|
||||
int16_t type = pQuery->colList[i].type;
|
||||
int16_t bytes = pQuery->colList[i].bytes;
|
||||
|
||||
__filter_func_t *rangeFilterArray = NULL; // vnodeGetRangeFilterFuncArray(type);
|
||||
__filter_func_t *filterArray = NULL; // vnodeGetValueFilterFuncArray(type);
|
||||
// todo refactor
|
||||
__filter_func_t *rangeFilterArray = getRangeFilterFuncArray(type);
|
||||
__filter_func_t *filterArray = getValueFilterFuncArray(type);
|
||||
|
||||
if (rangeFilterArray == NULL && filterArray == NULL) {
|
||||
qError("QInfo:%p failed to get filter function, invalid data type:%d", pQInfo, type);
|
||||
|
@ -5744,7 +5678,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
|
|||
|
||||
SColIndex *pColIndexEx = &pSqlExprMsg->colInfo;
|
||||
for (int32_t f = 0; f < pQuery->numOfCols; ++f) {
|
||||
if (pColIndexEx->colId == pQuery->colList[f].info.colId) {
|
||||
if (pColIndexEx->colId == pQuery->colList[f].colId) {
|
||||
pColIndexEx->colIndex = f;
|
||||
break;
|
||||
}
|
||||
|
@ -5765,44 +5699,29 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
|||
int16_t numOfCols = pQueryMsg->numOfCols;
|
||||
int16_t numOfOutputCols = pQueryMsg->numOfOutputCols;
|
||||
|
||||
pQuery->numOfCols = numOfCols;
|
||||
pQuery->numOfCols = numOfCols;
|
||||
pQuery->numOfOutputCols = numOfOutputCols;
|
||||
|
||||
pQuery->limit.limit = pQueryMsg->limit;
|
||||
pQuery->limit.offset = pQueryMsg->offset;
|
||||
|
||||
pQuery->order.order = pQueryMsg->order;
|
||||
pQuery->limit.limit = pQueryMsg->limit;
|
||||
pQuery->limit.offset = pQueryMsg->offset;
|
||||
pQuery->order.order = pQueryMsg->order;
|
||||
pQuery->order.orderColId = pQueryMsg->orderColId;
|
||||
|
||||
pQuery->pSelectExpr = pExprs;
|
||||
pQuery->pGroupbyExpr = pGroupbyExpr;
|
||||
|
||||
pQuery->intervalTime = pQueryMsg->intervalTime;
|
||||
|
||||
pQuery->slidingTime = pQueryMsg->slidingTime;
|
||||
pQuery->pSelectExpr = pExprs;
|
||||
pQuery->pGroupbyExpr = pGroupbyExpr;
|
||||
pQuery->intervalTime = pQueryMsg->intervalTime;
|
||||
pQuery->slidingTime = pQueryMsg->slidingTime;
|
||||
pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit;
|
||||
|
||||
pQuery->interpoType = pQueryMsg->interpoType;
|
||||
pQuery->interpoType = pQueryMsg->interpoType;
|
||||
|
||||
pQuery->colList = calloc(1, sizeof(SSingleColumnFilterInfo) * numOfCols);
|
||||
if (pQuery->colList == NULL) {
|
||||
goto _clean_memory;
|
||||
goto _cleanup;
|
||||
}
|
||||
|
||||
for (int16_t i = 0; i < numOfCols; ++i) {
|
||||
pQuery->colList[i].info = pQueryMsg->colList[i];
|
||||
pQuery->colList[i] = pQueryMsg->colList[i];
|
||||
|
||||
SColumnInfo *pColInfo = &pQuery->colList[i].info;
|
||||
pColInfo->filters = NULL;
|
||||
// if (colList[i].numOfFilters > 0) {
|
||||
// pColInfo->filters = calloc(1, colList[i].numOfFilters * sizeof(SColumnFilterInfo));
|
||||
//
|
||||
// for (int32_t j = 0; j < colList[i].numOfFilters; ++j) {
|
||||
// tscColumnFilterInfoCopy(&pColInfo->filters[j], &colList[i].filters[j]);
|
||||
// }
|
||||
// } else {
|
||||
// pQuery->colList[i].data.filters = NULL;
|
||||
// }
|
||||
SColumnInfo *pColInfo = &pQuery->colList[i];
|
||||
pColInfo->filters = tscFilterInfoClone(pQueryMsg->colList[i].filters, pColInfo->numOfFilters);
|
||||
}
|
||||
|
||||
// calculate the result row size
|
||||
|
@ -5813,15 +5732,15 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
|||
|
||||
doUpdateExprColumnIndex(pQuery);
|
||||
|
||||
int32_t ret = vnodeCreateFilterInfo(pQInfo, pQuery);
|
||||
int32_t ret = createFilterInfo(pQInfo, pQuery);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
goto _clean_memory;
|
||||
goto _cleanup;
|
||||
}
|
||||
|
||||
// prepare the result buffer
|
||||
pQuery->sdata = (SData **)calloc(pQuery->numOfOutputCols, POINTER_BYTES);
|
||||
if (pQuery->sdata == NULL) {
|
||||
goto _clean_memory;
|
||||
goto _cleanup;
|
||||
}
|
||||
|
||||
// set the output buffer capacity
|
||||
|
@ -5835,14 +5754,14 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
|||
size_t size = (pQuery->rec.capacity + 1) * pExprs[col].resBytes + pExprs[col].interResBytes + sizeof(SData);
|
||||
pQuery->sdata[col] = (SData *)calloc(1, size);
|
||||
if (pQuery->sdata[col] == NULL) {
|
||||
goto _clean_memory;
|
||||
goto _cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
if (pQuery->interpoType != TSDB_INTERPO_NONE) {
|
||||
pQuery->defaultVal = malloc(sizeof(int64_t) * pQuery->numOfOutputCols);
|
||||
if (pQuery->defaultVal == NULL) {
|
||||
goto _clean_memory;
|
||||
goto _cleanup;
|
||||
}
|
||||
|
||||
// the first column is the timestamp
|
||||
|
@ -5861,7 +5780,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
|||
|
||||
if (sem_init(&pQInfo->dataReady, 0, 0) != 0) {
|
||||
qError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, strerror(errno));
|
||||
goto _clean_memory;
|
||||
goto _cleanup;
|
||||
}
|
||||
|
||||
vnodeParametersSafetyCheck(pQuery);
|
||||
|
@ -5869,7 +5788,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
|||
qTrace("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo);
|
||||
return pQInfo;
|
||||
|
||||
_clean_memory:
|
||||
_cleanup:
|
||||
tfree(pQuery->defaultVal);
|
||||
|
||||
if (pQuery->sdata != NULL) {
|
||||
|
@ -6082,7 +6001,6 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
|
|||
goto _query_over;
|
||||
}
|
||||
|
||||
// todo check vnode status
|
||||
if (pTableIdList == NULL || taosArrayGetSize(pTableIdList) == 0) {
|
||||
qError("qmsg:%p, SQueryTableMsg wrong format", pQueryMsg);
|
||||
code = TSDB_CODE_INVALID_QUERY_MSG;
|
||||
|
|
|
@ -0,0 +1,558 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
|
||||
#include "taosmsg.h"
|
||||
#include "tsqlfunction.h"
|
||||
#include "queryExecutor.h"
|
||||
#include "tcompare.h"
|
||||
|
||||
bool less_i8(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int8_t *)minval < pFilter->filterInfo.upperBndi);
|
||||
}
|
||||
|
||||
bool less_i16(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int16_t *)minval < pFilter->filterInfo.upperBndi);
|
||||
}
|
||||
|
||||
bool less_i32(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int32_t *)minval < pFilter->filterInfo.upperBndi);
|
||||
}
|
||||
|
||||
bool less_i64(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int64_t *)minval < pFilter->filterInfo.upperBndi);
|
||||
}
|
||||
|
||||
bool less_ds(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(float *)minval < pFilter->filterInfo.upperBndd);
|
||||
}
|
||||
|
||||
bool less_dd(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(double *)minval < pFilter->filterInfo.upperBndd);
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////
|
||||
bool large_i8(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int8_t *)maxval > pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool large_i16(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int16_t *)maxval > pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool large_i32(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int32_t *)maxval > pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool large_i64(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int64_t *)maxval > pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool large_ds(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(float *)maxval > pFilter->filterInfo.lowerBndd);
|
||||
}
|
||||
|
||||
bool large_dd(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(double *)maxval > pFilter->filterInfo.lowerBndd);
|
||||
}
|
||||
/////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool lessEqual_i8(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int8_t *)minval <= pFilter->filterInfo.upperBndi);
|
||||
}
|
||||
|
||||
bool lessEqual_i16(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int16_t *)minval <= pFilter->filterInfo.upperBndi);
|
||||
}
|
||||
|
||||
bool lessEqual_i32(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int32_t *)minval <= pFilter->filterInfo.upperBndi);
|
||||
}
|
||||
|
||||
bool lessEqual_i64(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int64_t *)minval <= pFilter->filterInfo.upperBndi);
|
||||
}
|
||||
|
||||
bool lessEqual_ds(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(float *)minval <= pFilter->filterInfo.upperBndd);
|
||||
}
|
||||
|
||||
bool lessEqual_dd(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(double *)minval <= pFilter->filterInfo.upperBndd);
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
bool largeEqual_i8(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int8_t *)maxval >= pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool largeEqual_i16(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int16_t *)maxval >= pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool largeEqual_i32(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int32_t *)maxval >= pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool largeEqual_i64(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int64_t *)maxval >= pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool largeEqual_ds(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(float *)maxval >= pFilter->filterInfo.lowerBndd);
|
||||
}
|
||||
|
||||
bool largeEqual_dd(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(double *)maxval >= pFilter->filterInfo.lowerBndd);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool equal_i8(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
if (*(int8_t *)minval == *(int8_t *)maxval) {
|
||||
return (*(int8_t *)minval == pFilter->filterInfo.lowerBndi);
|
||||
} else { /* range filter */
|
||||
assert(*(int8_t *)minval < *(int8_t *)maxval);
|
||||
|
||||
return *(int8_t *)minval <= pFilter->filterInfo.lowerBndi && *(int8_t *)maxval >= pFilter->filterInfo.lowerBndi;
|
||||
}
|
||||
}
|
||||
|
||||
bool equal_i16(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
if (*(int16_t *)minval == *(int16_t *)maxval) {
|
||||
return (*(int16_t *)minval == pFilter->filterInfo.lowerBndi);
|
||||
} else { /* range filter */
|
||||
assert(*(int16_t *)minval < *(int16_t *)maxval);
|
||||
|
||||
return *(int16_t *)minval <= pFilter->filterInfo.lowerBndi && *(int16_t *)maxval >= pFilter->filterInfo.lowerBndi;
|
||||
}
|
||||
}
|
||||
|
||||
bool equal_i32(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
if (*(int32_t *)minval == *(int32_t *)maxval) {
|
||||
return (*(int32_t *)minval == pFilter->filterInfo.lowerBndi);
|
||||
} else { /* range filter */
|
||||
assert(*(int32_t *)minval < *(int32_t *)maxval);
|
||||
|
||||
return *(int32_t *)minval <= pFilter->filterInfo.lowerBndi && *(int32_t *)maxval >= pFilter->filterInfo.lowerBndi;
|
||||
}
|
||||
}
|
||||
|
||||
bool equal_i64(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
if (*(int64_t *)minval == *(int64_t *)maxval) {
|
||||
return (*(int64_t *)minval == pFilter->filterInfo.lowerBndi);
|
||||
} else { /* range filter */
|
||||
assert(*(int64_t *)minval < *(int64_t *)maxval);
|
||||
|
||||
return *(int64_t *)minval <= pFilter->filterInfo.lowerBndi && *(int64_t *)maxval >= pFilter->filterInfo.lowerBndi;
|
||||
}
|
||||
}
|
||||
|
||||
bool equal_ds(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
if (*(float *)minval == *(float *)maxval) {
|
||||
return (fabs(*(float *)minval - pFilter->filterInfo.lowerBndd) <= FLT_EPSILON);
|
||||
} else { /* range filter */
|
||||
assert(*(float *)minval < *(float *)maxval);
|
||||
return *(float *)minval <= pFilter->filterInfo.lowerBndd && *(float *)maxval >= pFilter->filterInfo.lowerBndd;
|
||||
}
|
||||
}
|
||||
|
||||
bool equal_dd(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
if (*(double *)minval == *(double *)maxval) {
|
||||
return (*(double *)minval == pFilter->filterInfo.lowerBndd);
|
||||
} else { /* range filter */
|
||||
assert(*(double *)minval < *(double *)maxval);
|
||||
|
||||
return *(double *)minval <= pFilter->filterInfo.lowerBndi && *(double *)maxval >= pFilter->filterInfo.lowerBndi;
|
||||
}
|
||||
}
|
||||
|
||||
bool equal_str(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
// query condition string is greater than the max length of string, not qualified data
|
||||
if (pFilter->filterInfo.len > pFilter->bytes) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return strncmp((char *)pFilter->filterInfo.pz, minval, pFilter->bytes) == 0;
|
||||
}
|
||||
|
||||
bool equal_nchar(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
// query condition string is greater than the max length of string, not qualified data
|
||||
if (pFilter->filterInfo.len > pFilter->bytes) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return wcsncmp((wchar_t *)pFilter->filterInfo.pz, (wchar_t*) minval, pFilter->bytes/TSDB_NCHAR_SIZE) == 0;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////
|
||||
bool like_str(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
|
||||
|
||||
return patternMatch((char *)pFilter->filterInfo.pz, minval, pFilter->bytes, &info) == TSDB_PATTERN_MATCH;
|
||||
}
|
||||
|
||||
bool like_nchar(SColumnFilterElem* pFilter, char* minval, char *maxval) {
|
||||
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
|
||||
|
||||
return WCSPatternMatch((wchar_t*) pFilter->filterInfo.pz, (wchar_t*) minval, pFilter->bytes/TSDB_NCHAR_SIZE, &info) == TSDB_PATTERN_MATCH;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////
|
||||
/**
|
||||
* If minval equals to maxval, it may serve as the one element filter,
|
||||
* or all elements of an array are identical during pref-filter stage.
|
||||
* Otherwise, it must be pre-filter of array list of elements.
|
||||
*
|
||||
* During pre-filter stage, if there is one element that locates in [minval, maxval],
|
||||
* the filter function will return true.
|
||||
*/
|
||||
bool nequal_i8(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
if (*(int8_t *)minval == *(int8_t *)maxval) {
|
||||
return (*(int8_t *)minval != pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool nequal_i16(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
if (*(int16_t *)minval == *(int16_t *)maxval) {
|
||||
return (*(int16_t *)minval != pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool nequal_i32(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
if (*(int32_t *)minval == *(int32_t *)maxval) {
|
||||
return (*(int32_t *)minval != pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool nequal_i64(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
if (*(int64_t *)minval == *(int64_t *)maxval) {
|
||||
return (*(int64_t *)minval != pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool nequal_ds(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
if (*(float *)minval == *(float *)maxval) {
|
||||
return (*(float *)minval != pFilter->filterInfo.lowerBndd);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool nequal_dd(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
if (*(double *)minval == *(double *)maxval) {
|
||||
return (*(double *)minval != pFilter->filterInfo.lowerBndd);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool nequal_str(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
if (pFilter->filterInfo.len > pFilter->bytes) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return strncmp((char *)pFilter->filterInfo.pz, minval, pFilter->bytes) != 0;
|
||||
}
|
||||
|
||||
bool nequal_nchar(SColumnFilterElem *pFilter, char* minval, char *maxval) {
|
||||
if (pFilter->filterInfo.len > pFilter->bytes) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return wcsncmp((wchar_t *)pFilter->filterInfo.pz, (wchar_t*)minval, pFilter->bytes/TSDB_NCHAR_SIZE) != 0;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////
|
||||
|
||||
bool rangeFilter_i32_ii(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int32_t *)minval <= pFilter->filterInfo.upperBndi && *(int32_t *)maxval >= pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool rangeFilter_i32_ee(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int32_t *)minval<pFilter->filterInfo.upperBndi &&*(int32_t *)maxval> pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool rangeFilter_i32_ie(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int32_t *)minval < pFilter->filterInfo.upperBndi && *(int32_t *)maxval >= pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool rangeFilter_i32_ei(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int32_t *)minval <= pFilter->filterInfo.upperBndi && *(int32_t *)maxval > pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
bool rangeFilter_i8_ii(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int8_t *)minval <= pFilter->filterInfo.upperBndi && *(int8_t *)maxval >= pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool rangeFilter_i8_ee(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int8_t *)minval<pFilter->filterInfo.upperBndi &&*(int8_t *)maxval> pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool rangeFilter_i8_ie(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int8_t *)minval < pFilter->filterInfo.upperBndi && *(int8_t *)maxval >= pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool rangeFilter_i8_ei(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int8_t *)minval <= pFilter->filterInfo.upperBndi && *(int8_t *)maxval > pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////
|
||||
bool rangeFilter_i16_ii(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int16_t *)minval <= pFilter->filterInfo.upperBndi && *(int16_t *)maxval >= pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool rangeFilter_i16_ee(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int16_t *)minval<pFilter->filterInfo.upperBndi &&*(int16_t *)maxval> pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool rangeFilter_i16_ie(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int16_t *)minval < pFilter->filterInfo.upperBndi && *(int16_t *)maxval >= pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool rangeFilter_i16_ei(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int16_t *)minval <= pFilter->filterInfo.upperBndi && *(int16_t *)maxval > pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
bool rangeFilter_i64_ii(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int64_t *)minval <= pFilter->filterInfo.upperBndi && *(int64_t *)maxval >= pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool rangeFilter_i64_ee(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int64_t *)minval<pFilter->filterInfo.upperBndi &&*(int64_t *)maxval> pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool rangeFilter_i64_ie(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int64_t *)minval < pFilter->filterInfo.upperBndi && *(int64_t *)maxval >= pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
bool rangeFilter_i64_ei(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(int64_t *)minval <= pFilter->filterInfo.upperBndi && *(int64_t *)maxval > pFilter->filterInfo.lowerBndi);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
bool rangeFilter_ds_ii(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(float *)minval <= pFilter->filterInfo.upperBndd && *(float *)maxval >= pFilter->filterInfo.lowerBndd);
|
||||
}
|
||||
|
||||
bool rangeFilter_ds_ee(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(float *)minval<pFilter->filterInfo.upperBndd &&*(float *)maxval> pFilter->filterInfo.lowerBndd);
|
||||
}
|
||||
|
||||
bool rangeFilter_ds_ie(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(float *)minval < pFilter->filterInfo.upperBndd && *(float *)maxval >= pFilter->filterInfo.lowerBndd);
|
||||
}
|
||||
|
||||
bool rangeFilter_ds_ei(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(float *)minval <= pFilter->filterInfo.upperBndd && *(float *)maxval > pFilter->filterInfo.lowerBndd);
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
bool rangeFilter_dd_ii(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(double *)minval <= pFilter->filterInfo.upperBndd && *(double *)maxval >= pFilter->filterInfo.lowerBndd);
|
||||
}
|
||||
|
||||
bool rangeFilter_dd_ee(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(double *)minval<pFilter->filterInfo.upperBndd &&*(double *)maxval> pFilter->filterInfo.lowerBndd);
|
||||
}
|
||||
|
||||
bool rangeFilter_dd_ie(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(double *)minval < pFilter->filterInfo.upperBndd && *(double *)maxval >= pFilter->filterInfo.lowerBndd);
|
||||
}
|
||||
|
||||
bool rangeFilter_dd_ei(SColumnFilterElem *pFilter, char *minval, char *maxval) {
|
||||
return (*(double *)minval <= pFilter->filterInfo.upperBndd && *(double *)maxval > pFilter->filterInfo.lowerBndd);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
bool (*filterFunc_i8[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = {
|
||||
NULL,
|
||||
less_i8,
|
||||
large_i8,
|
||||
equal_i8,
|
||||
lessEqual_i8,
|
||||
largeEqual_i8,
|
||||
nequal_i8,
|
||||
NULL,
|
||||
};
|
||||
|
||||
bool (*filterFunc_i16[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = {
|
||||
NULL,
|
||||
less_i16,
|
||||
large_i16,
|
||||
equal_i16,
|
||||
lessEqual_i16,
|
||||
largeEqual_i16,
|
||||
nequal_i16,
|
||||
NULL,
|
||||
};
|
||||
|
||||
bool (*filterFunc_i32[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = {
|
||||
NULL,
|
||||
less_i32,
|
||||
large_i32,
|
||||
equal_i32,
|
||||
lessEqual_i32,
|
||||
largeEqual_i32,
|
||||
nequal_i32,
|
||||
NULL,
|
||||
};
|
||||
|
||||
bool (*filterFunc_i64[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = {
|
||||
NULL,
|
||||
less_i64,
|
||||
large_i64,
|
||||
equal_i64,
|
||||
lessEqual_i64,
|
||||
largeEqual_i64,
|
||||
nequal_i64,
|
||||
NULL,
|
||||
};
|
||||
|
||||
bool (*filterFunc_ds[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = {
|
||||
NULL,
|
||||
less_ds,
|
||||
large_ds,
|
||||
equal_ds,
|
||||
lessEqual_ds,
|
||||
largeEqual_ds,
|
||||
nequal_ds,
|
||||
NULL,
|
||||
};
|
||||
|
||||
bool (*filterFunc_dd[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = {
|
||||
NULL,
|
||||
less_dd,
|
||||
large_dd,
|
||||
equal_dd,
|
||||
lessEqual_dd,
|
||||
largeEqual_dd,
|
||||
nequal_dd,
|
||||
NULL,
|
||||
};
|
||||
|
||||
bool (*filterFunc_str[])(SColumnFilterElem* pFilter, char* minval, char *maxval) = {
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
equal_str,
|
||||
NULL,
|
||||
NULL,
|
||||
nequal_str,
|
||||
like_str,
|
||||
};
|
||||
|
||||
bool (*filterFunc_nchar[])(SColumnFilterElem* pFitler, char* minval, char* maxval) = {
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
equal_nchar,
|
||||
NULL,
|
||||
NULL,
|
||||
nequal_nchar,
|
||||
like_nchar,
|
||||
};
|
||||
|
||||
bool (*rangeFilterFunc_i8[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = {
|
||||
NULL,
|
||||
rangeFilter_i8_ee,
|
||||
rangeFilter_i8_ie,
|
||||
rangeFilter_i8_ei,
|
||||
rangeFilter_i8_ii,
|
||||
};
|
||||
|
||||
bool (*rangeFilterFunc_i16[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = {
|
||||
NULL,
|
||||
rangeFilter_i16_ee,
|
||||
rangeFilter_i16_ie,
|
||||
rangeFilter_i16_ei,
|
||||
rangeFilter_i16_ii,
|
||||
};
|
||||
|
||||
bool (*rangeFilterFunc_i32[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = {
|
||||
NULL,
|
||||
rangeFilter_i32_ee,
|
||||
rangeFilter_i32_ie,
|
||||
rangeFilter_i32_ei,
|
||||
rangeFilter_i32_ii,
|
||||
};
|
||||
|
||||
bool (*rangeFilterFunc_i64[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = {
|
||||
NULL,
|
||||
rangeFilter_i64_ee,
|
||||
rangeFilter_i64_ie,
|
||||
rangeFilter_i64_ei,
|
||||
rangeFilter_i64_ii,
|
||||
};
|
||||
|
||||
bool (*rangeFilterFunc_ds[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = {
|
||||
NULL,
|
||||
rangeFilter_ds_ee,
|
||||
rangeFilter_ds_ie,
|
||||
rangeFilter_ds_ei,
|
||||
rangeFilter_ds_ii,
|
||||
};
|
||||
|
||||
bool (*rangeFilterFunc_dd[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = {
|
||||
NULL,
|
||||
rangeFilter_dd_ee,
|
||||
rangeFilter_dd_ie,
|
||||
rangeFilter_dd_ei,
|
||||
rangeFilter_dd_ii,
|
||||
};
|
||||
|
||||
__filter_func_t* getRangeFilterFuncArray(int32_t type) {
|
||||
switch(type) {
|
||||
case TSDB_DATA_TYPE_BOOL: return rangeFilterFunc_i8;
|
||||
case TSDB_DATA_TYPE_TINYINT: return rangeFilterFunc_i8;
|
||||
case TSDB_DATA_TYPE_SMALLINT: return rangeFilterFunc_i16;
|
||||
case TSDB_DATA_TYPE_INT: return rangeFilterFunc_i32;
|
||||
case TSDB_DATA_TYPE_TIMESTAMP: //timestamp uses bigint filter
|
||||
case TSDB_DATA_TYPE_BIGINT: return rangeFilterFunc_i64;
|
||||
case TSDB_DATA_TYPE_FLOAT: return rangeFilterFunc_ds;
|
||||
case TSDB_DATA_TYPE_DOUBLE: return rangeFilterFunc_dd;
|
||||
default:return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
__filter_func_t* getValueFilterFuncArray(int32_t type) {
|
||||
switch(type) {
|
||||
case TSDB_DATA_TYPE_BOOL: return filterFunc_i8;
|
||||
case TSDB_DATA_TYPE_TINYINT: return filterFunc_i8;
|
||||
case TSDB_DATA_TYPE_SMALLINT: return filterFunc_i16;
|
||||
case TSDB_DATA_TYPE_INT: return filterFunc_i32;
|
||||
case TSDB_DATA_TYPE_TIMESTAMP: //timestamp uses bigint filter
|
||||
case TSDB_DATA_TYPE_BIGINT: return filterFunc_i64;
|
||||
case TSDB_DATA_TYPE_FLOAT: return filterFunc_ds;
|
||||
case TSDB_DATA_TYPE_DOUBLE: return filterFunc_dd;
|
||||
case TSDB_DATA_TYPE_BINARY: return filterFunc_str;
|
||||
case TSDB_DATA_TYPE_NCHAR: return filterFunc_nchar;
|
||||
default: return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
bool supportPrefilter(int32_t type) { return type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR; }
|
|
@ -189,8 +189,8 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
|
|||
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
|
||||
SColumnInfoData pDest = {{0}, 0};
|
||||
|
||||
pDest.info = pCond->colList[i].info;
|
||||
pDest.pData = calloc(1, EXTRA_BYTES + bufferCapacity * pCond->colList[i].info.bytes);
|
||||
pDest.info = pCond->colList[i];
|
||||
pDest.pData = calloc(1, EXTRA_BYTES + bufferCapacity * pCond->colList[i].bytes);
|
||||
taosArrayPush(pQueryHandle->pColumns, &pDest);
|
||||
}
|
||||
|
||||
|
@ -595,10 +595,8 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf
|
|||
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, j);
|
||||
|
||||
if (pCol->info.colId == colId) {
|
||||
// SDataCol* pDataCol = &pCols->cols[i];
|
||||
// pCol->pData = pQueryHandle->rhelper.pDataCols[0]->cols[i].pData + pCol->info.bytes * start;
|
||||
memmove(pCol->pData, pQueryHandle->rhelper.pDataCols[0]->cols[i].pData + pCol->info.bytes * start,
|
||||
pQueryHandle->realNumOfRows * pCol->info.bytes);
|
||||
memmove(pCol->pData, pQueryHandle->rhelper.pDataCols[0]->cols[i].pData + pCol->info.bytes * start,
|
||||
pQueryHandle->realNumOfRows * pCol->info.bytes);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -1082,7 +1080,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
|
|||
return pHandle->pColumns;
|
||||
} else {
|
||||
STableBlockInfo* pBlockInfoEx = &pHandle->pDataBlockInfo[pHandle->cur.slot];
|
||||
STableCheckInfo* pCheckInfo = pBlockInfoEx->pTableCheckInfo;
|
||||
STableCheckInfo* pCheckInfo = pBlockInfoEx->pTableCheckInfo;
|
||||
|
||||
SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfoEx->pBlock.compBlock);
|
||||
assert(pHandle->realNumOfRows <= binfo.rows);
|
||||
|
|
|
@ -61,7 +61,7 @@ void taosArrayPop(SArray* pArray);
|
|||
* @param index
|
||||
* @return
|
||||
*/
|
||||
void* taosArrayGet(SArray* pArray, size_t index);
|
||||
void* taosArrayGet(const SArray* pArray, size_t index);
|
||||
|
||||
/**
|
||||
* get the pointer data from the array
|
||||
|
@ -69,7 +69,7 @@ void* taosArrayGet(SArray* pArray, size_t index);
|
|||
* @param index
|
||||
* @return
|
||||
*/
|
||||
void* taosArrayGetP(SArray* pArray, size_t index);
|
||||
void* taosArrayGetP(const SArray* pArray, size_t index);
|
||||
|
||||
/**
|
||||
* return the size of array
|
||||
|
|
|
@ -84,12 +84,12 @@ void taosArrayPop(SArray* pArray) {
|
|||
pArray->size -= 1;
|
||||
}
|
||||
|
||||
void* taosArrayGet(SArray* pArray, size_t index) {
|
||||
void* taosArrayGet(const SArray* pArray, size_t index) {
|
||||
assert(index < pArray->size);
|
||||
return TARRAY_GET_ELEM(pArray, index);
|
||||
}
|
||||
|
||||
void* taosArrayGetP(SArray* pArray, size_t index) {
|
||||
void* taosArrayGetP(const SArray* pArray, size_t index) {
|
||||
void* ret = taosArrayGet(pArray, index);
|
||||
if (ret == NULL) {
|
||||
return NULL;
|
||||
|
|
Loading…
Reference in New Issue