[td-225]
This commit is contained in:
parent
794549527c
commit
c9933fd333
|
@ -7802,14 +7802,16 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod
|
||||||
}
|
}
|
||||||
|
|
||||||
static STableMeta* extractTempTableMetaFromSubquery(SQueryInfo* pUpstream) {
|
static STableMeta* extractTempTableMetaFromSubquery(SQueryInfo* pUpstream) {
|
||||||
int32_t numOfColumns = pUpstream->fieldsInfo.numOfOutput;
|
STableMetaInfo* pUpstreamTableMetaInfo = tscGetMetaInfo(pUpstream, 0);
|
||||||
|
|
||||||
STableMeta* meta = calloc(1, sizeof(STableMeta) + sizeof(SSchema) * numOfColumns);
|
int32_t numOfColumns = pUpstream->fieldsInfo.numOfOutput;
|
||||||
|
STableMeta *meta = calloc(1, sizeof(STableMeta) + sizeof(SSchema) * numOfColumns);
|
||||||
meta->tableType = TSDB_TEMP_TABLE;
|
meta->tableType = TSDB_TEMP_TABLE;
|
||||||
|
|
||||||
STableComInfo *info = &meta->tableInfo;
|
STableComInfo *info = &meta->tableInfo;
|
||||||
info->numOfColumns = numOfColumns;
|
info->numOfColumns = numOfColumns;
|
||||||
info->numOfTags = 0;
|
info->precision = pUpstreamTableMetaInfo->pTableMeta->tableInfo.precision;
|
||||||
|
info->numOfTags = 0;
|
||||||
|
|
||||||
int32_t n = 0;
|
int32_t n = 0;
|
||||||
for(int32_t i = 0; i < numOfColumns; ++i) {
|
for(int32_t i = 0; i < numOfColumns; ++i) {
|
||||||
|
|
|
@ -105,8 +105,7 @@ typedef struct SResultRowInfo {
|
||||||
int16_t type:8; // data type for hash key
|
int16_t type:8; // data type for hash key
|
||||||
int32_t size:24; // number of result set
|
int32_t size:24; // number of result set
|
||||||
int32_t capacity; // max capacity
|
int32_t capacity; // max capacity
|
||||||
SResultRow* current; // current start active index
|
SResultRow* current; // current active result row
|
||||||
int64_t prevSKey; // previous (not completed) sliding window start key
|
|
||||||
} SResultRowInfo;
|
} SResultRowInfo;
|
||||||
|
|
||||||
typedef struct SColumnFilterElem {
|
typedef struct SColumnFilterElem {
|
||||||
|
|
|
@ -242,6 +242,7 @@ static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, SQueryRuntimeE
|
||||||
if (size <= 0) {
|
if (size <= 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t orderId = pRuntimeEnv->pQueryAttr->order.orderColId;
|
int32_t orderId = pRuntimeEnv->pQueryAttr->order.orderColId;
|
||||||
if (orderId <= 0) {
|
if (orderId <= 0) {
|
||||||
return;
|
return;
|
||||||
|
@ -410,21 +411,6 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, SQueryRuntim
|
||||||
pResultRowInfo->capacity = (int32_t)newCapacity;
|
pResultRowInfo->capacity = (int32_t)newCapacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
//static int32_t ascResultRowCompareFn(const void* p1, const void* p2) {
|
|
||||||
// SResultRow* pRow1 = *(SResultRow**)p1;
|
|
||||||
// SResultRow* pRow2 = *(SResultRow**)p2;
|
|
||||||
//
|
|
||||||
// if (pRow1 == pRow2) {
|
|
||||||
// return 0;
|
|
||||||
// } else {
|
|
||||||
// return pRow1->win.skey < pRow2->win.skey? -1:1;
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
|
|
||||||
//static int32_t descResultRowCompareFn(const void* p1, const void* p2) {
|
|
||||||
// return -ascResultRowCompareFn(p1, p2);
|
|
||||||
//}
|
|
||||||
|
|
||||||
static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, char *pData,
|
static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, char *pData,
|
||||||
int16_t bytes, bool masterscan, uint64_t tableGroupId) {
|
int16_t bytes, bool masterscan, uint64_t tableGroupId) {
|
||||||
bool existed = false;
|
bool existed = false;
|
||||||
|
@ -450,11 +436,6 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
|
||||||
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tid);
|
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tid);
|
||||||
void* ptr = taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
void* ptr = taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||||
existed = (ptr != NULL);
|
existed = (ptr != NULL);
|
||||||
// __compar_fn_t fn = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr)? ascResultRowCompareFn:descResultRowCompareFn;
|
|
||||||
// void* ptr = taosbsearch(p1, pResultRowInfo->pResult, pResultRowInfo->size, POINTER_BYTES, fn, TD_EQ);
|
|
||||||
// if (ptr != NULL) {
|
|
||||||
// existed = true;
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -526,12 +507,12 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t
|
||||||
STimeWindow w = {0};
|
STimeWindow w = {0};
|
||||||
|
|
||||||
if (pResultRowInfo->current == NULL) { // the first window, from the previous stored value
|
if (pResultRowInfo->current == NULL) { // the first window, from the previous stored value
|
||||||
if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) {
|
// if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) {
|
||||||
getInitialStartTimeWindow(pQueryAttr, ts, &w);
|
getInitialStartTimeWindow(pQueryAttr, ts, &w);
|
||||||
pResultRowInfo->prevSKey = w.skey;
|
// pResultRowInfo->prevSKey = w.skey;
|
||||||
} else {
|
// } else {
|
||||||
w.skey = pResultRowInfo->prevSKey;
|
// w.skey = pResultRowInfo->prevSKey;
|
||||||
}
|
// }
|
||||||
|
|
||||||
if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') {
|
if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') {
|
||||||
w.ekey = taosTimeAdd(w.skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1;
|
w.ekey = taosTimeAdd(w.skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1;
|
||||||
|
@ -539,10 +520,7 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t
|
||||||
w.ekey = w.skey + pQueryAttr->interval.interval - 1;
|
w.ekey = w.skey + pQueryAttr->interval.interval - 1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// int32_t slot = curTimeWindowIndex(pResultRowInfo);
|
w = pResultRowInfo->current->win;
|
||||||
// SResultRow* pWindowRes = getResultRow(pResultRowInfo, slot);
|
|
||||||
SResultRow* pWindowRes = pResultRowInfo->current;
|
|
||||||
w = pWindowRes->win;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (w.skey > ts || w.ekey < ts) {
|
if (w.skey > ts || w.ekey < ts) {
|
||||||
|
@ -747,8 +725,6 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
|
||||||
} else {
|
} else {
|
||||||
pResultRowInfo->current = pResultRowInfo->pResult[i + 1]; // current not closed result object
|
pResultRowInfo->current = pResultRowInfo->pResult[i + 1]; // current not closed result object
|
||||||
}
|
}
|
||||||
|
|
||||||
pResultRowInfo->prevSKey = pResultRowInfo->current->win.skey;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1266,7 +1242,6 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
|
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
|
||||||
|
|
||||||
SResultRow* prevRow = pResultRowInfo->current;
|
SResultRow* prevRow = pResultRowInfo->current;
|
||||||
// int32_t prevIndex = curTimeWindowIndex(pResultRowInfo);
|
|
||||||
|
|
||||||
TSKEY* tsCols = NULL;
|
TSKEY* tsCols = NULL;
|
||||||
if (pSDataBlock->pDataBlock != NULL) {
|
if (pSDataBlock->pDataBlock != NULL) {
|
||||||
|
@ -1312,24 +1287,24 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
STimeWindow w = pRes->win;
|
STimeWindow w = pRes->win;
|
||||||
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &w, masterScan, &pResult, tableGroupId, pInfo->pCtx,
|
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &w, masterScan, &pResult,
|
||||||
numOfOutput, pInfo->rowCellInfoOffset);
|
tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
|
||||||
|
|
||||||
|
doTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY*)pRuntimeEnv->prevRow[0], -1,
|
||||||
|
tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP);
|
||||||
|
|
||||||
|
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
||||||
|
setNotInterpoWindowKey(pInfo->pCtx, pQueryAttr->numOfOutput, RESULT_ROW_START_INTERP);
|
||||||
|
|
||||||
|
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
|
|
||||||
|
|
||||||
doTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0],
|
|
||||||
-1, tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP);
|
|
||||||
|
|
||||||
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
|
||||||
setNotInterpoWindowKey(pInfo->pCtx, pQueryAttr->numOfOutput, RESULT_ROW_START_INTERP);
|
|
||||||
|
|
||||||
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput);
|
|
||||||
}
|
|
||||||
|
|
||||||
// restore current time window
|
// restore current time window
|
||||||
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx,
|
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx,
|
||||||
numOfOutput, pInfo->rowCellInfoOffset);
|
numOfOutput, pInfo->rowCellInfoOffset);
|
||||||
|
@ -3703,12 +3678,16 @@ void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunction
|
||||||
void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) {
|
void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) {
|
||||||
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||||
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
|
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
|
||||||
SResultRowInfo *pWindowResInfo = &pTableQueryInfo->resInfo;
|
SResultRowInfo *pResultRowInfo = &pTableQueryInfo->resInfo;
|
||||||
|
|
||||||
if (pWindowResInfo->prevSKey != TSKEY_INITIAL_VAL) {
|
if (pResultRowInfo->current != NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if (pWindowResInfo->prevSKey != TSKEY_INITIAL_VAL) {
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
|
||||||
pTableQueryInfo->win.skey = key;
|
pTableQueryInfo->win.skey = key;
|
||||||
STimeWindow win = {.skey = key, .ekey = pQueryAttr->window.ekey};
|
STimeWindow win = {.skey = key, .ekey = pQueryAttr->window.ekey};
|
||||||
|
|
||||||
|
@ -3724,13 +3703,13 @@ void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) {
|
||||||
TSKEY ek = MAX(win.skey, win.ekey);
|
TSKEY ek = MAX(win.skey, win.ekey);
|
||||||
getAlignQueryTimeWindow(pQueryAttr, win.skey, sk, ek, &w);
|
getAlignQueryTimeWindow(pQueryAttr, win.skey, sk, ek, &w);
|
||||||
|
|
||||||
if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
|
// if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) {
|
||||||
if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
|
// if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
|
||||||
assert(win.ekey == pQueryAttr->window.ekey);
|
// assert(win.ekey == pQueryAttr->window.ekey);
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
pWindowResInfo->prevSKey = w.skey;
|
// pResultRowInfo->prevSKey = w.skey;
|
||||||
}
|
// }
|
||||||
|
|
||||||
pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
|
pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
|
||||||
}
|
}
|
||||||
|
@ -4631,7 +4610,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
|
||||||
|
|
||||||
if (pResultRowInfo->size > 0) {
|
if (pResultRowInfo->size > 0) {
|
||||||
pResultRowInfo->current = pResultRowInfo->pResult[0];
|
pResultRowInfo->current = pResultRowInfo->pResult[0];
|
||||||
pResultRowInfo->prevSKey = pResultRowInfo->pResult[0]->win.skey;
|
// pResultRowInfo->prevSKey = pResultRowInfo->pResult[0]->win.skey;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("QInfo:0x%"PRIx64" start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
qDebug("QInfo:0x%"PRIx64" start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
||||||
|
@ -4657,7 +4636,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
|
||||||
|
|
||||||
if (pResultRowInfo->size > 0) {
|
if (pResultRowInfo->size > 0) {
|
||||||
pResultRowInfo->current = pResultRowInfo->pResult[pResultRowInfo->size - 1];
|
pResultRowInfo->current = pResultRowInfo->pResult[pResultRowInfo->size - 1];
|
||||||
pResultRowInfo->prevSKey = pResultRowInfo->current->win.skey;
|
// pResultRowInfo->prevSKey = pResultRowInfo->current->win.skey;
|
||||||
}
|
}
|
||||||
|
|
||||||
p = doTableScanImpl(pOperator, newgroup);
|
p = doTableScanImpl(pOperator, newgroup);
|
||||||
|
|
|
@ -44,7 +44,7 @@ int32_t getOutputInterResultBufSize(SQueryAttr* pQueryAttr) {
|
||||||
int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) {
|
int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) {
|
||||||
pResultRowInfo->type = type;
|
pResultRowInfo->type = type;
|
||||||
pResultRowInfo->size = 0;
|
pResultRowInfo->size = 0;
|
||||||
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
|
// pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
|
||||||
pResultRowInfo->current = NULL;
|
pResultRowInfo->current = NULL;
|
||||||
pResultRowInfo->capacity = size;
|
pResultRowInfo->capacity = size;
|
||||||
|
|
||||||
|
@ -93,7 +93,7 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo
|
||||||
|
|
||||||
pResultRowInfo->size = 0;
|
pResultRowInfo->size = 0;
|
||||||
pResultRowInfo->current = NULL;
|
pResultRowInfo->current = NULL;
|
||||||
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
|
// pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfClosedResultRows(SResultRowInfo *pResultRowInfo) {
|
int32_t numOfClosedResultRows(SResultRowInfo *pResultRowInfo) {
|
||||||
|
|
|
@ -450,4 +450,44 @@ if $data11 != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print =====================>TD-5157
|
||||||
|
sql select twa(c1) from nest_tb1 interval(19a);
|
||||||
|
if $rows != 10000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != @20-09-14 23:59:59.992@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 0.000083333 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =================>us database interval query, TD-5039
|
||||||
|
sql create database test precision 'us';
|
||||||
|
sql use test;
|
||||||
|
sql create table t1(ts timestamp, k int);
|
||||||
|
sql insert into t1 values('2020-01-01 01:01:01.000', 1) ('2020-01-01 01:02:00.000', 2);
|
||||||
|
sql select avg(k) from (select avg(k) k from t1 interval(1s)) interval(1m);
|
||||||
|
if $rows != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != @20-01-01 01:01:00.000000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 1.000000000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data10 != @20-01-01 01:02:00.000000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data11 != 2.000000000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue