[TD-225]refactor.
This commit is contained in:
parent
52a524624d
commit
55a7ce46c1
|
@ -1875,6 +1875,7 @@ static int32_t setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo refactor
|
||||||
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order) {
|
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order) {
|
||||||
qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv));
|
qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv));
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
@ -3845,11 +3846,6 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
|
||||||
|
|
||||||
// lastKey needs to be updated
|
// lastKey needs to be updated
|
||||||
pTableQueryInfo->lastKey = nextKey;
|
pTableQueryInfo->lastKey = nextKey;
|
||||||
|
|
||||||
if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTsBuf != NULL) {
|
|
||||||
setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == groupIndex) {
|
if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == groupIndex) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -4296,7 +4292,6 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int
|
||||||
pQInfo, pFillInfo->numOfRows, ret, pQuery->limit.offset, ret - pQuery->limit.offset, 0);
|
pQInfo, pFillInfo->numOfRows, ret, pQuery->limit.offset, ret - pQuery->limit.offset, 0);
|
||||||
|
|
||||||
ret -= (int32_t)pQuery->limit.offset;
|
ret -= (int32_t)pQuery->limit.offset;
|
||||||
// todo !!!!there exactly number of interpo is not valid.
|
|
||||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||||
memmove(pDst[i]->data, pDst[i]->data + pQuery->pExpr1[i].bytes * pQuery->limit.offset,
|
memmove(pDst[i]->data, pDst[i]->data + pQuery->pExpr1[i].bytes * pQuery->limit.offset,
|
||||||
ret * pQuery->pExpr1[i].bytes);
|
ret * pQuery->pExpr1[i].bytes);
|
||||||
|
@ -4777,19 +4772,17 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO refactor: setAdditionalInfo
|
|
||||||
static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTableQueryInfo, SDataBlockInfo* pBlockInfo) {
|
static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTableQueryInfo, SDataBlockInfo* pBlockInfo) {
|
||||||
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
|
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||||
|
|
||||||
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
|
||||||
TSKEY nextKey = pBlockInfo->window.skey;
|
|
||||||
setIntervalQueryRange(pQInfo, nextKey);
|
|
||||||
|
|
||||||
if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTsBuf != NULL) {
|
if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTsBuf != NULL) {
|
||||||
setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo);
|
setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||||
|
setIntervalQueryRange(pQInfo, pBlockInfo->window.skey);
|
||||||
} else { // non-interval query
|
} else { // non-interval query
|
||||||
setExecutionContext(pQInfo, pTableQueryInfo->groupIndex, pBlockInfo->window.ekey + step);
|
setExecutionContext(pQInfo, pTableQueryInfo->groupIndex, pBlockInfo->window.ekey + step);
|
||||||
}
|
}
|
||||||
|
@ -5469,7 +5462,7 @@ static void doRestoreContext(SQInfo *pQInfo) {
|
||||||
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
|
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) {
|
static void doCloseAllTimeWindow(SQInfo *pQInfo) {
|
||||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
|
|
||||||
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||||
|
@ -5521,7 +5514,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// close all time window results
|
// close all time window results
|
||||||
doCloseAllTimeWindowAfterScan(pQInfo);
|
doCloseAllTimeWindow(pQInfo);
|
||||||
|
|
||||||
if (needReverseScan(pQuery)) {
|
if (needReverseScan(pQuery)) {
|
||||||
int32_t code = doSaveContext(pQInfo);
|
int32_t code = doSaveContext(pQInfo);
|
||||||
|
@ -5846,8 +5839,7 @@ static void stableQueryImpl(SQInfo *pQInfo) {
|
||||||
(isFixedOutputQuery(pRuntimeEnv) && (!isPointInterpoQuery(pQuery)) && (!pRuntimeEnv->groupbyColumn))) {
|
(isFixedOutputQuery(pRuntimeEnv) && (!isPointInterpoQuery(pQuery)) && (!pRuntimeEnv->groupbyColumn))) {
|
||||||
multiTableQueryProcess(pQInfo);
|
multiTableQueryProcess(pQInfo);
|
||||||
} else {
|
} else {
|
||||||
assert((pQuery->checkResultBuf == 1 && pQuery->interval.interval == 0) || isPointInterpoQuery(pQuery) ||
|
assert(pQuery->checkResultBuf == 1 || isPointInterpoQuery(pQuery) || pRuntimeEnv->groupbyColumn);
|
||||||
pRuntimeEnv->groupbyColumn);
|
|
||||||
|
|
||||||
sequentialTableProcess(pQInfo);
|
sequentialTableProcess(pQInfo);
|
||||||
}
|
}
|
||||||
|
@ -6375,7 +6367,7 @@ static int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t num
|
||||||
if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
|
if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
|
||||||
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
|
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
|
||||||
if (j < 0 || j >= pQueryMsg->numOfCols) {
|
if (j < 0 || j >= pQueryMsg->numOfCols) {
|
||||||
assert(0);
|
return TSDB_CODE_QRY_INVALID_MSG;
|
||||||
} else {
|
} else {
|
||||||
SColumnInfo *pCol = &pQueryMsg->colList[j];
|
SColumnInfo *pCol = &pQueryMsg->colList[j];
|
||||||
int32_t ret =
|
int32_t ret =
|
||||||
|
@ -6640,7 +6632,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
||||||
pQInfo->runtimeEnv.summary.tableInfoSize += (pTableGroupInfo->numOfTables * sizeof(STableQueryInfo));
|
pQInfo->runtimeEnv.summary.tableInfoSize += (pTableGroupInfo->numOfTables * sizeof(STableQueryInfo));
|
||||||
|
|
||||||
pQInfo->runtimeEnv.pResultRowHashTable = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
pQInfo->runtimeEnv.pResultRowHashTable = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
pQInfo->runtimeEnv.keyBuf = malloc(TSDB_MAX_BYTES_PER_ROW);
|
pQInfo->runtimeEnv.keyBuf = malloc(TSDB_MAX_BYTES_PER_ROW); // todo opt size
|
||||||
pQInfo->runtimeEnv.pool = initResultRowPool(getResultRowSize(&pQInfo->runtimeEnv));
|
pQInfo->runtimeEnv.pool = initResultRowPool(getResultRowSize(&pQInfo->runtimeEnv));
|
||||||
pQInfo->runtimeEnv.prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + srcSize);
|
pQInfo->runtimeEnv.prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + srcSize);
|
||||||
|
|
||||||
|
|
|
@ -383,3 +383,27 @@ sql create table cars(ts timestamp, c int) tags(id int);
|
||||||
sql create table car1 using cars tags(1);
|
sql create table car1 using cars tags(1);
|
||||||
sql create table car2 using cars tags(2);
|
sql create table car2 using cars tags(2);
|
||||||
sql insert into car1 (ts, c) values (now,1) car2(ts, c) values(now, 2);
|
sql insert into car1 (ts, c) values (now,1) car2(ts, c) values(now, 2);
|
||||||
|
|
||||||
|
print ========================> TD-2700
|
||||||
|
sql create table t1(ts timestamp, k int);
|
||||||
|
sql insert into t1 values(1500000001000, 0);
|
||||||
|
sql select sum(k) from t1 interval(1d) sliding(1h);
|
||||||
|
if $rows != 24 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print ========================> TD-2740
|
||||||
|
sql drop table if exists m1;
|
||||||
|
sql create table m1(ts timestamp, k int) tags(a int);
|
||||||
|
sql create table tm0 using m1 tags(0);
|
||||||
|
sql create table tm1 using m1 tags(1);
|
||||||
|
sql create table tm2 using m1 tags(2);
|
||||||
|
sql create table tm3 using m1 tags(3);
|
||||||
|
sql insert into tm0 values('2020-1-1 1:1:1', 0);
|
||||||
|
sql insert into tm1 values('2020-1-5 1:1:1', 0);
|
||||||
|
sql insert into tm2 values('2020-1-7 1:1:1', 0);
|
||||||
|
sql insert into tm3 values('2020-1-1 1:1:1', 0);
|
||||||
|
sql select count from m1 where ts='2020-1-1 1:1:1' interval(1h) group by tbname;
|
||||||
|
if $rows != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
Loading…
Reference in New Issue