Merge pull request #7481 from taosdata/fix/TD-6046
[TD-6046]<fix> fix ts top/bottom/diff/derivative ts error
This commit is contained in:
commit
751beb46ca
|
@ -214,6 +214,7 @@ SExprInfo* tscExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t function
|
|||
int16_t size);
|
||||
|
||||
size_t tscNumOfExprs(SQueryInfo* pQueryInfo);
|
||||
int32_t tscExprTopBottomIndex(SQueryInfo* pQueryInfo);
|
||||
SExprInfo *tscExprGet(SQueryInfo* pQueryInfo, int32_t index);
|
||||
int32_t tscExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
|
||||
int32_t tscExprCopyAll(SArray* dst, const SArray* src, bool deepcopy);
|
||||
|
|
|
@ -643,7 +643,7 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
|
|||
for(int32_t j = 0; j < numOfExpr; ++j) {
|
||||
pCtx[j].pOutput += (pCtx[j].outputBytes * numOfRows);
|
||||
if (pCtx[j].functionId == TSDB_FUNC_TOP || pCtx[j].functionId == TSDB_FUNC_BOTTOM) {
|
||||
pCtx[j].ptsOutputBuf = pCtx[0].pOutput;
|
||||
if(j > 0)pCtx[j].ptsOutputBuf = pCtx[j - 1].pOutput;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2606,13 +2606,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
|
||||
// set the first column ts for diff query
|
||||
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
|
||||
colIndex += 1;
|
||||
SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0};
|
||||
SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP,
|
||||
TSDB_KEYSIZE, getNewResColId(pCmd), TSDB_KEYSIZE, false);
|
||||
|
||||
SColumnList ids = createColumnList(1, 0, 0);
|
||||
insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].name, pExpr);
|
||||
insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].name, pExpr);
|
||||
}
|
||||
|
||||
SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), intermediateResSize, false);
|
||||
|
@ -2885,7 +2884,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
|
|||
|
||||
const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX;
|
||||
SColumnList ids = createColumnList(1, index.tableIndex, TS_COLUMN_INDEX);
|
||||
insertResultField(pQueryInfo, TS_COLUMN_INDEX, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP,
|
||||
insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP,
|
||||
aAggs[TSDB_FUNC_TS].name, pExpr);
|
||||
|
||||
colIndex += 1; // the first column is ts
|
||||
|
@ -5880,10 +5879,12 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
|
|||
pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId;
|
||||
} else if (isTopBottomQuery(pQueryInfo)) {
|
||||
/* order of top/bottom query in interval is not valid */
|
||||
SExprInfo* pExpr = tscExprGet(pQueryInfo, 0);
|
||||
int32_t pos = tscExprTopBottomIndex(pQueryInfo);
|
||||
assert(pos > 0);
|
||||
SExprInfo* pExpr = tscExprGet(pQueryInfo, pos - 1);
|
||||
assert(pExpr->base.functionId == TSDB_FUNC_TS);
|
||||
|
||||
pExpr = tscExprGet(pQueryInfo, 1);
|
||||
pExpr = tscExprGet(pQueryInfo, pos);
|
||||
if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
return invalidOperationMsg(pMsgBuf, msg5);
|
||||
}
|
||||
|
@ -5975,10 +5976,12 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
|
|||
}
|
||||
} else {
|
||||
/* order of top/bottom query in interval is not valid */
|
||||
SExprInfo* pExpr = tscExprGet(pQueryInfo, 0);
|
||||
int32_t pos = tscExprTopBottomIndex(pQueryInfo);
|
||||
assert(pos > 0);
|
||||
SExprInfo* pExpr = tscExprGet(pQueryInfo, pos - 1);
|
||||
assert(pExpr->base.functionId == TSDB_FUNC_TS);
|
||||
|
||||
pExpr = tscExprGet(pQueryInfo, 1);
|
||||
pExpr = tscExprGet(pQueryInfo, pos);
|
||||
if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
return invalidOperationMsg(pMsgBuf, msg5);
|
||||
}
|
||||
|
|
|
@ -2419,6 +2419,19 @@ size_t tscNumOfExprs(SQueryInfo* pQueryInfo) {
|
|||
return taosArrayGetSize(pQueryInfo->exprList);
|
||||
}
|
||||
|
||||
int32_t tscExprTopBottomIndex(SQueryInfo* pQueryInfo){
|
||||
size_t numOfExprs = tscNumOfExprs(pQueryInfo);
|
||||
for(int32_t i = 0; i < numOfExprs; ++i) {
|
||||
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
|
||||
if (pExpr == NULL)
|
||||
continue;
|
||||
if (pExpr->base.functionId == TSDB_FUNC_TOP || pExpr->base.functionId == TSDB_FUNC_BOTTOM) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
// todo REFACTOR
|
||||
void tscExprAddParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes) {
|
||||
assert (pExpr != NULL || argument != NULL || bytes != 0);
|
||||
|
|
|
@ -612,6 +612,7 @@ int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int3
|
|||
void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
|
||||
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows);
|
||||
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity);
|
||||
void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput);
|
||||
|
||||
void freeParam(SQueryParam *param);
|
||||
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
|
||||
|
|
|
@ -3591,7 +3591,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
|
|||
// set the timestamp output buffer for top/bottom/diff query
|
||||
int32_t fid = pCtx[i].functionId;
|
||||
if (fid == TSDB_FUNC_TOP || fid == TSDB_FUNC_BOTTOM || fid == TSDB_FUNC_DIFF || fid == TSDB_FUNC_DERIVATIVE) {
|
||||
pCtx[i].ptsOutputBuf = pCtx[0].pOutput;
|
||||
if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3619,14 +3619,15 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
pBInfo->pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows;
|
||||
|
||||
// re-estabilish output buffer pointer.
|
||||
int32_t functionId = pBInfo->pCtx[i].functionId;
|
||||
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
|
||||
pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput;
|
||||
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE){
|
||||
if (i > 0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3644,7 +3645,35 @@ void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity) {
|
|||
}
|
||||
}
|
||||
|
||||
void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||
bool needCopyTs = false;
|
||||
int32_t tsNum = 0;
|
||||
char *src = NULL;
|
||||
for (int32_t i = 0; i < numOfOutput; i++) {
|
||||
int32_t functionId = pCtx[i].functionId;
|
||||
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
|
||||
needCopyTs = true;
|
||||
if (i > 0 && pCtx[i-1].functionId == TSDB_FUNC_TS_DUMMY){
|
||||
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, i - 1); // find ts data
|
||||
src = pColRes->pData;
|
||||
}
|
||||
}else if(functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
tsNum++;
|
||||
}
|
||||
}
|
||||
|
||||
if (!needCopyTs) return;
|
||||
if (tsNum < 2) return;
|
||||
if (src == NULL) return;
|
||||
|
||||
for (int32_t i = 0; i < numOfOutput; i++) {
|
||||
int32_t functionId = pCtx[i].functionId;
|
||||
if(functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, i);
|
||||
memcpy(pColRes->pData, src, pColRes->info.bytes * pRes->info.rows);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) {
|
||||
for (int32_t j = 0; j < size; ++j) {
|
||||
|
@ -3826,7 +3855,7 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
|
|||
}
|
||||
|
||||
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
|
||||
pCtx[i].ptsOutputBuf = pCtx[0].pOutput;
|
||||
if(i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
|
||||
}
|
||||
|
||||
if (!pResInfo->initialized) {
|
||||
|
@ -3887,7 +3916,7 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF
|
|||
|
||||
int32_t functionId = pCtx[i].functionId;
|
||||
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
|
||||
pCtx[i].ptsOutputBuf = pCtx[0].pOutput;
|
||||
if(i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -5708,6 +5737,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
|
|||
|
||||
pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
|
||||
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
|
||||
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
|
||||
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
|
||||
return pRes;
|
||||
}
|
||||
|
@ -5733,8 +5763,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
|
|||
if (*newgroup) {
|
||||
if (pRes->info.rows > 0) {
|
||||
pProjectInfo->existDataBlock = pBlock;
|
||||
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
|
||||
return pInfo->pRes;
|
||||
break;
|
||||
} else { // init output buffer for a new group data
|
||||
for (int32_t j = 0; j < pOperator->numOfOutput; ++j) {
|
||||
aAggs[pInfo->pCtx[j].functionId].xFinalize(&pInfo->pCtx[j]);
|
||||
|
@ -5764,7 +5793,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
|
||||
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
|
||||
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
||||
}
|
||||
|
|
|
@ -104,6 +104,21 @@ class TDTestCase:
|
|||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 1, 1)
|
||||
tdSql.checkData(1, 1, 2)
|
||||
|
||||
tdSql.query("select ts,bottom(col1, 2),ts from test1")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 0, "2018-09-17 09:00:00.000")
|
||||
tdSql.checkData(0, 1, "2018-09-17 09:00:00.000")
|
||||
tdSql.checkData(1, 0, "2018-09-17 09:00:00.001")
|
||||
tdSql.checkData(1, 3, "2018-09-17 09:00:00.001")
|
||||
|
||||
|
||||
tdSql.query("select ts,bottom(col1, 2),ts from test group by tbname")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 0, "2018-09-17 09:00:00.000")
|
||||
tdSql.checkData(0, 1, "2018-09-17 09:00:00.000")
|
||||
tdSql.checkData(1, 0, "2018-09-17 09:00:00.001")
|
||||
tdSql.checkData(1, 3, "2018-09-17 09:00:00.001")
|
||||
|
||||
#TD-2457 bottom + interval + order by
|
||||
tdSql.error('select top(col2,1) from test interval(1y) order by col2;')
|
||||
|
|
|
@ -54,6 +54,29 @@ class TDTestCase:
|
|||
tdSql.query("select derivative(col, 10s, 0) from stb group by tbname")
|
||||
tdSql.checkRows(10)
|
||||
|
||||
tdSql.query("select ts,derivative(col, 10s, 1),ts from stb group by tbname")
|
||||
tdSql.checkRows(4)
|
||||
tdSql.checkData(0, 0, "2018-09-17 09:00:10.000")
|
||||
tdSql.checkData(0, 1, "2018-09-17 09:00:10.000")
|
||||
tdSql.checkData(0, 3, "2018-09-17 09:00:10.000")
|
||||
tdSql.checkData(3, 0, "2018-09-17 09:01:20.000")
|
||||
tdSql.checkData(3, 1, "2018-09-17 09:01:20.000")
|
||||
tdSql.checkData(3, 3, "2018-09-17 09:01:20.000")
|
||||
|
||||
tdSql.query("select ts,derivative(col, 10s, 1),ts from tb1")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 0, "2018-09-17 09:00:10.000")
|
||||
tdSql.checkData(0, 1, "2018-09-17 09:00:10.000")
|
||||
tdSql.checkData(0, 3, "2018-09-17 09:00:10.000")
|
||||
tdSql.checkData(1, 0, "2018-09-17 09:00:20.009")
|
||||
tdSql.checkData(1, 1, "2018-09-17 09:00:20.009")
|
||||
tdSql.checkData(1, 3, "2018-09-17 09:00:20.009")
|
||||
|
||||
|
||||
tdSql.query("select ts from(select ts,derivative(col, 10s, 0) from stb group by tbname)")
|
||||
|
||||
tdSql.checkData(0, 0, "2018-09-17 09:00:10.000")
|
||||
|
||||
tdSql.error("select derivative(col, 10s, 0) from tb1 group by tbname")
|
||||
|
||||
tdSql.query("select derivative(col, 10s, 1) from tb1")
|
||||
|
|
|
@ -95,6 +95,24 @@ class TDTestCase:
|
|||
tdSql.error("select diff(col14) from test")
|
||||
|
||||
|
||||
tdSql.query("select ts,diff(col1),ts from test1")
|
||||
tdSql.checkRows(10)
|
||||
tdSql.checkData(0, 0, "2018-09-17 09:00:00.000")
|
||||
tdSql.checkData(0, 1, "2018-09-17 09:00:00.000")
|
||||
tdSql.checkData(0, 3, "2018-09-17 09:00:00.000")
|
||||
tdSql.checkData(9, 0, "2018-09-17 09:00:00.009")
|
||||
tdSql.checkData(9, 1, "2018-09-17 09:00:00.009")
|
||||
tdSql.checkData(9, 3, "2018-09-17 09:00:00.009")
|
||||
|
||||
tdSql.query("select ts,diff(col1),ts from test group by tbname")
|
||||
tdSql.checkRows(10)
|
||||
tdSql.checkData(0, 0, "2018-09-17 09:00:00.000")
|
||||
tdSql.checkData(0, 1, "2018-09-17 09:00:00.000")
|
||||
tdSql.checkData(0, 3, "2018-09-17 09:00:00.000")
|
||||
tdSql.checkData(9, 0, "2018-09-17 09:00:00.009")
|
||||
tdSql.checkData(9, 1, "2018-09-17 09:00:00.009")
|
||||
tdSql.checkData(9, 3, "2018-09-17 09:00:00.009")
|
||||
|
||||
tdSql.query("select diff(col1) from test1")
|
||||
tdSql.checkRows(10)
|
||||
|
||||
|
|
|
@ -117,7 +117,22 @@ class TDTestCase:
|
|||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 1, 8.1)
|
||||
tdSql.checkData(1, 1, 9.1)
|
||||
|
||||
|
||||
tdSql.query("select ts,top(col1, 2),ts from test1")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 0, "2018-09-17 09:00:00.008")
|
||||
tdSql.checkData(0, 1, "2018-09-17 09:00:00.008")
|
||||
tdSql.checkData(1, 0, "2018-09-17 09:00:00.009")
|
||||
tdSql.checkData(1, 3, "2018-09-17 09:00:00.009")
|
||||
|
||||
|
||||
tdSql.query("select ts,top(col1, 2),ts from test group by tbname")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 0, "2018-09-17 09:00:00.008")
|
||||
tdSql.checkData(0, 1, "2018-09-17 09:00:00.008")
|
||||
tdSql.checkData(1, 0, "2018-09-17 09:00:00.009")
|
||||
tdSql.checkData(1, 3, "2018-09-17 09:00:00.009")
|
||||
|
||||
#TD-2563 top + super_table + interval
|
||||
tdSql.execute("create table meters(ts timestamp, c int) tags (d int)")
|
||||
tdSql.execute("create table t1 using meters tags (1)")
|
||||
|
|
Loading…
Reference in New Issue