[td-225]refactor
This commit is contained in:
parent
2101466d9a
commit
a06a1fb370
|
@ -6505,7 +6505,6 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
|
|||
|
||||
size_t valSize = taosArrayGetSize(pValList);
|
||||
|
||||
|
||||
// too long tag values will return invalid sql, not be truncated automatically
|
||||
SSchema *pTagSchema = tscGetTableTagSchema(pStableMetaInfo->pTableMeta);
|
||||
STagData *pTag = &pCreateTableInfo->tagdata;
|
||||
|
@ -6515,7 +6514,6 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
|
|||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
||||
SArray* pNameList = NULL;
|
||||
size_t nameSize = 0;
|
||||
int32_t schemaSize = tscGetNumOfTags(pStableMetaInfo->pTableMeta);
|
||||
|
|
|
@ -48,6 +48,13 @@ void tVariantCreate(tVariant *pVar, SStrToken *token) {
|
|||
case TSDB_DATA_TYPE_INT:{
|
||||
ret = tStrToInteger(token->z, token->type, token->n, &pVar->i64, true);
|
||||
if (ret != 0) {
|
||||
SStrToken t = {0};
|
||||
tSQLGetToken(token->z, &t.type);
|
||||
if (t.type == TK_MINUS) { // it is a signed number which is greater than INT64_MAX or less than INT64_MIN
|
||||
pVar->nType = -1; // -1 means error type
|
||||
return;
|
||||
}
|
||||
|
||||
// data overflow, try unsigned parse the input number
|
||||
ret = tStrToInteger(token->z, token->type, token->n, &pVar->i64, false);
|
||||
if (ret != 0) {
|
||||
|
|
|
@ -266,7 +266,6 @@ typedef struct SQueryRuntimeEnv {
|
|||
SSDataBlock *outputBuf;
|
||||
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
|
||||
struct SOperatorInfo *proot;
|
||||
struct SOperatorInfo *pTableScanner; // table scan operator
|
||||
SGroupResInfo groupResInfo;
|
||||
int64_t currentOffset; // dynamic offset value
|
||||
|
||||
|
@ -300,7 +299,7 @@ enum OPERATOR_TYPE_E {
|
|||
OP_DummyInput = 16, //TODO remove it after fully refactor.
|
||||
OP_MultiwaySort = 17, // multi-way data merge into one input stream.
|
||||
OP_GlobalAggregate = 18, // global merge for the multi-way data sources.
|
||||
OP_Condition = 19,
|
||||
OP_Filter = 19,
|
||||
};
|
||||
|
||||
typedef struct SOperatorInfo {
|
||||
|
@ -437,10 +436,10 @@ typedef struct SSLimitOperatorInfo {
|
|||
SArray *orderColumnList;
|
||||
} SSLimitOperatorInfo;
|
||||
|
||||
typedef struct SConditionOperatorInfo {
|
||||
typedef struct SFilterOperatorInfo {
|
||||
SSingleColumnFilterInfo *pFilterInfo;
|
||||
int32_t numOfFilterCols;
|
||||
} SConditionOperatorInfo;
|
||||
} SFilterOperatorInfo;
|
||||
|
||||
typedef struct SFillOperatorInfo {
|
||||
SFillInfo *pFillInfo;
|
||||
|
@ -504,7 +503,7 @@ SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SEx
|
|||
int32_t numOfRows, void* merger, bool groupMix);
|
||||
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param);
|
||||
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger);
|
||||
SOperatorInfo* createConditionOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
|
||||
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
|
||||
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
|
||||
|
|
|
@ -1707,49 +1707,49 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
}
|
||||
case OP_MultiTableTimeInterval: {
|
||||
pRuntimeEnv->proot =
|
||||
createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
|
||||
createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream->info, pRuntimeEnv->proot);
|
||||
break;
|
||||
}
|
||||
case OP_TimeWindow: {
|
||||
pRuntimeEnv->proot =
|
||||
createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
|
||||
createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream->info, pRuntimeEnv->proot);
|
||||
break;
|
||||
}
|
||||
case OP_Groupby: {
|
||||
pRuntimeEnv->proot =
|
||||
createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
|
||||
createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream->info, pRuntimeEnv->proot);
|
||||
break;
|
||||
}
|
||||
case OP_SessionWindow: {
|
||||
pRuntimeEnv->proot =
|
||||
createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
|
||||
createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream->info, pRuntimeEnv->proot);
|
||||
break;
|
||||
}
|
||||
case OP_MultiTableAggregate: {
|
||||
pRuntimeEnv->proot =
|
||||
createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
|
||||
createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream->info, pRuntimeEnv->proot);
|
||||
break;
|
||||
}
|
||||
case OP_Aggregate: {
|
||||
pRuntimeEnv->proot =
|
||||
createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
if (pRuntimeEnv->pTableScanner->operatorType != OP_DummyInput) {
|
||||
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
|
||||
createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
if (pRuntimeEnv->proot->upstream->operatorType != OP_DummyInput) {
|
||||
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream->info, pRuntimeEnv->proot);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case OP_Arithmetic: { // TODO refactor to remove arith operator.
|
||||
SOperatorInfo* prev = pRuntimeEnv->pTableScanner;
|
||||
SOperatorInfo* prev = pRuntimeEnv->proot;
|
||||
if (i == 0) {
|
||||
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
if (pRuntimeEnv->pTableScanner != NULL && pRuntimeEnv->pTableScanner->operatorType != OP_DummyInput) { // TODO refactor
|
||||
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
|
||||
if (pRuntimeEnv->proot != NULL && pRuntimeEnv->proot->operatorType != OP_DummyInput) { // TODO refactor
|
||||
setTableScanFilterOperatorInfo(prev->info, pRuntimeEnv->proot);
|
||||
}
|
||||
} else {
|
||||
prev = pRuntimeEnv->proot;
|
||||
|
@ -1764,12 +1764,12 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
break;
|
||||
}
|
||||
|
||||
case OP_Condition: { // todo refactor
|
||||
case OP_Filter: { // todo refactor
|
||||
assert(pQueryAttr->havingNum > 0);
|
||||
if (pQueryAttr->stableQuery) {
|
||||
pRuntimeEnv->proot = createConditionOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, pQueryAttr->numOfExpr3);
|
||||
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, pQueryAttr->numOfExpr3);
|
||||
} else {
|
||||
pRuntimeEnv->proot = createConditionOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -4019,19 +4019,19 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, SArray* prevResult, void* ts
|
|||
|
||||
switch(tbScanner) {
|
||||
case OP_TableBlockInfoScan: {
|
||||
pRuntimeEnv->pTableScanner = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
|
||||
pRuntimeEnv->proot = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
|
||||
break;
|
||||
}
|
||||
case OP_TableSeqScan: {
|
||||
pRuntimeEnv->pTableScanner = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
|
||||
pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
|
||||
break;
|
||||
}
|
||||
case OP_DataBlocksOptScan: {
|
||||
pRuntimeEnv->pTableScanner = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), 1);
|
||||
pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), 1);
|
||||
break;
|
||||
}
|
||||
case OP_TableScan: {
|
||||
pRuntimeEnv->pTableScanner = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr));
|
||||
pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr));
|
||||
break;
|
||||
}
|
||||
default: { // do nothing
|
||||
|
@ -4040,8 +4040,8 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, SArray* prevResult, void* ts
|
|||
}
|
||||
|
||||
if (sourceOptr != NULL) {
|
||||
assert(pRuntimeEnv->pTableScanner == NULL);
|
||||
pRuntimeEnv->pTableScanner = sourceOptr;
|
||||
assert(pRuntimeEnv->proot == NULL);
|
||||
pRuntimeEnv->proot = sourceOptr;
|
||||
}
|
||||
|
||||
if (pTsBuf != NULL) {
|
||||
|
@ -4911,7 +4911,7 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SConditionOperatorInfo* pCondInfo = pOperator->info;
|
||||
SFilterOperatorInfo* pCondInfo = pOperator->info;
|
||||
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||
|
||||
while (1) {
|
||||
|
@ -5334,7 +5334,7 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
}
|
||||
|
||||
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SConditionOperatorInfo* pInfo = (SConditionOperatorInfo*) param;
|
||||
SFilterOperatorInfo* pInfo = (SFilterOperatorInfo*) param;
|
||||
doDestroyFilterInfo(pInfo->pFilterInfo, pInfo->numOfFilterCols);
|
||||
}
|
||||
|
||||
|
@ -5394,9 +5394,9 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
|
|||
return pOperator;
|
||||
}
|
||||
|
||||
SOperatorInfo* createConditionOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
|
||||
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
|
||||
int32_t numOfOutput) {
|
||||
SConditionOperatorInfo* pInfo = calloc(1, sizeof(SConditionOperatorInfo));
|
||||
SFilterOperatorInfo* pInfo = calloc(1, sizeof(SFilterOperatorInfo));
|
||||
|
||||
{
|
||||
SColumnInfo* pCols = calloc(numOfOutput, sizeof(SColumnInfo));
|
||||
|
@ -5430,7 +5430,7 @@ SOperatorInfo* createConditionOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
|
|||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
|
||||
pOperator->name = "ConditionOperator";
|
||||
pOperator->operatorType = OP_Condition;
|
||||
pOperator->operatorType = OP_Filter;
|
||||
pOperator->blockingOptr = false;
|
||||
pOperator->status = OP_IN_EXECUTING;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
|
|
|
@ -94,7 +94,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
|
|||
taosArrayPush(plan, &op);
|
||||
|
||||
if (!pQueryAttr->stableQuery && pQueryAttr->havingNum > 0) {
|
||||
op = OP_Condition;
|
||||
op = OP_Filter;
|
||||
taosArrayPush(plan, &op);
|
||||
}
|
||||
|
||||
|
@ -120,7 +120,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
|
|||
taosArrayPush(plan, &op);
|
||||
|
||||
if (!pQueryAttr->stableQuery && pQueryAttr->havingNum > 0) {
|
||||
op = OP_Condition;
|
||||
op = OP_Filter;
|
||||
taosArrayPush(plan, &op);
|
||||
}
|
||||
|
||||
|
@ -157,7 +157,7 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) {
|
|||
taosArrayPush(plan, &op);
|
||||
|
||||
if (pQueryAttr->havingNum > 0) {
|
||||
op = OP_Condition;
|
||||
op = OP_Filter;
|
||||
taosArrayPush(plan, &op);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue