From 90b3e77698231a1ebbc07e81934cc0ad8b83c2ea Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 25 Jul 2022 16:02:49 +0800 Subject: [PATCH] fix(query):disable merge for project operator in stream processing. --- source/dnode/vnode/src/tsdb/tsdbUtil.c | 89 +++++++++++++++++++--- source/libs/executor/src/executor.c | 1 + source/libs/executor/src/projectoperator.c | 2 +- source/libs/function/src/builtins.c | 2 +- 4 files changed, 83 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index f318c69c6f..872357fc93 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1395,10 +1395,26 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { break; case TSDB_DATA_TYPE_BOOL: break; - case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_TINYINT:{ + pColAgg->sum += colVal.value.i8; + if (pColAgg->min > colVal.value.i8) { + pColAgg->min = colVal.value.i8; + } + if (pColAgg->max < colVal.value.i8) { + pColAgg->max = colVal.value.i8; + } break; - case TSDB_DATA_TYPE_SMALLINT: + } + case TSDB_DATA_TYPE_SMALLINT:{ + pColAgg->sum += colVal.value.i16; + if (pColAgg->min > colVal.value.i16) { + pColAgg->min = colVal.value.i16; + } + if (pColAgg->max < colVal.value.i16) { + pColAgg->max = colVal.value.i16; + } break; + } case TSDB_DATA_TYPE_INT: { pColAgg->sum += colVal.value.i32; if (pColAgg->min > colVal.value.i32) { @@ -1419,24 +1435,79 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { } break; } - case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_FLOAT:{ + pColAgg->sum += colVal.value.f; + if (pColAgg->min > colVal.value.f) { + pColAgg->min = colVal.value.f; + } + if (pColAgg->max < colVal.value.f) { + pColAgg->max = colVal.value.f; + } break; - case TSDB_DATA_TYPE_DOUBLE: + } + case TSDB_DATA_TYPE_DOUBLE:{ + pColAgg->sum += colVal.value.d; + if (pColAgg->min > colVal.value.d) { + pColAgg->min = colVal.value.d; + } + if (pColAgg->max < colVal.value.d) { + pColAgg->max = colVal.value.d; + } break; + } case TSDB_DATA_TYPE_VARCHAR: break; - case TSDB_DATA_TYPE_TIMESTAMP: + case TSDB_DATA_TYPE_TIMESTAMP:{ + if (pColAgg->min > colVal.value.i64) { + pColAgg->min = colVal.value.i64; + } + if (pColAgg->max < colVal.value.i64) { + pColAgg->max = colVal.value.i64; + } break; + } case TSDB_DATA_TYPE_NCHAR: break; - case TSDB_DATA_TYPE_UTINYINT: + case TSDB_DATA_TYPE_UTINYINT:{ + pColAgg->sum += colVal.value.u8; + if (pColAgg->min > colVal.value.u8) { + pColAgg->min = colVal.value.u8; + } + if (pColAgg->max < colVal.value.u8) { + pColAgg->max = colVal.value.u8; + } break; - case TSDB_DATA_TYPE_USMALLINT: + } + case TSDB_DATA_TYPE_USMALLINT:{ + pColAgg->sum += colVal.value.u16; + if (pColAgg->min > colVal.value.u16) { + pColAgg->min = colVal.value.u16; + } + if (pColAgg->max < colVal.value.u16) { + pColAgg->max = colVal.value.u16; + } break; - case TSDB_DATA_TYPE_UINT: + } + case TSDB_DATA_TYPE_UINT:{ + pColAgg->sum += colVal.value.u32; + if (pColAgg->min > colVal.value.u32) { + pColAgg->min = colVal.value.u32; + } + if (pColAgg->max < colVal.value.u32) { + pColAgg->max = colVal.value.u32; + } break; - case TSDB_DATA_TYPE_UBIGINT: + } + case TSDB_DATA_TYPE_UBIGINT:{ + pColAgg->sum += colVal.value.u64; + if (pColAgg->min > colVal.value.u64) { + pColAgg->min = colVal.value.u64; + } + if (pColAgg->max < colVal.value.u64) { + pColAgg->max = colVal.value.u64; + } break; + } case TSDB_DATA_TYPE_JSON: break; case TSDB_DATA_TYPE_VARBINARY: diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index d8cd76d31e..f249321a76 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -416,6 +416,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { } if (isTaskKilled(pTaskInfo)) { + atomic_store_64(&pTaskInfo->owner, 0); qDebug("%s already killed, abort", GET_TASKID(pTaskInfo)); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 34149d7499..3f3df6a8f0 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -69,7 +69,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys // todo remove it soon if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - pInfo->mergeDataBlocks = true; + pInfo->mergeDataBlocks = false; } int32_t numOfRows = 4096; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 324a17320e..43a5d19ada 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1532,7 +1532,7 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { } uint8_t resType; - if (IS_SIGNED_NUMERIC_TYPE(colType) || TSDB_DATA_TYPE_BOOL == colType) { + if (IS_SIGNED_NUMERIC_TYPE(colType) || TSDB_DATA_TYPE_BOOL == colType || TSDB_DATA_TYPE_TIMESTAMP == colType) { resType = TSDB_DATA_TYPE_BIGINT; } else { resType = TSDB_DATA_TYPE_DOUBLE;