Merge pull request #7483 from taosdata/fix/TD-6046-master
[TD-6046]<fix> fix ts top/bottom/diff/derivative ts error
This commit is contained in:
commit
e07be20f76
|
@ -643,7 +643,7 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
|
||||||
for(int32_t j = 0; j < numOfExpr; ++j) {
|
for(int32_t j = 0; j < numOfExpr; ++j) {
|
||||||
pCtx[j].pOutput += (pCtx[j].outputBytes * numOfRows);
|
pCtx[j].pOutput += (pCtx[j].outputBytes * numOfRows);
|
||||||
if (pCtx[j].functionId == TSDB_FUNC_TOP || pCtx[j].functionId == TSDB_FUNC_BOTTOM) {
|
if (pCtx[j].functionId == TSDB_FUNC_TOP || pCtx[j].functionId == TSDB_FUNC_BOTTOM) {
|
||||||
pCtx[j].ptsOutputBuf = pCtx[0].pOutput;
|
if(j>0) pCtx[j].ptsOutputBuf = pCtx[j-1].pOutput;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2603,13 +2603,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
||||||
|
|
||||||
// set the first column ts for diff query
|
// set the first column ts for diff query
|
||||||
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
|
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
|
||||||
colIndex += 1;
|
|
||||||
SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0};
|
SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0};
|
||||||
SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP,
|
SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP,
|
||||||
TSDB_KEYSIZE, getNewResColId(pCmd), TSDB_KEYSIZE, false);
|
TSDB_KEYSIZE, getNewResColId(pCmd), TSDB_KEYSIZE, false);
|
||||||
|
|
||||||
SColumnList ids = createColumnList(1, 0, 0);
|
SColumnList ids = createColumnList(1, 0, 0);
|
||||||
insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].name, pExpr);
|
insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].name, pExpr);
|
||||||
}
|
}
|
||||||
|
|
||||||
SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), intermediateResSize, false);
|
SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), intermediateResSize, false);
|
||||||
|
@ -2882,7 +2881,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
||||||
|
|
||||||
const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX;
|
const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX;
|
||||||
SColumnList ids = createColumnList(1, index.tableIndex, TS_COLUMN_INDEX);
|
SColumnList ids = createColumnList(1, index.tableIndex, TS_COLUMN_INDEX);
|
||||||
insertResultField(pQueryInfo, TS_COLUMN_INDEX, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP,
|
insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP,
|
||||||
aAggs[TSDB_FUNC_TS].name, pExpr);
|
aAggs[TSDB_FUNC_TS].name, pExpr);
|
||||||
|
|
||||||
colIndex += 1; // the first column is ts
|
colIndex += 1; // the first column is ts
|
||||||
|
|
|
@ -597,6 +597,8 @@ bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilter
|
||||||
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
|
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
|
||||||
|
|
||||||
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows);
|
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows);
|
||||||
|
void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
|
|
||||||
void* destroyOutputBuf(SSDataBlock* pBlock);
|
void* destroyOutputBuf(SSDataBlock* pBlock);
|
||||||
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
|
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
|
||||||
|
|
||||||
|
|
|
@ -3616,7 +3616,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
|
||||||
// set the timestamp output buffer for top/bottom/diff query
|
// set the timestamp output buffer for top/bottom/diff query
|
||||||
int32_t fid = pCtx[i].functionId;
|
int32_t fid = pCtx[i].functionId;
|
||||||
if (fid == TSDB_FUNC_TOP || fid == TSDB_FUNC_BOTTOM || fid == TSDB_FUNC_DIFF || fid == TSDB_FUNC_DERIVATIVE) {
|
if (fid == TSDB_FUNC_TOP || fid == TSDB_FUNC_BOTTOM || fid == TSDB_FUNC_DIFF || fid == TSDB_FUNC_DERIVATIVE) {
|
||||||
pCtx[i].ptsOutputBuf = pCtx[0].pOutput;
|
if(i>0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3651,7 +3651,37 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
|
||||||
// re-estabilish output buffer pointer.
|
// re-estabilish output buffer pointer.
|
||||||
int32_t functionId = pBInfo->pCtx[i].functionId;
|
int32_t functionId = pBInfo->pCtx[i].functionId;
|
||||||
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
|
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
|
||||||
pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput;
|
if(i>0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
|
bool needCopyTs = false;
|
||||||
|
int32_t tsNum = 0;
|
||||||
|
char *src = NULL;
|
||||||
|
for (int32_t i = 0; i < numOfOutput; i++) {
|
||||||
|
int32_t functionId = pCtx[i].functionId;
|
||||||
|
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
|
||||||
|
needCopyTs = true;
|
||||||
|
if (i > 0 && pCtx[i-1].functionId == TSDB_FUNC_TS_DUMMY){
|
||||||
|
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, i - 1); // find ts data
|
||||||
|
src = pColRes->pData;
|
||||||
|
}
|
||||||
|
}else if(functionId == TSDB_FUNC_TS_DUMMY) {
|
||||||
|
tsNum++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!needCopyTs) return;
|
||||||
|
if (tsNum < 2) return;
|
||||||
|
if (src == NULL) return;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfOutput; i++) {
|
||||||
|
int32_t functionId = pCtx[i].functionId;
|
||||||
|
if(functionId == TSDB_FUNC_TS_DUMMY) {
|
||||||
|
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, i);
|
||||||
|
memcpy(pColRes->pData, src, pColRes->info.bytes * pRes->info.rows);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3851,7 +3881,7 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
|
||||||
}
|
}
|
||||||
|
|
||||||
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
|
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
|
||||||
pCtx[i].ptsOutputBuf = pCtx[0].pOutput;
|
if(i>0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pResInfo->initialized) {
|
if (!pResInfo->initialized) {
|
||||||
|
@ -3912,7 +3942,7 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF
|
||||||
|
|
||||||
int32_t functionId = pCtx[i].functionId;
|
int32_t functionId = pCtx[i].functionId;
|
||||||
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
|
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
|
||||||
pCtx[i].ptsOutputBuf = pCtx[0].pOutput;
|
if(i>0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -5698,6 +5728,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
|
||||||
|
|
||||||
pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
|
pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
|
||||||
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
|
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
|
||||||
|
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
|
||||||
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
|
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
|
||||||
return pRes;
|
return pRes;
|
||||||
}
|
}
|
||||||
|
@ -5723,8 +5754,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
|
||||||
if (*newgroup) {
|
if (*newgroup) {
|
||||||
if (pRes->info.rows > 0) {
|
if (pRes->info.rows > 0) {
|
||||||
pProjectInfo->existDataBlock = pBlock;
|
pProjectInfo->existDataBlock = pBlock;
|
||||||
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
|
break;
|
||||||
return pInfo->pRes;
|
|
||||||
} else { // init output buffer for a new group data
|
} else { // init output buffer for a new group data
|
||||||
for (int32_t j = 0; j < pOperator->numOfOutput; ++j) {
|
for (int32_t j = 0; j < pOperator->numOfOutput; ++j) {
|
||||||
aAggs[pInfo->pCtx[j].functionId].xFinalize(&pInfo->pCtx[j]);
|
aAggs[pInfo->pCtx[j].functionId].xFinalize(&pInfo->pCtx[j]);
|
||||||
|
@ -5754,7 +5784,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
|
||||||
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
|
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
|
||||||
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,6 +104,21 @@ class TDTestCase:
|
||||||
tdSql.checkRows(2)
|
tdSql.checkRows(2)
|
||||||
tdSql.checkData(0, 1, 1)
|
tdSql.checkData(0, 1, 1)
|
||||||
tdSql.checkData(1, 1, 2)
|
tdSql.checkData(1, 1, 2)
|
||||||
|
|
||||||
|
tdSql.query("select ts,bottom(col1, 2),ts from test1")
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 0, "2018-09-17 09:00:00.000")
|
||||||
|
tdSql.checkData(0, 1, "2018-09-17 09:00:00.000")
|
||||||
|
tdSql.checkData(1, 0, "2018-09-17 09:00:00.001")
|
||||||
|
tdSql.checkData(1, 3, "2018-09-17 09:00:00.001")
|
||||||
|
|
||||||
|
|
||||||
|
tdSql.query("select ts,bottom(col1, 2),ts from test group by tbname")
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 0, "2018-09-17 09:00:00.000")
|
||||||
|
tdSql.checkData(0, 1, "2018-09-17 09:00:00.000")
|
||||||
|
tdSql.checkData(1, 0, "2018-09-17 09:00:00.001")
|
||||||
|
tdSql.checkData(1, 3, "2018-09-17 09:00:00.001")
|
||||||
|
|
||||||
#TD-2457 bottom + interval + order by
|
#TD-2457 bottom + interval + order by
|
||||||
tdSql.error('select top(col2,1) from test interval(1y) order by col2;')
|
tdSql.error('select top(col2,1) from test interval(1y) order by col2;')
|
||||||
|
|
|
@ -54,6 +54,28 @@ class TDTestCase:
|
||||||
tdSql.query("select derivative(col, 10s, 0) from stb group by tbname")
|
tdSql.query("select derivative(col, 10s, 0) from stb group by tbname")
|
||||||
tdSql.checkRows(10)
|
tdSql.checkRows(10)
|
||||||
|
|
||||||
|
tdSql.query("select ts,derivative(col, 10s, 1),ts from stb group by tbname")
|
||||||
|
tdSql.checkRows(4)
|
||||||
|
tdSql.checkData(0, 0, "2018-09-17 09:00:10.000")
|
||||||
|
tdSql.checkData(0, 1, "2018-09-17 09:00:10.000")
|
||||||
|
tdSql.checkData(0, 3, "2018-09-17 09:00:10.000")
|
||||||
|
tdSql.checkData(3, 0, "2018-09-17 09:01:20.000")
|
||||||
|
tdSql.checkData(3, 1, "2018-09-17 09:01:20.000")
|
||||||
|
tdSql.checkData(3, 3, "2018-09-17 09:01:20.000")
|
||||||
|
|
||||||
|
tdSql.query("select ts,derivative(col, 10s, 1),ts from tb1")
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 0, "2018-09-17 09:00:10.000")
|
||||||
|
tdSql.checkData(0, 1, "2018-09-17 09:00:10.000")
|
||||||
|
tdSql.checkData(0, 3, "2018-09-17 09:00:10.000")
|
||||||
|
tdSql.checkData(1, 0, "2018-09-17 09:00:20.009")
|
||||||
|
tdSql.checkData(1, 1, "2018-09-17 09:00:20.009")
|
||||||
|
tdSql.checkData(1, 3, "2018-09-17 09:00:20.009")
|
||||||
|
|
||||||
|
tdSql.query("select ts from(select ts,derivative(col, 10s, 0) from stb group by tbname)")
|
||||||
|
|
||||||
|
tdSql.checkData(0, 0, "2018-09-17 09:00:10.000")
|
||||||
|
|
||||||
tdSql.error("select derivative(col, 10s, 0) from tb1 group by tbname")
|
tdSql.error("select derivative(col, 10s, 0) from tb1 group by tbname")
|
||||||
|
|
||||||
tdSql.query("select derivative(col, 10s, 1) from tb1")
|
tdSql.query("select derivative(col, 10s, 1) from tb1")
|
||||||
|
|
|
@ -94,6 +94,23 @@ class TDTestCase:
|
||||||
tdSql.error("select diff(col13) from test")
|
tdSql.error("select diff(col13) from test")
|
||||||
tdSql.error("select diff(col14) from test")
|
tdSql.error("select diff(col14) from test")
|
||||||
|
|
||||||
|
tdSql.query("select ts,diff(col1),ts from test1")
|
||||||
|
tdSql.checkRows(10)
|
||||||
|
tdSql.checkData(0, 0, "2018-09-17 09:00:00.000")
|
||||||
|
tdSql.checkData(0, 1, "2018-09-17 09:00:00.000")
|
||||||
|
tdSql.checkData(0, 3, "2018-09-17 09:00:00.000")
|
||||||
|
tdSql.checkData(9, 0, "2018-09-17 09:00:00.009")
|
||||||
|
tdSql.checkData(9, 1, "2018-09-17 09:00:00.009")
|
||||||
|
tdSql.checkData(9, 3, "2018-09-17 09:00:00.009")
|
||||||
|
|
||||||
|
tdSql.query("select ts,diff(col1),ts from test group by tbname")
|
||||||
|
tdSql.checkRows(10)
|
||||||
|
tdSql.checkData(0, 0, "2018-09-17 09:00:00.000")
|
||||||
|
tdSql.checkData(0, 1, "2018-09-17 09:00:00.000")
|
||||||
|
tdSql.checkData(0, 3, "2018-09-17 09:00:00.000")
|
||||||
|
tdSql.checkData(9, 0, "2018-09-17 09:00:00.009")
|
||||||
|
tdSql.checkData(9, 1, "2018-09-17 09:00:00.009")
|
||||||
|
tdSql.checkData(9, 3, "2018-09-17 09:00:00.009")
|
||||||
|
|
||||||
tdSql.query("select diff(col1) from test1")
|
tdSql.query("select diff(col1) from test1")
|
||||||
tdSql.checkRows(10)
|
tdSql.checkRows(10)
|
||||||
|
|
|
@ -117,6 +117,21 @@ class TDTestCase:
|
||||||
tdSql.checkRows(2)
|
tdSql.checkRows(2)
|
||||||
tdSql.checkData(0, 1, 8.1)
|
tdSql.checkData(0, 1, 8.1)
|
||||||
tdSql.checkData(1, 1, 9.1)
|
tdSql.checkData(1, 1, 9.1)
|
||||||
|
|
||||||
|
tdSql.query("select ts,top(col1, 2),ts from test1")
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 0, "2018-09-17 09:00:00.008")
|
||||||
|
tdSql.checkData(0, 1, "2018-09-17 09:00:00.008")
|
||||||
|
tdSql.checkData(1, 0, "2018-09-17 09:00:00.009")
|
||||||
|
tdSql.checkData(1, 3, "2018-09-17 09:00:00.009")
|
||||||
|
|
||||||
|
|
||||||
|
tdSql.query("select ts,top(col1, 2),ts from test group by tbname")
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 0, "2018-09-17 09:00:00.008")
|
||||||
|
tdSql.checkData(0, 1, "2018-09-17 09:00:00.008")
|
||||||
|
tdSql.checkData(1, 0, "2018-09-17 09:00:00.009")
|
||||||
|
tdSql.checkData(1, 3, "2018-09-17 09:00:00.009")
|
||||||
|
|
||||||
#TD-2563 top + super_table + interval
|
#TD-2563 top + super_table + interval
|
||||||
tdSql.execute("create table meters(ts timestamp, c int) tags (d int)")
|
tdSql.execute("create table meters(ts timestamp, c int) tags (d int)")
|
||||||
|
|
Loading…
Reference in New Issue