[td-13039] fix bug in agg query of ordinary table.
This commit is contained in:
parent
dc01600a19
commit
ddbe4095ed
|
@ -256,7 +256,6 @@ typedef struct SFunctParam {
|
||||||
// the structure for sql function in select clause
|
// the structure for sql function in select clause
|
||||||
typedef struct SExprBasicInfo {
|
typedef struct SExprBasicInfo {
|
||||||
SSchema resSchema; // TODO refactor
|
SSchema resSchema; // TODO refactor
|
||||||
int32_t interBytes; // inter result buffer size, TODO remove it
|
|
||||||
int16_t numOfParams; // argument value of each function
|
int16_t numOfParams; // argument value of each function
|
||||||
SFunctParam *pParam;
|
SFunctParam *pParam;
|
||||||
// SVariant param[3]; // parameters are not more than 3
|
// SVariant param[3]; // parameters are not more than 3
|
||||||
|
|
|
@ -92,10 +92,11 @@ typedef struct SResultRowPool {
|
||||||
struct STaskAttr;
|
struct STaskAttr;
|
||||||
struct STaskRuntimeEnv;
|
struct STaskRuntimeEnv;
|
||||||
struct SUdfInfo;
|
struct SUdfInfo;
|
||||||
|
struct SqlFunctionCtx;
|
||||||
|
|
||||||
int32_t getOutputInterResultBufSize(struct STaskAttr* pQueryAttr);
|
int32_t getOutputInterResultBufSize(struct STaskAttr* pQueryAttr);
|
||||||
|
|
||||||
size_t getResultRowSize(SArray* pExprInfo);
|
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size);
|
int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size);
|
||||||
void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
|
void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
|
||||||
|
|
||||||
|
|
|
@ -469,12 +469,14 @@ typedef struct SOptrBasicInfo {
|
||||||
int32_t capacity;
|
int32_t capacity;
|
||||||
} SOptrBasicInfo;
|
} SOptrBasicInfo;
|
||||||
|
|
||||||
|
//TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset
|
||||||
typedef struct SAggSupporter {
|
typedef struct SAggSupporter {
|
||||||
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
||||||
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
|
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
|
||||||
SArray* pResultRowArrayList; // The array list that contains the Result rows
|
SArray* pResultRowArrayList; // The array list that contains the Result rows
|
||||||
char* keyBuf; // window key buffer
|
char* keyBuf; // window key buffer
|
||||||
SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
|
SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
|
||||||
|
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
|
||||||
} SAggSupporter;
|
} SAggSupporter;
|
||||||
|
|
||||||
typedef struct STableIntervalOperatorInfo {
|
typedef struct STableIntervalOperatorInfo {
|
||||||
|
|
|
@ -46,7 +46,7 @@ int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
|
||||||
size += pQueryAttr->pExpr1[i].base.interBytes;
|
// size += pQueryAttr->pExpr1[i].base.interBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(size >= 0);
|
assert(size >= 0);
|
||||||
|
@ -172,9 +172,14 @@ SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_
|
||||||
return (SResultRowEntryInfo*)((char*) pRow->pEntryInfo + offset[index]);
|
return (SResultRowEntryInfo*)((char*) pRow->pEntryInfo + offset[index]);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t getResultRowSize(SArray* pExprInfo) {
|
size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
size_t numOfOutput = taosArrayGetSize(pExprInfo);
|
int32_t rowSize = (numOfOutput * sizeof(SResultRowEntryInfo)) + sizeof(SResultRow);
|
||||||
return (numOfOutput * sizeof(SResultRowEntryInfo)) + /*pQueryAttr->interBufSize +*/ sizeof(SResultRow);
|
|
||||||
|
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
|
rowSize += pCtx[i].resDataInfo.interBufSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
return rowSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
SResultRowPool* initResultRowPool(size_t size) {
|
SResultRowPool* initResultRowPool(size_t size) {
|
||||||
|
|
|
@ -1970,7 +1970,7 @@ static SqlFunctionCtx* createSqlFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprI
|
||||||
pCtx->order = pQueryAttr->order.order;
|
pCtx->order = pQueryAttr->order.order;
|
||||||
// pCtx->functionId = pFunct->functionId;
|
// pCtx->functionId = pFunct->functionId;
|
||||||
pCtx->stableQuery = pQueryAttr->stableQuery;
|
pCtx->stableQuery = pQueryAttr->stableQuery;
|
||||||
pCtx->resDataInfo.interBufSize = pFunct->interBytes;
|
// pCtx->resDataInfo.interBufSize = pFunct->interBytes;
|
||||||
pCtx->start.key = INT64_MIN;
|
pCtx->start.key = INT64_MIN;
|
||||||
pCtx->end.key = INT64_MIN;
|
pCtx->end.key = INT64_MIN;
|
||||||
|
|
||||||
|
@ -2052,7 +2052,7 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC
|
||||||
SExprBasicInfo *pFunct = &pExpr->base;
|
SExprBasicInfo *pFunct = &pExpr->base;
|
||||||
SqlFunctionCtx* pCtx = &pFuncCtx[i];
|
SqlFunctionCtx* pCtx = &pFuncCtx[i];
|
||||||
|
|
||||||
fmGetFuncExecFuncs(pExpr->pExpr->_function.functionId, &pCtx->fpSet);
|
fmGetFuncExecFuncs(pExpr->pExpr->_function.pFunctNode->funcId, &pCtx->fpSet);
|
||||||
pCtx->input.numOfInputCols = pFunct->numOfParams;
|
pCtx->input.numOfInputCols = pFunct->numOfParams;
|
||||||
|
|
||||||
pCtx->input.pData = calloc(pFunct->numOfParams, POINTER_BYTES);
|
pCtx->input.pData = calloc(pFunct->numOfParams, POINTER_BYTES);
|
||||||
|
@ -2062,8 +2062,6 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC
|
||||||
pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
|
pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
|
||||||
pCtx->resDataInfo.type = pFunct->resSchema.type;
|
pCtx->resDataInfo.type = pFunct->resSchema.type;
|
||||||
pCtx->order = TSDB_ORDER_ASC;
|
pCtx->order = TSDB_ORDER_ASC;
|
||||||
// pCtx->functionId = pExpr->pExpr->_function.pFunctNode->;//TODO remove it
|
|
||||||
pCtx->stableQuery = false; // TODO
|
|
||||||
pCtx->start.key = INT64_MIN;
|
pCtx->start.key = INT64_MIN;
|
||||||
pCtx->end.key = INT64_MIN;
|
pCtx->end.key = INT64_MIN;
|
||||||
|
|
||||||
|
@ -2120,8 +2118,7 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int32_t i = 1; i < numOfOutput; ++i) {
|
for(int32_t i = 1; i < numOfOutput; ++i) {
|
||||||
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i - 1);
|
(*rowCellInfoOffset)[i] = (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pFuncCtx[i].resDataInfo.interBufSize);
|
||||||
(*rowCellInfoOffset)[i] = (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pExpr->base.interBytes);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
setCtxTagColumnInfo(pFuncCtx, numOfOutput);
|
setCtxTagColumnInfo(pFuncCtx, numOfOutput);
|
||||||
|
@ -3347,15 +3344,11 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
|
||||||
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
|
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
|
||||||
SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i);
|
SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||||
|
|
||||||
/*
|
|
||||||
* set the output buffer information and intermediate buffer
|
|
||||||
* not all queries require the interResultBuf, such as COUNT/TAGPRJ/PRJ/TAG etc.
|
|
||||||
*/
|
|
||||||
struct SResultRowEntryInfo* pEntry = getResultCell(pRow, i, rowCellInfoOffset);
|
struct SResultRowEntryInfo* pEntry = getResultCell(pRow, i, rowCellInfoOffset);
|
||||||
cleanupResultRowEntry(pEntry);
|
cleanupResultRowEntry(pEntry);
|
||||||
|
|
||||||
pCtx[i].resultInfo = pEntry;
|
pCtx[i].resultInfo = pEntry;
|
||||||
pCtx[i].pOutput = pData->pData;
|
pCtx[i].pOutput = pData->pData; // todo remove it
|
||||||
pCtx[i].currentStage = stage;
|
pCtx[i].currentStage = stage;
|
||||||
|
|
||||||
// set the timestamp output buffer for top/bottom/diff query
|
// set the timestamp output buffer for top/bottom/diff query
|
||||||
|
@ -5663,7 +5656,7 @@ SArray* getResultGroupCheckColumns(STaskAttr* pQuery) {
|
||||||
return pOrderColumns;
|
return pOrderColumns;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SArray* pExprInfo);
|
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t numOfOutput);
|
||||||
static void clearupAggSup(SAggSupporter* pAggSup);
|
static void clearupAggSup(SAggSupporter* pAggSup);
|
||||||
|
|
||||||
static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
|
static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
|
@ -6044,7 +6037,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pExprInfo);
|
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfOutput);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -7085,13 +7078,14 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
|
||||||
tfree(pOperator);
|
tfree(pOperator);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SArray* pExprInfo) {
|
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t numOfOutput) {
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
|
|
||||||
|
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
|
||||||
pAggSup->keyBuf = calloc(1, sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES);
|
pAggSup->keyBuf = calloc(1, sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES);
|
||||||
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
|
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
|
||||||
pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
|
pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
|
||||||
pAggSup->pool = initResultRowPool(getResultRowSize(pExprInfo));
|
pAggSup->pool = initResultRowPool(pAggSup->resultRowSize);
|
||||||
pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell));
|
pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell));
|
||||||
|
|
||||||
if (pAggSup->keyBuf == NULL || pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL ||
|
if (pAggSup->keyBuf == NULL || pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL ||
|
||||||
|
@ -7115,7 +7109,7 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t n
|
||||||
pInfo->binfo.pRes = pResultBlock;
|
pInfo->binfo.pRes = pResultBlock;
|
||||||
pInfo->binfo.capacity = numOfRows;
|
pInfo->binfo.capacity = numOfRows;
|
||||||
|
|
||||||
doInitAggInfoSup(&pInfo->aggSup, pExprInfo);
|
doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, taosArrayGetSize(pExprInfo));
|
||||||
pInfo->pTableQueryInfo = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
|
pInfo->pTableQueryInfo = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
|
||||||
|
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
|
@ -7353,14 +7347,15 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn
|
||||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo) {
|
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo) {
|
||||||
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
|
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
|
||||||
|
|
||||||
doInitAggInfoSup(&pInfo->aggSup, pExprInfo);
|
size_t numOfOutput = taosArrayGetSize(pExprInfo);
|
||||||
|
doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfOutput);
|
||||||
|
|
||||||
pInfo->order = TSDB_ORDER_ASC;
|
pInfo->order = TSDB_ORDER_ASC;
|
||||||
pInfo->precision = TSDB_TIME_PRECISION_MICRO;
|
pInfo->precision = TSDB_TIME_PRECISION_MICRO;
|
||||||
pInfo->win = pTaskInfo->window;
|
pInfo->win = pTaskInfo->window;
|
||||||
pInfo->interval = *pInterval;
|
pInfo->interval = *pInterval;
|
||||||
|
|
||||||
int32_t code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/");
|
int32_t code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/");
|
||||||
|
|
||||||
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset);
|
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset);
|
||||||
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity);
|
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity);
|
||||||
|
@ -8039,8 +8034,8 @@ SArray* createExprInfo(SAggPhysiNode* pPhyNode, int32_t* resultRowSize) {
|
||||||
pExp->base.pParam[0].pCol = calloc(1, sizeof(SColumn));
|
pExp->base.pParam[0].pCol = calloc(1, sizeof(SColumn));
|
||||||
SColumn* pCol = pExp->base.pParam[0].pCol;
|
SColumn* pCol = pExp->base.pParam[0].pCol;
|
||||||
|
|
||||||
ASSERT(LIST_LENGTH(pPhyNode->pAggFuncs) == 1);
|
STargetNode* pTargetNode = (STargetNode*) nodesListGetNode(pPhyNode->pAggFuncs, i);
|
||||||
STargetNode* pTargetNode = (STargetNode*) nodesListGetNode(pPhyNode->pAggFuncs, 0);
|
ASSERT(pTargetNode->slotId == i);
|
||||||
|
|
||||||
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr;
|
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr;
|
||||||
pExp->base.resSchema = createSchema(pFuncNode->node.resType.type, pFuncNode->node.resType.bytes, pTargetNode->slotId, pFuncNode->node.aliasName);
|
pExp->base.resSchema = createSchema(pFuncNode->node.resType.type, pFuncNode->node.resType.bytes, pTargetNode->slotId, pFuncNode->node.aliasName);
|
||||||
|
|
|
@ -44,7 +44,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "min",
|
.name = "min",
|
||||||
.type = FUNCTION_TYPE_MIN,
|
.type = FUNCTION_TYPE_MIN,
|
||||||
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
.checkFunc = stubCheckAndGetResultType,
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
.getEnvFunc = getMinmaxFuncEnv,
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
.initFunc = minFunctionSetup,
|
.initFunc = minFunctionSetup,
|
||||||
|
@ -54,7 +54,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "max",
|
.name = "max",
|
||||||
.type = FUNCTION_TYPE_MAX,
|
.type = FUNCTION_TYPE_MAX,
|
||||||
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
.checkFunc = stubCheckAndGetResultType,
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
.getEnvFunc = getMinmaxFuncEnv,
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
.initFunc = maxFunctionSetup,
|
.initFunc = maxFunctionSetup,
|
||||||
|
@ -78,8 +78,33 @@ const int32_t funcMgtBuiltinsNum = (sizeof(funcMgtBuiltins) / sizeof(SBuiltinFun
|
||||||
int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
|
int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
|
||||||
switch(pFunc->funcType) {
|
switch(pFunc->funcType) {
|
||||||
case FUNCTION_TYPE_COUNT: pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};break;
|
case FUNCTION_TYPE_COUNT: pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};break;
|
||||||
default:
|
case FUNCTION_TYPE_SUM: {
|
||||||
|
SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
int32_t paraType = pParam->node.resType.type;
|
||||||
|
|
||||||
|
int32_t resType = 0;
|
||||||
|
if (IS_SIGNED_NUMERIC_TYPE(paraType)) {
|
||||||
|
resType = TSDB_DATA_TYPE_BIGINT;
|
||||||
|
} else if (IS_UNSIGNED_NUMERIC_TYPE(paraType)) {
|
||||||
|
resType = TSDB_DATA_TYPE_UBIGINT;
|
||||||
|
} else if (IS_FLOAT_TYPE(paraType)) {
|
||||||
|
resType = TSDB_DATA_TYPE_DOUBLE;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[resType].bytes, .type = resType };
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
|
case FUNCTION_TYPE_MIN:
|
||||||
|
case FUNCTION_TYPE_MAX: {
|
||||||
|
SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
int32_t paraType = pParam->node.resType.type;
|
||||||
|
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[paraType].bytes, .type = paraType };
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
ASSERT(0); // to found the fault ASAP.
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "builtinsimpl.h"
|
#include "builtinsimpl.h"
|
||||||
#include <querynodes.h>
|
#include "querynodes.h"
|
||||||
#include "taggfunction.h"
|
#include "taggfunction.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
|
||||||
|
@ -123,17 +123,18 @@ void sumFunction(SqlFunctionCtx *pCtx) {
|
||||||
SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0];
|
SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0];
|
||||||
int32_t type = pInput->pData[0]->info.type;
|
int32_t type = pInput->pData[0]->info.type;
|
||||||
|
|
||||||
|
SSumRes* pSumRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
|
||||||
if (pInput->colDataAggIsSet) {
|
if (pInput->colDataAggIsSet) {
|
||||||
numOfElem = pInput->numOfRows - pAgg->numOfNull;
|
numOfElem = pInput->numOfRows - pAgg->numOfNull;
|
||||||
ASSERT(numOfElem >= 0);
|
ASSERT(numOfElem >= 0);
|
||||||
|
|
||||||
SSumRes* pSumInfo = (SSumRes*) pCtx->pOutput;
|
|
||||||
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||||
pSumInfo->isum += pAgg->sum;
|
pSumRes->isum += pAgg->sum;
|
||||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||||
pSumInfo->usum += pAgg->sum;
|
pSumRes->usum += pAgg->sum;
|
||||||
} else if (IS_FLOAT_TYPE(type)) {
|
} else if (IS_FLOAT_TYPE(type)) {
|
||||||
pSumInfo->dsum += GET_DOUBLE_VAL((const char*)&(pAgg->sum));
|
pSumRes->dsum += GET_DOUBLE_VAL((const char*)&(pAgg->sum));
|
||||||
}
|
}
|
||||||
} else { // computing based on the true data block
|
} else { // computing based on the true data block
|
||||||
SColumnInfoData* pCol = pInput->pData[0];
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
@ -141,32 +142,30 @@ void sumFunction(SqlFunctionCtx *pCtx) {
|
||||||
int32_t start = pInput->startRowIndex;
|
int32_t start = pInput->startRowIndex;
|
||||||
int32_t numOfRows = pInput->numOfRows;
|
int32_t numOfRows = pInput->numOfRows;
|
||||||
|
|
||||||
SSumRes* pSum = (SSumRes*) pCtx->pOutput;
|
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||||
|
if (type == TSDB_DATA_TYPE_TINYINT) {
|
||||||
if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) {
|
LIST_ADD_N(pSumRes->isum, pCol, start, numOfRows, int8_t, numOfElem);
|
||||||
if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) {
|
} else if (type == TSDB_DATA_TYPE_SMALLINT) {
|
||||||
LIST_ADD_N(pSum->isum, pCol, start, numOfRows, int8_t, numOfElem);
|
LIST_ADD_N(pSumRes->isum, pCol, start, numOfRows, int16_t, numOfElem);
|
||||||
} else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) {
|
} else if (type == TSDB_DATA_TYPE_INT) {
|
||||||
LIST_ADD_N(pSum->isum, pCol, start, numOfRows, int16_t, numOfElem);
|
LIST_ADD_N(pSumRes->isum, pCol, start, numOfRows, int32_t, numOfElem);
|
||||||
} else if (pCtx->inputType == TSDB_DATA_TYPE_INT) {
|
} else if (type == TSDB_DATA_TYPE_BIGINT) {
|
||||||
LIST_ADD_N(pSum->isum, pCol, start, numOfRows, int32_t, numOfElem);
|
LIST_ADD_N(pSumRes->isum, pCol, start, numOfRows, int64_t, numOfElem);
|
||||||
} else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) {
|
|
||||||
LIST_ADD_N(pSum->isum, pCol, start, numOfRows, int64_t, numOfElem);
|
|
||||||
}
|
}
|
||||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) {
|
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||||
if (pCtx->inputType == TSDB_DATA_TYPE_UTINYINT) {
|
if (type == TSDB_DATA_TYPE_UTINYINT) {
|
||||||
LIST_ADD_N(pSum->usum, pCol, start, numOfRows, uint8_t, numOfElem);
|
LIST_ADD_N(pSumRes->usum, pCol, start, numOfRows, uint8_t, numOfElem);
|
||||||
} else if (pCtx->inputType == TSDB_DATA_TYPE_USMALLINT) {
|
} else if (type == TSDB_DATA_TYPE_USMALLINT) {
|
||||||
LIST_ADD_N(pSum->usum, pCol, start, numOfRows, uint16_t, numOfElem);
|
LIST_ADD_N(pSumRes->usum, pCol, start, numOfRows, uint16_t, numOfElem);
|
||||||
} else if (pCtx->inputType == TSDB_DATA_TYPE_UINT) {
|
} else if (type == TSDB_DATA_TYPE_UINT) {
|
||||||
LIST_ADD_N(pSum->usum, pCol, start, numOfRows, uint32_t, numOfElem);
|
LIST_ADD_N(pSumRes->usum, pCol, start, numOfRows, uint32_t, numOfElem);
|
||||||
} else if (pCtx->inputType == TSDB_DATA_TYPE_UBIGINT) {
|
} else if (type == TSDB_DATA_TYPE_UBIGINT) {
|
||||||
LIST_ADD_N(pSum->usum, pCol, start, numOfRows, uint64_t, numOfElem);
|
LIST_ADD_N(pSumRes->usum, pCol, start, numOfRows, uint64_t, numOfElem);
|
||||||
}
|
}
|
||||||
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) {
|
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
LIST_ADD_N(pSum->dsum, pCol, start, numOfRows, double, numOfElem);
|
LIST_ADD_N(pSumRes->dsum, pCol, start, numOfRows, double, numOfElem);
|
||||||
} else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
|
} else if (type == TSDB_DATA_TYPE_FLOAT) {
|
||||||
LIST_ADD_N(pSum->dsum, pCol, start, numOfRows, float, numOfElem);
|
LIST_ADD_N(pSumRes->dsum, pCol, start, numOfRows, float, numOfElem);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -179,14 +178,13 @@ bool getSumFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
if (!functionSetup(pCtx, pResultInfo)) {
|
if (!functionSetup(pCtx, pResultInfo)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* buf = GET_ROWCELL_INTERBUF(pResultInfo);
|
char* buf = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||||
switch (pCtx->input.pData[0]->info.type) {
|
switch (pCtx->resDataInfo.type) {
|
||||||
case TSDB_DATA_TYPE_INT:
|
case TSDB_DATA_TYPE_INT:
|
||||||
*((int32_t *)buf) = INT32_MIN;
|
*((int32_t *)buf) = INT32_MIN;
|
||||||
break;
|
break;
|
||||||
|
@ -229,7 +227,7 @@ bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
char* buf = GET_ROWCELL_INTERBUF(pResultInfo);
|
char* buf = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||||
switch (pCtx->input.pData[0]->info.type) {
|
switch (pCtx->resDataInfo.type) {
|
||||||
case TSDB_DATA_TYPE_TINYINT:
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
*((int8_t *)buf) = INT8_MAX;
|
*((int8_t *)buf) = INT8_MAX;
|
||||||
break;
|
break;
|
||||||
|
@ -374,13 +372,13 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
||||||
__ctx->fpSet.process(__ctx);
|
__ctx->fpSet.process(__ctx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) {
|
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||||
uint64_t val = GET_UINT64_VAL(tval);
|
uint64_t val = GET_UINT64_VAL(tval);
|
||||||
UPDATE_DATA(pCtx, *(uint64_t*)buf, val, numOfElems, isMinFunc, key);
|
UPDATE_DATA(pCtx, *(uint64_t*)buf, val, numOfElems, isMinFunc, key);
|
||||||
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) {
|
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
double val = GET_DOUBLE_VAL(tval);
|
double val = GET_DOUBLE_VAL(tval);
|
||||||
UPDATE_DATA(pCtx, *(double*)buf, val, numOfElems, isMinFunc, key);
|
UPDATE_DATA(pCtx, *(double*)buf, val, numOfElems, isMinFunc, key);
|
||||||
} else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
|
} else if (type == TSDB_DATA_TYPE_FLOAT) {
|
||||||
double val = GET_DOUBLE_VAL(tval);
|
double val = GET_DOUBLE_VAL(tval);
|
||||||
UPDATE_DATA(pCtx, *(float*)buf, (float)val, numOfElems, isMinFunc, key);
|
UPDATE_DATA(pCtx, *(float*)buf, (float)val, numOfElems, isMinFunc, key);
|
||||||
}
|
}
|
||||||
|
@ -391,14 +389,14 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
||||||
int32_t start = pInput->startRowIndex;
|
int32_t start = pInput->startRowIndex;
|
||||||
int32_t numOfRows = pInput->numOfRows;
|
int32_t numOfRows = pInput->numOfRows;
|
||||||
|
|
||||||
if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) {
|
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||||
if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) {
|
if (type == TSDB_DATA_TYPE_TINYINT) {
|
||||||
LOOPCHECK_N(*(int64_t*)buf, pCol, pCtx, int8_t, numOfRows, start, isMinFunc, numOfElems);
|
LOOPCHECK_N(*(int8_t*)buf, pCol, pCtx, int8_t, numOfRows, start, isMinFunc, numOfElems);
|
||||||
} else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) {
|
} else if (type == TSDB_DATA_TYPE_SMALLINT) {
|
||||||
LOOPCHECK_N(*(int64_t*) buf, pCol, pCtx, int16_t, numOfRows, start, isMinFunc, numOfElems);
|
LOOPCHECK_N(*(int16_t*) buf, pCol, pCtx, int16_t, numOfRows, start, isMinFunc, numOfElems);
|
||||||
} else if (pCtx->inputType == TSDB_DATA_TYPE_INT) {
|
} else if (type == TSDB_DATA_TYPE_INT) {
|
||||||
int32_t *pData = (int32_t*)pCol->pData;
|
int32_t *pData = (int32_t*)pCol->pData;
|
||||||
int64_t *val = (int64_t*) buf;
|
int32_t *val = (int32_t*) buf;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pCtx->size; ++i) {
|
for (int32_t i = 0; i < pCtx->size; ++i) {
|
||||||
if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
|
if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
@ -417,22 +415,22 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
||||||
#if defined(_DEBUG_VIEW)
|
#if defined(_DEBUG_VIEW)
|
||||||
qDebug("max value updated:%d", *retVal);
|
qDebug("max value updated:%d", *retVal);
|
||||||
#endif
|
#endif
|
||||||
} else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) {
|
} else if (type == TSDB_DATA_TYPE_BIGINT) {
|
||||||
LOOPCHECK_N(*(int64_t*) buf, pCol, pCtx, int64_t, numOfRows, start, isMinFunc, numOfElems);
|
LOOPCHECK_N(*(int64_t*) buf, pCol, pCtx, int64_t, numOfRows, start, isMinFunc, numOfElems);
|
||||||
}
|
}
|
||||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) {
|
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||||
if (pCtx->inputType == TSDB_DATA_TYPE_UTINYINT) {
|
if (type == TSDB_DATA_TYPE_UTINYINT) {
|
||||||
LOOPCHECK_N(*(uint64_t*) buf, pCol, pCtx, uint8_t, numOfRows, start, isMinFunc, numOfElems);
|
LOOPCHECK_N(*(uint8_t*) buf, pCol, pCtx, uint8_t, numOfRows, start, isMinFunc, numOfElems);
|
||||||
} else if (pCtx->inputType == TSDB_DATA_TYPE_USMALLINT) {
|
} else if (type == TSDB_DATA_TYPE_USMALLINT) {
|
||||||
LOOPCHECK_N(*(uint64_t*) buf, pCol, pCtx, uint16_t, numOfRows, start, isMinFunc, numOfElems);
|
LOOPCHECK_N(*(uint16_t*) buf, pCol, pCtx, uint16_t, numOfRows, start, isMinFunc, numOfElems);
|
||||||
} else if (pCtx->inputType == TSDB_DATA_TYPE_UINT) {
|
} else if (type == TSDB_DATA_TYPE_UINT) {
|
||||||
LOOPCHECK_N(*(uint64_t*) buf, pCol, pCtx, uint32_t, numOfRows, start, isMinFunc, numOfElems);
|
LOOPCHECK_N(*(uint32_t*) buf, pCol, pCtx, uint32_t, numOfRows, start, isMinFunc, numOfElems);
|
||||||
} else if (pCtx->inputType == TSDB_DATA_TYPE_UBIGINT) {
|
} else if (type == TSDB_DATA_TYPE_UBIGINT) {
|
||||||
LOOPCHECK_N(*(uint64_t*) buf, pCol, pCtx, uint64_t, numOfRows, start, isMinFunc, numOfElems);
|
LOOPCHECK_N(*(uint64_t*) buf, pCol, pCtx, uint64_t, numOfRows, start, isMinFunc, numOfElems);
|
||||||
}
|
}
|
||||||
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) {
|
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
LOOPCHECK_N(*(double*) buf, pCol, pCtx, double, numOfRows, start, isMinFunc, numOfElems);
|
LOOPCHECK_N(*(double*) buf, pCol, pCtx, double, numOfRows, start, isMinFunc, numOfElems);
|
||||||
} else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
|
} else if (type == TSDB_DATA_TYPE_FLOAT) {
|
||||||
LOOPCHECK_N(*(float*) buf, pCol, pCtx, float, numOfRows, start, isMinFunc, numOfElems);
|
LOOPCHECK_N(*(float*) buf, pCol, pCtx, float, numOfRows, start, isMinFunc, numOfElems);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue