[td-2859]add test cases.
This commit is contained in:
parent
4142ec3eb8
commit
1ed54bc597
|
@ -424,7 +424,7 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
|
|||
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
|
||||
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock);
|
||||
|
||||
void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo);
|
||||
void handleDownstreamOperator(SSqlRes* pRes, SQueryInfo* pQueryInfo);
|
||||
|
||||
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
|
||||
|
||||
|
|
|
@ -540,7 +540,7 @@ void tscDestroyLocalMerger(SSqlObj *pSql) {
|
|||
|
||||
static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCmd, SColumnModel *pModel) {
|
||||
int32_t numOfGroupByCols = 0;
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
|
||||
SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd);
|
||||
|
||||
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
|
||||
numOfGroupByCols = pQueryInfo->groupbyExpr.numOfGroupCols;
|
||||
|
@ -662,7 +662,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
|
|||
SColumnModel *pModel = NULL;
|
||||
*pFinalModel = NULL;
|
||||
|
||||
SQueryInfo * pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
|
||||
SQueryInfo * pQueryInfo = tscGetActiveQueryInfo(pCmd);
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
||||
(*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pSql->subState.numOfSub);
|
||||
|
@ -1153,8 +1153,8 @@ int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_
|
|||
|
||||
// calculate the result from several other columns
|
||||
if (pSup->pExpr->pExpr != NULL) {
|
||||
arithSup.pArithExpr = pSup->pExpr;
|
||||
arithmeticTreeTraverse(arithSup.pArithExpr->pExpr, (int32_t) pOutput->num, pbuf + pOutput->num*offset, &arithSup, TSDB_ORDER_ASC, getArithmeticInputSrc);
|
||||
arithSup.pExprInfo = pSup->pExpr;
|
||||
arithmeticTreeTraverse(arithSup.pExprInfo->pExpr, (int32_t) pOutput->num, pbuf + pOutput->num*offset, &arithSup, TSDB_ORDER_ASC, getArithmeticInputSrc);
|
||||
} else {
|
||||
SExprInfo* pExpr = pSup->pExpr;
|
||||
memcpy(pbuf + pOutput->num * offset, pExpr->base.offset * pOutput->num + pOutput->data, (size_t)(pExpr->base.resBytes * pOutput->num));
|
||||
|
|
|
@ -77,7 +77,7 @@ static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SC
|
|||
|
||||
static uint8_t convertOptr(SStrToken *pToken);
|
||||
|
||||
static int32_t validateSelectNodeList(SSqlCmd* pCmd, int32_t clauseIndex, SQueryInfo* pQueryInfo, SArray* pSelNodeList, bool isSTable, bool joinQuery, bool timeWindowQuery);
|
||||
static int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pSelNodeList, bool isSTable, bool joinQuery, bool timeWindowQuery);
|
||||
|
||||
static bool validateIpAddress(const char* ip, size_t size);
|
||||
static bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
|
||||
|
@ -1405,15 +1405,12 @@ void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t tableUid) {
|
|||
tscColumnListInsert(pQueryInfo->colList, PRIMARYKEY_TIMESTAMP_COL_INDEX, tableUid, &s);
|
||||
}
|
||||
|
||||
static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t exprIndex, tSqlExprItem* pItem) {
|
||||
static int32_t handleArithmeticExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t exprIndex, tSqlExprItem* pItem) {
|
||||
const char* msg1 = "invalid column name, illegal column type, or columns in arithmetic expression from two tables";
|
||||
const char* msg2 = "invalid arithmetic expression in select clause";
|
||||
const char* msg3 = "tag columns can not be used in arithmetic expression";
|
||||
const char* msg4 = "columns from different table mixed up in arithmetic expression";
|
||||
|
||||
// arithmetic function in select clause
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, clauseIndex);
|
||||
|
||||
SColumnList columnList = {0};
|
||||
int32_t arithmeticType = NON_ARITHMEIC_EXPR;
|
||||
|
||||
|
@ -1608,7 +1605,7 @@ bool isValidDistinctSql(SQueryInfo* pQueryInfo) {
|
|||
return false;
|
||||
}
|
||||
|
||||
int32_t validateSelectNodeList(SSqlCmd* pCmd, int32_t clauseIndex, SQueryInfo* pQueryInfo, SArray* pSelNodeList, bool isSTable, bool joinQuery,
|
||||
int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pSelNodeList, bool isSTable, bool joinQuery,
|
||||
bool timeWindowQuery) {
|
||||
assert(pSelNodeList != NULL && pCmd != NULL);
|
||||
|
||||
|
@ -1655,7 +1652,7 @@ int32_t validateSelectNodeList(SSqlCmd* pCmd, int32_t clauseIndex, SQueryInfo* p
|
|||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
} else if (type == SQL_NODE_EXPR) {
|
||||
int32_t code = handleArithmeticExpr(pCmd, clauseIndex, i, pItem);
|
||||
int32_t code = handleArithmeticExpr(pCmd, pQueryInfo, i, pItem);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -6709,7 +6706,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
|
|||
}
|
||||
|
||||
bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
|
||||
if (validateSelectNodeList(&pSql->cmd, 0, pQueryInfo, pSqlNode->pSelNodeList, isSTable, false, false) != TSDB_CODE_SUCCESS) {
|
||||
if (validateSelectNodeList(&pSql->cmd, pQueryInfo, pSqlNode->pSelNodeList, isSTable, false, false) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
|
@ -6940,6 +6937,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) {
|
|||
return code;
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
pQueryInfo = pCmd->pQueryInfo[0];
|
||||
|
||||
SQueryInfo* current = calloc(1, sizeof(SQueryInfo));
|
||||
|
@ -6954,11 +6955,12 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) {
|
|||
current->pTableMetaInfo = calloc(1, POINTER_BYTES);
|
||||
current->pTableMetaInfo[0] = pTableMetaInfo1;
|
||||
current->numOfTables = 1;
|
||||
current->order = pQueryInfo->order;
|
||||
|
||||
pCmd->pQueryInfo[0] = current;
|
||||
pQueryInfo->pDownstream = current;
|
||||
|
||||
if (validateSelectNodeList(pCmd, index, current, pSqlNode->pSelNodeList, false, false, false) != TSDB_CODE_SUCCESS) {
|
||||
if (validateSelectNodeList(pCmd, current, pSqlNode->pSelNodeList, false, false, false) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
|
@ -7017,7 +7019,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) {
|
|||
int32_t timeWindowQuery =
|
||||
(TPARSER_HAS_TOKEN(pSqlNode->interval.interval) || TPARSER_HAS_TOKEN(pSqlNode->sessionVal.gap));
|
||||
|
||||
if (validateSelectNodeList(pCmd, index, pQueryInfo, pSqlNode->pSelNodeList, isSTable, joinQuery, timeWindowQuery) !=
|
||||
if (validateSelectNodeList(pCmd, pQueryInfo, pSqlNode->pSelNodeList, isSTable, joinQuery, timeWindowQuery) !=
|
||||
TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
|
|
@ -823,15 +823,15 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
// set column list ids
|
||||
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
|
||||
char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
|
||||
char *pMsg = (char *)(pQueryMsg->tableCols) + numOfCols * sizeof(SColumnInfo);
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfo *pCol = &query.colList[i];
|
||||
SColumnInfo *pCol = &query.tableCols[i];
|
||||
|
||||
pQueryMsg->colList[i].colId = htons(pCol->colId);
|
||||
pQueryMsg->colList[i].bytes = htons(pCol->bytes);
|
||||
pQueryMsg->colList[i].type = htons(pCol->type);
|
||||
pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
|
||||
pQueryMsg->tableCols[i].colId = htons(pCol->colId);
|
||||
pQueryMsg->tableCols[i].bytes = htons(pCol->bytes);
|
||||
pQueryMsg->tableCols[i].type = htons(pCol->type);
|
||||
pQueryMsg->tableCols[i].numOfFilters = htons(pCol->numOfFilters);
|
||||
|
||||
// append the filter information after the basic column information
|
||||
for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
|
||||
|
@ -1569,6 +1569,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
|
|||
return code;
|
||||
}
|
||||
|
||||
// global aggregation may be the upstream for parent query
|
||||
SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd);
|
||||
if (pQueryInfo->pQInfo == NULL) {
|
||||
STableGroupInfo tableGroupInfo = {.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES),};
|
||||
|
@ -1593,6 +1594,8 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
|
|||
qTableQuery(pQueryInfo->pQInfo, &localQueryId);
|
||||
convertQueryResult(pRes, pQueryInfo);
|
||||
|
||||
handleDownstreamOperator(pRes, pQueryInfo);
|
||||
|
||||
code = pRes->code;
|
||||
if (pRes->code == TSDB_CODE_SUCCESS) {
|
||||
(*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
|
||||
|
@ -2289,7 +2292,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
|
|||
tscSetResRawPtr(pRes, pQueryInfo);
|
||||
}
|
||||
|
||||
prepareInputDataFromUpstream(pRes, pQueryInfo);
|
||||
handleDownstreamOperator(pRes, pQueryInfo);
|
||||
|
||||
if (pSql->pSubscription != NULL) {
|
||||
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
|
||||
|
|
|
@ -2422,7 +2422,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
|||
|
||||
const uint32_t nBufferSize = (1u << 16u); // 64KB
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
|
||||
SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd);
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
SSubqueryState *pState = &pSql->subState;
|
||||
|
||||
|
@ -2890,6 +2890,7 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo
|
|||
if (pNew != NULL) { // the sub query of two-stage super table query
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pNew->cmd, 0);
|
||||
|
||||
pNew->cmd.active = pQueryInfo;
|
||||
pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
|
||||
|
||||
// clear the limit/offset info, since it should not be sent to vnode to be executed.
|
||||
|
@ -3562,14 +3563,6 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST
|
|||
}
|
||||
}
|
||||
|
||||
// qDebug("qmsg:%p QInfo:%" PRIu64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo);
|
||||
// return pQInfo;
|
||||
// if (pGroupbyExpr != NULL) {
|
||||
// taosArrayDestroy(pGroupbyExpr->columnInfo);
|
||||
// free(pGroupbyExpr);
|
||||
// }
|
||||
//
|
||||
// tfree(pTagCols);
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
SExprInfo* pExprInfo = &pExprs[i];
|
||||
if (pExprInfo->pExpr != NULL) {
|
||||
|
|
|
@ -621,10 +621,15 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
//TODO refactor
|
||||
int32_t offset = 0;
|
||||
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
pColData->pData = pData + offset * pBlock->info.rows;
|
||||
if (pData != NULL) {
|
||||
pColData->pData = pData + offset * pBlock->info.rows;
|
||||
} else {
|
||||
pColData->pData = pInput->pRes->urow[i];
|
||||
}
|
||||
|
||||
offset += pColData->info.bytes;
|
||||
}
|
||||
|
@ -692,7 +697,7 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
|
|||
pRes->completed = (pRes->numOfRows == 0);
|
||||
}
|
||||
|
||||
void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
|
||||
void handleDownstreamOperator(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
|
||||
if (pQueryInfo->pDownstream != NULL) {
|
||||
// handle the following query process
|
||||
SQueryInfo *px = pQueryInfo->pDownstream;
|
||||
|
@ -713,7 +718,7 @@ void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
|
|||
|
||||
taosArrayPush(tableGroupInfo.pGroupList, &group);
|
||||
|
||||
SOperatorInfo* pSourceOptr = createDummyInputOperator((char*)pRes, pSchema, numOfOutput);
|
||||
SOperatorInfo* pSourceOptr = createDummyInputOperator((char*)pRes, pSchema, numOfCols);
|
||||
|
||||
SExprInfo *exprInfo = NULL;
|
||||
/*int32_t code = */createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL);
|
||||
|
@ -2535,8 +2540,9 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
|
|||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, tableIndex);
|
||||
|
||||
SQueryInfo* pQueryInfo = tscGetActiveQueryInfo(pCmd);
|
||||
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[tableIndex];
|
||||
|
||||
pNew->pTscObj = pSql->pTscObj;
|
||||
pNew->signature = pNew;
|
||||
|
@ -2567,7 +2573,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
|
|||
}
|
||||
|
||||
SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pnCmd, 0);
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
|
||||
|
||||
pNewQueryInfo->command = pQueryInfo->command;
|
||||
pnCmd->active = pNewQueryInfo;
|
||||
|
@ -3331,7 +3336,7 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf
|
|||
pse->uid = pTableMetaInfo->pTableMeta->id.uid;
|
||||
pse->resColId = pExpr->base.resColId;
|
||||
|
||||
if (pExpr->pExpr == NULL) { // this should be switched to projection query
|
||||
if (pExpr->base.functionId != TSDB_FUNC_ARITHM) { // this should be switched to projection query
|
||||
pse->numOfParams = 0; // no params for projection query
|
||||
pse->functionId = TSDB_FUNC_PRJ;
|
||||
pse->colInfo.colId = pExpr->base.resColId;
|
||||
|
@ -3539,17 +3544,23 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
|
|||
for(int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
|
||||
SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, i);
|
||||
tscSqlExprAssign(&pQueryAttr->pExpr1[i], pExpr);
|
||||
|
||||
if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_ARITHM) {
|
||||
for (int32_t j = 0; j < pQueryAttr->pExpr1[i].base.numOfParams; ++j) {
|
||||
buildArithmeticExprFromMsg(&pQueryAttr->pExpr1[i], NULL);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pQueryAttr->colList = calloc(numOfCols, sizeof(SColumnInfo));
|
||||
pQueryAttr->tableCols = calloc(numOfCols, sizeof(SColumnInfo));
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumn* pCol = taosArrayGetP(pQueryInfo->colList, i);
|
||||
if (!isValidDataType(pCol->info.type) || pCol->info.type == TSDB_DATA_TYPE_NULL) {
|
||||
assert(0);
|
||||
}
|
||||
|
||||
pQueryAttr->colList[i] = pCol->info;
|
||||
pQueryAttr->colList[i].filterInfo = tFilterInfoDup(pCol->info.filterInfo, pQueryAttr->colList[i].numOfFilters);
|
||||
pQueryAttr->tableCols[i] = pCol->info;
|
||||
pQueryAttr->tableCols[i].filterInfo = tFilterInfoDup(pCol->info.filterInfo, pQueryAttr->tableCols[i].numOfFilters);
|
||||
}
|
||||
|
||||
// global aggregate query
|
||||
|
@ -3577,9 +3588,9 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
|
|||
pQueryAttr->srcRowSize = 0;
|
||||
pQueryAttr->maxTableColumnWidth = 0;
|
||||
for (int16_t i = 0; i < numOfCols; ++i) {
|
||||
pQueryAttr->srcRowSize += pQueryAttr->colList[i].bytes;
|
||||
if (pQueryAttr->maxTableColumnWidth < pQueryAttr->colList[i].bytes) {
|
||||
pQueryAttr->maxTableColumnWidth = pQueryAttr->colList[i].bytes;
|
||||
pQueryAttr->srcRowSize += pQueryAttr->tableCols[i].bytes;
|
||||
if (pQueryAttr->maxTableColumnWidth < pQueryAttr->tableCols[i].bytes) {
|
||||
pQueryAttr->maxTableColumnWidth = pQueryAttr->tableCols[i].bytes;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -497,7 +497,7 @@ typedef struct {
|
|||
int32_t prevResultLen; // previous result length
|
||||
int32_t numOfOperator;
|
||||
int32_t tableScanOperator;// table scan operator. -1 means no scan operator
|
||||
SColumnInfo colList[];
|
||||
SColumnInfo tableCols[];
|
||||
} SQueryTableMsg;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -122,7 +122,7 @@ enum {
|
|||
#define QUERY_IS_FREE_RESOURCE(type) (((type)&TSDB_QUERY_TYPE_FREE_RESOURCE) != 0)
|
||||
|
||||
typedef struct SArithmeticSupport {
|
||||
SExprInfo *pArithExpr;
|
||||
SExprInfo *pExprInfo;
|
||||
int32_t numOfCols;
|
||||
SColumnInfo *colList;
|
||||
void *exprList; // client side used
|
||||
|
|
|
@ -220,7 +220,7 @@ typedef struct SQueryAttr {
|
|||
SExprInfo* pExpr3;
|
||||
int32_t numOfExpr3;
|
||||
|
||||
SColumnInfo* colList;
|
||||
SColumnInfo* tableCols;
|
||||
SColumnInfo* tagColList;
|
||||
int32_t numOfFilterCols;
|
||||
int64_t* fillVal;
|
||||
|
|
|
@ -3698,7 +3698,7 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) {
|
|||
GET_RES_INFO(pCtx)->numOfRes += pCtx->size;
|
||||
SArithmeticSupport *sas = (SArithmeticSupport *)pCtx->param[1].pz;
|
||||
|
||||
arithmeticTreeTraverse(sas->pArithExpr->pExpr, pCtx->size, pCtx->pOutput, sas, pCtx->order, getArithColumnData);
|
||||
arithmeticTreeTraverse(sas->pExprInfo->pExpr, pCtx->size, pCtx->pOutput, sas, pCtx->order, getArithColumnData);
|
||||
}
|
||||
|
||||
static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||
|
@ -3706,7 +3706,7 @@ static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
|||
SArithmeticSupport *sas = (SArithmeticSupport *)pCtx->param[1].pz;
|
||||
|
||||
sas->offset = index;
|
||||
arithmeticTreeTraverse(sas->pArithExpr->pExpr, 1, pCtx->pOutput, sas, pCtx->order, getArithColumnData);
|
||||
arithmeticTreeTraverse(sas->pExprInfo->pExpr, 1, pCtx->pOutput, sas, pCtx->order, getArithColumnData);
|
||||
|
||||
pCtx->pOutput += pCtx->outputBytes;
|
||||
}
|
||||
|
|
|
@ -851,7 +851,7 @@ static TSKEY getStartTsKey(SQueryAttr* pQueryAttr, STimeWindow* win, const TSKEY
|
|||
|
||||
static void setArithParams(SArithmeticSupport* sas, SExprInfo *pExprInfo, SSDataBlock* pSDataBlock) {
|
||||
sas->numOfCols = (int32_t) pSDataBlock->info.numOfCols;
|
||||
sas->pArithExpr = pExprInfo;
|
||||
sas->pExprInfo = pExprInfo;
|
||||
|
||||
sas->colList = calloc(1, pSDataBlock->info.numOfCols*sizeof(SColumnInfo));
|
||||
for(int32_t i = 0; i < sas->numOfCols; ++i) {
|
||||
|
@ -1663,7 +1663,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
char* start = POINTER_BYTES * pQueryAttr->numOfCols + (char*) pRuntimeEnv->prevRow;
|
||||
pRuntimeEnv->prevRow[0] = start;
|
||||
for(int32_t i = 1; i < pQueryAttr->numOfCols; ++i) {
|
||||
pRuntimeEnv->prevRow[i] = pRuntimeEnv->prevRow[i - 1] + pQueryAttr->colList[i-1].bytes;
|
||||
pRuntimeEnv->prevRow[i] = pRuntimeEnv->prevRow[i - 1] + pQueryAttr->tableCols[i-1].bytes;
|
||||
}
|
||||
|
||||
*(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN;
|
||||
|
@ -1725,7 +1725,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
SOperatorInfo* prev = pRuntimeEnv->pTableScanner;
|
||||
if (i == 0) {
|
||||
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
if (pRuntimeEnv->pTableScanner != NULL) { // TODO refactor
|
||||
if (pRuntimeEnv->pTableScanner != NULL && pRuntimeEnv->pTableScanner->operatorType != OP_DummyInput) { // TODO refactor
|
||||
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
|
||||
}
|
||||
} else {
|
||||
|
@ -1973,7 +1973,7 @@ void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t keyFir
|
|||
bool colIdCheck(SQueryAttr *pQueryAttr, uint64_t qId) {
|
||||
// load data column information is incorrect
|
||||
for (int32_t i = 0; i < pQueryAttr->numOfCols - 1; ++i) {
|
||||
if (pQueryAttr->colList[i].colId == pQueryAttr->colList[i + 1].colId) {
|
||||
if (pQueryAttr->tableCols[i].colId == pQueryAttr->tableCols[i + 1].colId) {
|
||||
qError("QInfo:%"PRIu64" invalid data load column for query", qId);
|
||||
return false;
|
||||
}
|
||||
|
@ -3990,7 +3990,7 @@ static void doTableQueryInfoTimeWindowCheck(SQueryAttr* pQueryAttr, STableQueryI
|
|||
|
||||
STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) {
|
||||
STsdbQueryCond cond = {
|
||||
.colList = pQueryAttr->colList,
|
||||
.colList = pQueryAttr->tableCols,
|
||||
.order = pQueryAttr->order.order,
|
||||
.numOfCols = pQueryAttr->numOfCols,
|
||||
.type = BLOCK_LOAD_OFFSET_SEQ_ORDER,
|
||||
|
@ -5757,9 +5757,9 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
|
|||
goto _cleanup;
|
||||
}
|
||||
|
||||
char *pMsg = (char *)(pQueryMsg->colList) + sizeof(SColumnInfo) * pQueryMsg->numOfCols;
|
||||
char *pMsg = (char *)(pQueryMsg->tableCols) + sizeof(SColumnInfo) * pQueryMsg->numOfCols;
|
||||
for (int32_t col = 0; col < pQueryMsg->numOfCols; ++col) {
|
||||
SColumnInfo *pColInfo = &pQueryMsg->colList[col];
|
||||
SColumnInfo *pColInfo = &pQueryMsg->tableCols[col];
|
||||
|
||||
pColInfo->colId = htons(pColInfo->colId);
|
||||
pColInfo->type = htons(pColInfo->type);
|
||||
|
@ -6012,7 +6012,7 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
|
|||
|
||||
param->sql = strndup(pMsg, pQueryMsg->sqlstrLen);
|
||||
|
||||
SQueriedTableInfo info = { .numOfTags = pQueryMsg->numOfTags, .numOfCols = pQueryMsg->numOfCols, .colList = pQueryMsg->colList};
|
||||
SQueriedTableInfo info = { .numOfTags = pQueryMsg->numOfTags, .numOfCols = pQueryMsg->numOfCols, .colList = pQueryMsg->tableCols};
|
||||
if (!validateQueryTableCols(&info, param->pExpr, pQueryMsg->numOfOutput, param->pTagColumnInfo, pQueryMsg)) {
|
||||
code = TSDB_CODE_QRY_INVALID_MSG;
|
||||
goto _cleanup;
|
||||
|
@ -6251,7 +6251,7 @@ SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *
|
|||
|
||||
static int32_t createFilterInfo(SQueryAttr *pQueryAttr, uint64_t qId) {
|
||||
for (int32_t i = 0; i < pQueryAttr->numOfCols; ++i) {
|
||||
if (pQueryAttr->colList[i].numOfFilters > 0) {
|
||||
if (pQueryAttr->tableCols[i].numOfFilters > 0) {
|
||||
pQueryAttr->numOfFilterCols++;
|
||||
}
|
||||
}
|
||||
|
@ -6266,13 +6266,13 @@ static int32_t createFilterInfo(SQueryAttr *pQueryAttr, uint64_t qId) {
|
|||
}
|
||||
|
||||
for (int32_t i = 0, j = 0; i < pQueryAttr->numOfCols; ++i) {
|
||||
if (pQueryAttr->colList[i].numOfFilters > 0) {
|
||||
if (pQueryAttr->tableCols[i].numOfFilters > 0) {
|
||||
SSingleColumnFilterInfo *pFilterInfo = &pQueryAttr->pFilterInfo[j];
|
||||
|
||||
memcpy(&pFilterInfo->info, &pQueryAttr->colList[i], sizeof(SColumnInfo));
|
||||
pFilterInfo->info = pQueryAttr->colList[i];
|
||||
memcpy(&pFilterInfo->info, &pQueryAttr->tableCols[i], sizeof(SColumnInfo));
|
||||
pFilterInfo->info = pQueryAttr->tableCols[i];
|
||||
|
||||
pFilterInfo->numOfFilters = pQueryAttr->colList[i].numOfFilters;
|
||||
pFilterInfo->numOfFilters = pQueryAttr->tableCols[i].numOfFilters;
|
||||
pFilterInfo->pFilters = calloc(pFilterInfo->numOfFilters, sizeof(SColumnFilterElem));
|
||||
if (pFilterInfo->pFilters == NULL) {
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
|
@ -6280,7 +6280,7 @@ static int32_t createFilterInfo(SQueryAttr *pQueryAttr, uint64_t qId) {
|
|||
|
||||
for (int32_t f = 0; f < pFilterInfo->numOfFilters; ++f) {
|
||||
SColumnFilterElem *pSingleColFilter = &pFilterInfo->pFilters[f];
|
||||
pSingleColFilter->filterInfo = pQueryAttr->colList[i].filterInfo[f];
|
||||
pSingleColFilter->filterInfo = pQueryAttr->tableCols[i].filterInfo[f];
|
||||
|
||||
int32_t lower = pSingleColFilter->filterInfo.lowerRelOptr;
|
||||
int32_t upper = pSingleColFilter->filterInfo.upperRelOptr;
|
||||
|
@ -6295,7 +6295,7 @@ static int32_t createFilterInfo(SQueryAttr *pQueryAttr, uint64_t qId) {
|
|||
return TSDB_CODE_QRY_INVALID_MSG;
|
||||
}
|
||||
|
||||
pSingleColFilter->bytes = pQueryAttr->colList[i].bytes;
|
||||
pSingleColFilter->bytes = pQueryAttr->tableCols[i].bytes;
|
||||
}
|
||||
|
||||
j++;
|
||||
|
@ -6319,7 +6319,7 @@ static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) {
|
|||
if (TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) {
|
||||
int32_t f = 0;
|
||||
for (f = 0; f < pQueryAttr->numOfCols; ++f) {
|
||||
if (pColIndex->colId == pQueryAttr->colList[f].colId) {
|
||||
if (pColIndex->colId == pQueryAttr->tableCols[f].colId) {
|
||||
pColIndex->colIndex = f;
|
||||
break;
|
||||
}
|
||||
|
@ -6421,20 +6421,20 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
|
|||
pQueryAttr->needReverseScan = pQueryMsg->needReverseScan;
|
||||
pQueryAttr->vgId = vgId;
|
||||
|
||||
pQueryAttr->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
|
||||
if (pQueryAttr->colList == NULL) {
|
||||
pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
|
||||
if (pQueryAttr->tableCols == NULL) {
|
||||
goto _cleanup;
|
||||
}
|
||||
|
||||
pQueryAttr->srcRowSize = 0;
|
||||
pQueryAttr->maxTableColumnWidth = 0;
|
||||
for (int16_t i = 0; i < numOfCols; ++i) {
|
||||
pQueryAttr->colList[i] = pQueryMsg->colList[i];
|
||||
pQueryAttr->colList[i].filterInfo = tFilterInfoDup(pQueryMsg->colList[i].filterInfo, pQueryAttr->colList[i].numOfFilters);
|
||||
pQueryAttr->tableCols[i] = pQueryMsg->tableCols[i];
|
||||
pQueryAttr->tableCols[i].filterInfo = tFilterInfoDup(pQueryMsg->tableCols[i].filterInfo, pQueryAttr->tableCols[i].numOfFilters);
|
||||
|
||||
pQueryAttr->srcRowSize += pQueryAttr->colList[i].bytes;
|
||||
if (pQueryAttr->maxTableColumnWidth < pQueryAttr->colList[i].bytes) {
|
||||
pQueryAttr->maxTableColumnWidth = pQueryAttr->colList[i].bytes;
|
||||
pQueryAttr->srcRowSize += pQueryAttr->tableCols[i].bytes;
|
||||
if (pQueryAttr->maxTableColumnWidth < pQueryAttr->tableCols[i].bytes) {
|
||||
pQueryAttr->maxTableColumnWidth = pQueryAttr->tableCols[i].bytes;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6713,12 +6713,12 @@ void freeQInfo(SQInfo *pQInfo) {
|
|||
tfree(pQueryAttr->tagColList);
|
||||
tfree(pQueryAttr->pFilterInfo);
|
||||
|
||||
if (pQueryAttr->colList != NULL) {
|
||||
if (pQueryAttr->tableCols != NULL) {
|
||||
for (int32_t i = 0; i < pQueryAttr->numOfCols; i++) {
|
||||
SColumnInfo *column = pQueryAttr->colList + i;
|
||||
SColumnInfo *column = pQueryAttr->tableCols + i;
|
||||
freeColumnFilterInfo(column->filterInfo, column->numOfFilters);
|
||||
}
|
||||
tfree(pQueryAttr->colList);
|
||||
tfree(pQueryAttr->tableCols);
|
||||
}
|
||||
|
||||
if (pQueryAttr->pGroupbyExpr != NULL) {
|
||||
|
@ -6900,12 +6900,12 @@ void freeQueryAttr(SQueryAttr* pQueryAttr) {
|
|||
tfree(pQueryAttr->tagColList);
|
||||
tfree(pQueryAttr->pFilterInfo);
|
||||
|
||||
if (pQueryAttr->colList != NULL) {
|
||||
if (pQueryAttr->tableCols != NULL) {
|
||||
for (int32_t i = 0; i < pQueryAttr->numOfCols; i++) {
|
||||
SColumnInfo* column = pQueryAttr->colList + i;
|
||||
SColumnInfo* column = pQueryAttr->tableCols + i;
|
||||
freeColumnFilterInfo(column->filterInfo, column->numOfFilters);
|
||||
}
|
||||
tfree(pQueryAttr->colList);
|
||||
tfree(pQueryAttr->tableCols);
|
||||
}
|
||||
|
||||
if (pQueryAttr->pGroupbyExpr != NULL) {
|
||||
|
|
|
@ -116,7 +116,6 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
|
|||
taosArrayPush(plan, &op);
|
||||
}
|
||||
|
||||
|
||||
if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) {
|
||||
op = OP_Limit;
|
||||
taosArrayPush(plan, &op);
|
||||
|
|
|
@ -88,7 +88,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
|
|||
goto _over;
|
||||
}
|
||||
|
||||
SQueriedTableInfo info = { .numOfTags = pQueryMsg->numOfTags, .numOfCols = pQueryMsg->numOfCols, .colList = pQueryMsg->colList};
|
||||
SQueriedTableInfo info = { .numOfTags = pQueryMsg->numOfTags, .numOfCols = pQueryMsg->numOfCols, .colList = pQueryMsg->tableCols};
|
||||
if ((code = createQueryFunc(&info, pQueryMsg->numOfOutput, ¶m.pExprs, param.pExpr, param.pTagColumnInfo,
|
||||
pQueryMsg->queryType, pQueryMsg)) != TSDB_CODE_SUCCESS) {
|
||||
goto _over;
|
||||
|
@ -185,7 +185,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
|
|||
freeParam(¶m);
|
||||
|
||||
for (int32_t i = 0; i < pQueryMsg->numOfCols; i++) {
|
||||
SColumnInfo* column = pQueryMsg->colList + i;
|
||||
SColumnInfo* column = pQueryMsg->tableCols + i;
|
||||
freeColumnFilterInfo(column->filterInfo, column->numOfFilters);
|
||||
}
|
||||
|
||||
|
@ -210,7 +210,6 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
|
|||
return false;
|
||||
}
|
||||
|
||||
|
||||
*qId = pQInfo->qId;
|
||||
pQInfo->startExecTs = taosGetTimestampSec();
|
||||
|
||||
|
|
|
@ -0,0 +1,144 @@
|
|||
system sh/stop_dnodes.sh
|
||||
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/cfg.sh -n dnode1 -c walLevel -v 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 100
|
||||
sql connect
|
||||
sleep 100
|
||||
|
||||
print ========== sub_in_from.sim
|
||||
$i = 0
|
||||
|
||||
$dbPrefix = subdb
|
||||
$tbPrefix = sub_tb
|
||||
$stbPrefix = sub_stb
|
||||
$tbNum = 10
|
||||
$rowNum = 1000
|
||||
$totalNum = $tbNum * $rowNum
|
||||
$loops = 200000
|
||||
$log = 10000
|
||||
$ts0 = 1537146000000
|
||||
$delta = 600000
|
||||
$i = 0
|
||||
$db = $dbPrefix . $i
|
||||
$stb = $stbPrefix . $i
|
||||
|
||||
sql drop database $db -x step1
|
||||
step1:
|
||||
sql create database $db cache 16 maxrows 4096 keep 36500
|
||||
print ====== create tables
|
||||
sql use $db
|
||||
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int)
|
||||
|
||||
$i = 0
|
||||
$ts = $ts0
|
||||
$halfNum = $tbNum / 2
|
||||
while $i < $halfNum
|
||||
$tbId = $i + $halfNum
|
||||
$tb = $tbPrefix . $i
|
||||
$tb1 = $tbPrefix . $tbId
|
||||
sql create table $tb using $stb tags( $i )
|
||||
sql create table $tb1 using $stb tags( $tbId )
|
||||
|
||||
$x = 0
|
||||
while $x < $rowNum
|
||||
$xs = $x * $delta
|
||||
$ts = $ts0 + $xs
|
||||
$c = $x / 10
|
||||
$c = $c * 10
|
||||
$c = $x - $c
|
||||
$binary = 'binary . $c
|
||||
$binary = $binary . '
|
||||
$nchar = 'nchar . $c
|
||||
$nchar = $nchar . '
|
||||
sql insert into $tb values ( $ts , $c , $c , $c , $c , $c , $c , true, $binary , $nchar )
|
||||
sql insert into $tb1 values ( $ts , $c , NULL , $c , NULL , $c , $c , true, $binary , $nchar )
|
||||
$x = $x + 1
|
||||
endw
|
||||
|
||||
$i = $i + 1
|
||||
endw
|
||||
print ====== tables created
|
||||
|
||||
sql_error select count(*) from (select count(*) from abc.sub_stb0)
|
||||
sql_error select val + 20 from (select count(*) from sub_stb0 interval(10h))
|
||||
sql_error select abc+20 from (select count(*) from sub_stb0 interval(1s))
|
||||
|
||||
sql select count(*) from (select count(*) from sub_stb0 interval(10h))
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 18 then
|
||||
print expect 18, actual: $data00
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select ts from (select count(*) from sub_stb0 interval(10h))
|
||||
if $rows != 18 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != @18-09-17 04:00:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != @18-09-17 14:00:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select val + 20, val from (select count(*) as val from sub_stb0 interval(10h))
|
||||
if $rows != 18 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 320.000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 300 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data10 != 620 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data11 != 600 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data20 != 620 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data21 != 600 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select max(val), min(val), max(val) - min(val) from (select count(*) val from sub_stb0 interval(10h))
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 600 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 100 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 500.000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select first(ts,val),last(ts,val) from (select count(*) val from sub_stb0 interval(10h))
|
||||
sql select top(val, 5) from (select count(*) val from sub_stb0 interval(10h))
|
||||
sql select diff(val) from (select count(*) val from sub_stb0 interval(10h))
|
||||
sql select percentile(val, 50) from (select count(*) val from sub_stb0 interval(10h))
|
||||
sql select stddev(val) from (select count(*) val from sub_stb0 interval(10h))
|
||||
|
||||
print ====================>complex query
|
||||
|
Loading…
Reference in New Issue