Merge pull request #15389 from taosdata/feature/3_liaohj

fix(query):disable merge for project operator in stream processing.
This commit is contained in:
Haojun Liao 2022-07-25 20:16:18 +08:00 committed by GitHub
commit b4bb80ba9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 83 additions and 11 deletions

View File

@ -1395,10 +1395,26 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
break;
case TSDB_DATA_TYPE_BOOL:
break;
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_TINYINT:{
pColAgg->sum += colVal.value.i8;
if (pColAgg->min > colVal.value.i8) {
pColAgg->min = colVal.value.i8;
}
if (pColAgg->max < colVal.value.i8) {
pColAgg->max = colVal.value.i8;
}
break;
case TSDB_DATA_TYPE_SMALLINT:
}
case TSDB_DATA_TYPE_SMALLINT:{
pColAgg->sum += colVal.value.i16;
if (pColAgg->min > colVal.value.i16) {
pColAgg->min = colVal.value.i16;
}
if (pColAgg->max < colVal.value.i16) {
pColAgg->max = colVal.value.i16;
}
break;
}
case TSDB_DATA_TYPE_INT: {
pColAgg->sum += colVal.value.i32;
if (pColAgg->min > colVal.value.i32) {
@ -1419,24 +1435,79 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
}
break;
}
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_FLOAT:{
pColAgg->sum += colVal.value.f;
if (pColAgg->min > colVal.value.f) {
pColAgg->min = colVal.value.f;
}
if (pColAgg->max < colVal.value.f) {
pColAgg->max = colVal.value.f;
}
break;
case TSDB_DATA_TYPE_DOUBLE:
}
case TSDB_DATA_TYPE_DOUBLE:{
pColAgg->sum += colVal.value.d;
if (pColAgg->min > colVal.value.d) {
pColAgg->min = colVal.value.d;
}
if (pColAgg->max < colVal.value.d) {
pColAgg->max = colVal.value.d;
}
break;
}
case TSDB_DATA_TYPE_VARCHAR:
break;
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_TIMESTAMP:{
if (pColAgg->min > colVal.value.i64) {
pColAgg->min = colVal.value.i64;
}
if (pColAgg->max < colVal.value.i64) {
pColAgg->max = colVal.value.i64;
}
break;
}
case TSDB_DATA_TYPE_NCHAR:
break;
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_UTINYINT:{
pColAgg->sum += colVal.value.u8;
if (pColAgg->min > colVal.value.u8) {
pColAgg->min = colVal.value.u8;
}
if (pColAgg->max < colVal.value.u8) {
pColAgg->max = colVal.value.u8;
}
break;
case TSDB_DATA_TYPE_USMALLINT:
}
case TSDB_DATA_TYPE_USMALLINT:{
pColAgg->sum += colVal.value.u16;
if (pColAgg->min > colVal.value.u16) {
pColAgg->min = colVal.value.u16;
}
if (pColAgg->max < colVal.value.u16) {
pColAgg->max = colVal.value.u16;
}
break;
case TSDB_DATA_TYPE_UINT:
}
case TSDB_DATA_TYPE_UINT:{
pColAgg->sum += colVal.value.u32;
if (pColAgg->min > colVal.value.u32) {
pColAgg->min = colVal.value.u32;
}
if (pColAgg->max < colVal.value.u32) {
pColAgg->max = colVal.value.u32;
}
break;
case TSDB_DATA_TYPE_UBIGINT:
}
case TSDB_DATA_TYPE_UBIGINT:{
pColAgg->sum += colVal.value.u64;
if (pColAgg->min > colVal.value.u64) {
pColAgg->min = colVal.value.u64;
}
if (pColAgg->max < colVal.value.u64) {
pColAgg->max = colVal.value.u64;
}
break;
}
case TSDB_DATA_TYPE_JSON:
break;
case TSDB_DATA_TYPE_VARBINARY:

View File

@ -416,6 +416,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
}
if (isTaskKilled(pTaskInfo)) {
atomic_store_64(&pTaskInfo->owner, 0);
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
return TSDB_CODE_SUCCESS;
}

View File

@ -69,7 +69,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
// todo remove it soon
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
pInfo->mergeDataBlocks = true;
pInfo->mergeDataBlocks = false;
}
int32_t numOfRows = 4096;

View File

@ -1532,7 +1532,7 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
}
uint8_t resType;
if (IS_SIGNED_NUMERIC_TYPE(colType) || TSDB_DATA_TYPE_BOOL == colType) {
if (IS_SIGNED_NUMERIC_TYPE(colType) || TSDB_DATA_TYPE_BOOL == colType || TSDB_DATA_TYPE_TIMESTAMP == colType) {
resType = TSDB_DATA_TYPE_BIGINT;
} else {
resType = TSDB_DATA_TYPE_DOUBLE;