[TD-2624]<fix>: fix crash in twa query.
This commit is contained in:
parent
5a7e631ca2
commit
8e9ec9df06
|
@ -717,7 +717,7 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se
|
|||
return forwardStep;
|
||||
}
|
||||
|
||||
static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, bool ascQuery) {
|
||||
static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, bool ascQuery, bool timeWindowInterpo) {
|
||||
int64_t skey = TSKEY_INITIAL_VAL;
|
||||
int32_t i = 0;
|
||||
for (i = pResultRowInfo->size - 1; i >= 0; --i) {
|
||||
|
@ -727,10 +727,22 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
|
|||
}
|
||||
|
||||
// new closed result rows
|
||||
if ((pResult->win.ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) {
|
||||
closeResultRow(pResultRowInfo, i);
|
||||
if (timeWindowInterpo) {
|
||||
if (pResult->endInterp && ((pResult->win.skey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery))) {
|
||||
if (i > 0) { // the first time window, the startInterp is false.
|
||||
assert(pResult->startInterp);
|
||||
}
|
||||
|
||||
closeResultRow(pResultRowInfo, i);
|
||||
} else {
|
||||
skey = pResult->win.skey;
|
||||
}
|
||||
} else {
|
||||
skey = pResult->win.skey;
|
||||
if ((pResult->win.ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) {
|
||||
closeResultRow(pResultRowInfo, i);
|
||||
} else {
|
||||
skey = pResult->win.skey;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -751,13 +763,13 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
|
|||
}
|
||||
}
|
||||
|
||||
static void updateResultRowIndex(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, bool ascQuery) {
|
||||
static void updateResultRowIndex(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, bool ascQuery, bool timeWindowInterpo) {
|
||||
if ((pTableQueryInfo->lastKey > pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey < pTableQueryInfo->win.ekey && (!ascQuery))) {
|
||||
closeAllResultRows(pResultRowInfo);
|
||||
pResultRowInfo->curIndex = pResultRowInfo->size - 1;
|
||||
} else {
|
||||
int32_t step = ascQuery? 1:-1;
|
||||
doUpdateResultRowIndex(pResultRowInfo, pTableQueryInfo->lastKey - step, ascQuery);
|
||||
doUpdateResultRowIndex(pResultRowInfo, pTableQueryInfo->lastKey - step, ascQuery, timeWindowInterpo);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1076,13 +1088,13 @@ static bool setTimeWindowInterpolationEndTs(SQueryRuntimeEnv* pRuntimeEnv, int32
|
|||
return true;
|
||||
}
|
||||
|
||||
static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock) {
|
||||
static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock,
|
||||
int32_t rowIndex) {
|
||||
if (pDataBlock == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||
int32_t rowIndex = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->rows-1:0;
|
||||
for (int32_t k = 0; k < pQuery->numOfCols; ++k) {
|
||||
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k);
|
||||
memcpy(pRuntimeEnv->prevRow[k], ((char*)pColInfo->pData) + (pColInfo->info.bytes * rowIndex), pColInfo->info.bytes);
|
||||
|
@ -1265,7 +1277,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
|||
|
||||
_end:
|
||||
if (pRuntimeEnv->timeWindowInterpo) {
|
||||
saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock);
|
||||
int32_t rowIndex = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->rows-1:0;
|
||||
saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock, rowIndex);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
|
@ -1512,7 +1525,7 @@ static void setTimeWindowEKeyInterp(SQueryRuntimeEnv* pRuntimeEnv, SArray* pData
|
|||
}
|
||||
|
||||
static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
|
||||
SResultRowInfo *pWindowResInfo, SArray *pDataBlock) {
|
||||
SResultRowInfo *pWindowResInfo, SArray *pDataBlock) {
|
||||
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
||||
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
|
||||
|
||||
|
@ -1587,7 +1600,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
|||
// interval window query, decide the time window according to the primary timestamp
|
||||
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
int32_t prevWindowIndex = curTimeWindowIndex(pWindowResInfo);
|
||||
int64_t ts = tsCols[offset];
|
||||
int64_t ts = tsCols[offset];
|
||||
|
||||
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
|
||||
|
||||
|
@ -1629,8 +1642,6 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
|||
doRowwiseApplyFunctions(pRuntimeEnv, &win, offset);
|
||||
|
||||
STimeWindow nextWin = win;
|
||||
int32_t index = pWindowResInfo->curIndex;
|
||||
|
||||
while (1) {
|
||||
getNextTimeWindow(pQuery, &nextWin);
|
||||
if ((nextWin.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||
|
@ -1652,7 +1663,6 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
|||
doRowwiseApplyFunctions(pRuntimeEnv, &nextWin, offset);
|
||||
}
|
||||
|
||||
pWindowResInfo->curIndex = index;
|
||||
} else { // other queries
|
||||
// decide which group this rows belongs to according to current state value
|
||||
if (groupbyColumnValue) {
|
||||
|
@ -1686,10 +1696,17 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
|||
|
||||
_end:
|
||||
assert(offset >= 0);
|
||||
assert(tsCols != NULL);
|
||||
|
||||
if (tsCols != NULL) {
|
||||
item->lastKey = tsCols[offset] + step;
|
||||
item->lastKey = prevTs + step;
|
||||
} else {
|
||||
item->lastKey = (QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey:pDataBlockInfo->window.skey) + step;
|
||||
item->lastKey = (QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey) + step;
|
||||
}
|
||||
|
||||
// In case of all rows in current block are not qualified
|
||||
if (pRuntimeEnv->timeWindowInterpo && prevRowIndex != -1) {
|
||||
saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock, prevRowIndex);
|
||||
}
|
||||
|
||||
if (pRuntimeEnv->pTsBuf != NULL) {
|
||||
|
@ -1729,7 +1746,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
|
|||
int32_t numOfRes = 0;
|
||||
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
|
||||
numOfRes = pResultRowInfo->size;
|
||||
updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery));
|
||||
updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery), pRuntimeEnv->timeWindowInterpo);
|
||||
} else { // projection query
|
||||
numOfRes = (int32_t) getNumOfResult(pRuntimeEnv);
|
||||
|
||||
|
@ -4210,7 +4227,7 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc
|
|||
}
|
||||
|
||||
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery));
|
||||
updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery), pRuntimeEnv->timeWindowInterpo);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4510,7 +4527,11 @@ static TSKEY doSkipIntervalProcess(SQueryRuntimeEnv* pRuntimeEnv, STimeWindow* w
|
|||
|
||||
static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
assert(*start <= pQuery->current->lastKey);
|
||||
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
assert(*start <= pQuery->current->lastKey);
|
||||
} else {
|
||||
assert(*start >= pQuery->current->lastKey);
|
||||
}
|
||||
|
||||
// if queried with value filter, do NOT forward query start position
|
||||
if (pQuery->limit.offset <= 0 || pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pRuntimeEnv->pFillInfo != NULL) {
|
||||
|
|
|
@ -106,7 +106,7 @@ while $x < 5000
|
|||
endw
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
sleep 3000
|
||||
sleep 1000
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
print ================== server restart completed
|
||||
sql connect
|
||||
|
|
|
@ -311,3 +311,53 @@ if $rows != 6 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
print ==================> td-2624
|
||||
sql create table tm2(ts timestamp, k int, b binary(12));
|
||||
sql insert into tm2 values('2011-01-02 18:42:45.326', -1,'abc');
|
||||
sql insert into tm2 values('2020-07-30 17:44:06.283', 0, null);
|
||||
sql insert into tm2 values('2020-07-30 17:44:19.578', 9999999, null);
|
||||
sql insert into tm2 values('2020-07-30 17:46:06.417', NULL, null);
|
||||
sql insert into tm2 values('2020-11-09 18:42:25.538', 0, null);
|
||||
sql insert into tm2 values('2020-12-29 17:43:11.641', 0, null);
|
||||
sql insert into tm2 values('2020-12-29 18:43:17.129', 0, null);
|
||||
sql insert into tm2 values('2020-12-29 18:46:19.109', NULL, null);
|
||||
sql insert into tm2 values('2021-01-03 18:40:40.065', 0, null);
|
||||
|
||||
sql select twa(k),first(ts) from tm2 where k <50 interval(17s);
|
||||
if $rows != 6 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != @11-01-02 18:42:42.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != @11-01-02 18:42:45.326@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data10 != @20-07-30 17:43:59.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data21 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select twa(k),first(ts) from tm2 where k <50 interval(17s) order by ts desc;
|
||||
if $rows != 6 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select twa(k),first(ts),count(k),first(k) from tm2 interval(17s) limit 20 offset 0;
|
||||
if $rows != 9 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != @11-01-02 18:42:42.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data10 != @20-07-30 17:43:59.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
|
Loading…
Reference in New Issue