refactor: do some internal refactor.
This commit is contained in:
parent
a2cec238ec
commit
9f5cf450de
|
@ -125,7 +125,7 @@ ELSE ()
|
|||
|
||||
MESSAGE("System processor ID: ${CMAKE_SYSTEM_PROCESSOR}")
|
||||
IF (TD_INTEL_64 OR TD_INTEL_32)
|
||||
ADD_DEFINITIONS("-msse4.2")
|
||||
ADD_DEFINITIONS("-msse4.2 -mavx")
|
||||
IF("${FMA_SUPPORT}" MATCHES "true")
|
||||
MESSAGE(STATUS "turn fma function support on")
|
||||
ADD_DEFINITIONS("-mfma")
|
||||
|
|
|
@ -225,13 +225,13 @@ typedef struct SVarColAttr {
|
|||
// pBlockAgg->numOfNull == info.rows, all data are null
|
||||
// pBlockAgg->numOfNull == 0, no data are null.
|
||||
typedef struct SColumnInfoData {
|
||||
char* pData; // the corresponding block data in memory
|
||||
char* pData; // the corresponding block data in memory
|
||||
union {
|
||||
char* nullbitmap; // bitmap, one bit for each item in the list
|
||||
SVarColAttr varmeta;
|
||||
};
|
||||
SColumnInfo info; // column info
|
||||
bool hasNull; // if current column data has null value.
|
||||
SColumnInfo info; // column info
|
||||
bool hasNull; // if current column data has null value.
|
||||
} SColumnInfoData;
|
||||
|
||||
typedef struct SQueryTableDataCond {
|
||||
|
|
|
@ -643,34 +643,6 @@ int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pR
|
|||
int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
|
||||
void tFreeSGetUserAuthRsp(SGetUserAuthRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
int16_t lowerRelOptr;
|
||||
int16_t upperRelOptr;
|
||||
int16_t filterstr; // denote if current column is char(binary/nchar)
|
||||
|
||||
union {
|
||||
struct {
|
||||
int64_t lowerBndi;
|
||||
int64_t upperBndi;
|
||||
};
|
||||
struct {
|
||||
double lowerBndd;
|
||||
double upperBndd;
|
||||
};
|
||||
struct {
|
||||
int64_t pz;
|
||||
int64_t len;
|
||||
};
|
||||
};
|
||||
} SColumnFilterInfo;
|
||||
|
||||
typedef struct {
|
||||
int16_t numOfFilters;
|
||||
union {
|
||||
int64_t placeholder;
|
||||
SColumnFilterInfo* filterInfo;
|
||||
};
|
||||
} SColumnFilterList;
|
||||
/*
|
||||
* for client side struct, only column id, type, bytes are necessary
|
||||
* But for data in vnode side, we need all the following information.
|
||||
|
@ -681,10 +653,10 @@ typedef struct {
|
|||
int16_t slotId;
|
||||
};
|
||||
|
||||
int8_t type;
|
||||
int32_t bytes;
|
||||
uint8_t precision;
|
||||
uint8_t scale;
|
||||
int32_t bytes;
|
||||
int8_t type;
|
||||
} SColumnInfo;
|
||||
|
||||
typedef struct STimeWindow {
|
||||
|
|
|
@ -105,7 +105,7 @@ int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList,
|
|||
int32_t metaReadNext(SMetaReader *pReader);
|
||||
const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
|
||||
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
|
||||
int metaGetTableUidByName(void *meta, char *tbName, int64_t *uid);
|
||||
int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid);
|
||||
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
|
||||
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid);
|
||||
|
||||
|
|
|
@ -211,7 +211,7 @@ int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName) {
|
|||
|
||||
return 0;
|
||||
}
|
||||
int metaGetTableUidByName(void *meta, char *tbName, int64_t *uid) {
|
||||
int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid) {
|
||||
int code = 0;
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, (SMeta *)meta, 0);
|
||||
|
@ -1127,7 +1127,7 @@ int32_t metaFilterTableName(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
|||
if (valid < 0) break;
|
||||
|
||||
char *pTableKey = (char *)pEntryKey;
|
||||
int32_t cmp = (*param->filterFunc)(pTableKey, pName, pCursor->type);
|
||||
cmp = (*param->filterFunc)(pTableKey, pName, pCursor->type);
|
||||
if (cmp == 0) {
|
||||
tb_uid_t tuid = *(tb_uid_t *)pEntryVal;
|
||||
taosArrayPush(pUids, &tuid);
|
||||
|
|
|
@ -535,7 +535,7 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity)
|
|||
}
|
||||
|
||||
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
|
||||
SColumnInfoData colInfo = {0, {0}};
|
||||
SColumnInfoData colInfo = {0};
|
||||
colInfo.info = pCond->colList[i];
|
||||
blockDataAppendColInfo(pResBlock, &colInfo);
|
||||
}
|
||||
|
|
|
@ -421,7 +421,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray*
|
|||
}
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(ctx.cInfoList); ++i) {
|
||||
SColumnInfoData colInfo = {0, {0}};
|
||||
SColumnInfoData colInfo = {0};
|
||||
colInfo.info = *(SColumnInfo*)taosArrayGet(ctx.cInfoList, i);
|
||||
blockDataAppendColInfo(pResBlock, &colInfo);
|
||||
}
|
||||
|
@ -582,7 +582,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
|
|||
}
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(ctx.cInfoList); ++i) {
|
||||
SColumnInfoData colInfo = {0, {0}};
|
||||
SColumnInfoData colInfo = {0};
|
||||
colInfo.info = *(SColumnInfo*)taosArrayGet(ctx.cInfoList, i);
|
||||
blockDataAppendColInfo(pResBlock, &colInfo);
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
aux_source_directory(src FUNCTION_SRC)
|
||||
aux_source_directory(src/detail FUNCTION_SRC_DETAIL)
|
||||
list(REMOVE_ITEM FUNCTION_SRC src/udfd.c)
|
||||
add_library(function STATIC ${FUNCTION_SRC})
|
||||
add_library(function STATIC ${FUNCTION_SRC} ${FUNCTION_SRC_DETAIL})
|
||||
target_include_directories(
|
||||
function
|
||||
PUBLIC
|
||||
|
|
|
@ -23,6 +23,15 @@ extern "C" {
|
|||
#include "function.h"
|
||||
#include "functionMgt.h"
|
||||
|
||||
typedef struct SSumRes {
|
||||
union {
|
||||
int64_t isum;
|
||||
uint64_t usum;
|
||||
double dsum;
|
||||
};
|
||||
int16_t type;
|
||||
} SSumRes;
|
||||
|
||||
bool functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult);
|
||||
|
@ -119,15 +128,10 @@ EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow);
|
|||
int32_t lastRowFunction(SqlFunctionCtx* pCtx);
|
||||
|
||||
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
|
||||
bool getTopBotMergeFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
|
||||
bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t topFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t topFunctionMerge(SqlFunctionCtx* pCtx);
|
||||
int32_t bottomFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t bottomFunctionMerge(SqlFunctionCtx* pCtx);
|
||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
int32_t getTopBotInfoSize(int64_t numOfItems);
|
||||
|
|
|
@ -41,21 +41,12 @@
|
|||
#define HLL_BUCKET_MASK (HLL_BUCKETS - 1)
|
||||
#define HLL_ALPHA_INF 0.721347520444481703680 // constant for 0.5/ln(2)
|
||||
|
||||
typedef struct SSumRes {
|
||||
union {
|
||||
int64_t isum;
|
||||
uint64_t usum;
|
||||
double dsum;
|
||||
};
|
||||
int16_t type;
|
||||
} SSumRes;
|
||||
|
||||
typedef struct SAvgRes {
|
||||
double result;
|
||||
SSumRes sum;
|
||||
int64_t count;
|
||||
int16_t type; // store the original input type, used in merge function
|
||||
} SAvgRes;
|
||||
//typedef struct SAvgRes {
|
||||
// double result;
|
||||
// SSumRes sum;
|
||||
// int64_t count;
|
||||
// int16_t type; // store the original input type, used in merge function
|
||||
//} SAvgRes;
|
||||
|
||||
typedef struct SMinmaxResInfo {
|
||||
bool assign; // assign the first value or not
|
||||
|
@ -362,19 +353,19 @@ typedef struct SGroupKeyInfo {
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
#define LIST_AVG_N(sumT, T) \
|
||||
do { \
|
||||
T* plist = (T*)pCol->pData; \
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { \
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { \
|
||||
continue; \
|
||||
} \
|
||||
\
|
||||
numOfElem += 1; \
|
||||
pAvgRes->count -= 1; \
|
||||
sumT -= plist[i]; \
|
||||
} \
|
||||
} while (0)
|
||||
//#define LIST_AVG_N(sumT, T) \
|
||||
// do { \
|
||||
// T* plist = (T*)pCol->pData; \
|
||||
// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { \
|
||||
// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { \
|
||||
// continue; \
|
||||
// } \
|
||||
// \
|
||||
// numOfElem += 1; \
|
||||
// pAvgRes->count -= 1; \
|
||||
// sumT -= plist[i]; \
|
||||
// } \
|
||||
// } while (0)
|
||||
|
||||
#define LIST_STDDEV_SUB_N(sumT, T) \
|
||||
do { \
|
||||
|
@ -741,374 +732,374 @@ bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
|||
return true;
|
||||
}
|
||||
|
||||
int32_t getAvgInfoSize() { return (int32_t)sizeof(SAvgRes); }
|
||||
|
||||
bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SAvgRes);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
if (!functionSetup(pCtx, pResultInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SAvgRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||
memset(pRes, 0, sizeof(SAvgRes));
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t avgFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t numOfElem = 0;
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
|
||||
int32_t type = pInput->pData[0]->info.type;
|
||||
|
||||
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
pAvgRes->type = type;
|
||||
|
||||
// computing based on the true data block
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
|
||||
int32_t start = pInput->startRowIndex;
|
||||
int32_t numOfRows = pInput->numOfRows;
|
||||
|
||||
if (IS_NULL_TYPE(type)) {
|
||||
numOfElem = 0;
|
||||
goto _avg_over;
|
||||
}
|
||||
|
||||
if (pInput->colDataAggIsSet) {
|
||||
numOfElem = numOfRows - pAgg->numOfNull;
|
||||
ASSERT(numOfElem >= 0);
|
||||
|
||||
pAvgRes->count += numOfElem;
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||
pAvgRes->sum.isum += pAgg->sum;
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||
pAvgRes->sum.usum += pAgg->sum;
|
||||
} else if (IS_FLOAT_TYPE(type)) {
|
||||
pAvgRes->sum.dsum += GET_DOUBLE_VAL((const char*)&(pAgg->sum));
|
||||
}
|
||||
} else { // computing based on the true data block
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
int8_t* plist = (int8_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.isum += plist[i];
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
int16_t* plist = (int16_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.isum += plist[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
int32_t* plist = (int32_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.isum += plist[i];
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
int64_t* plist = (int64_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.isum += plist[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_UTINYINT: {
|
||||
uint8_t* plist = (uint8_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.usum += plist[i];
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_USMALLINT: {
|
||||
uint16_t* plist = (uint16_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.usum += plist[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_UINT: {
|
||||
uint32_t* plist = (uint32_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.usum += plist[i];
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_UBIGINT: {
|
||||
uint64_t* plist = (uint64_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.usum += plist[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
float* plist = (float*)pCol->pData;
|
||||
// float val = 0;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.dsum += plist[i];
|
||||
}
|
||||
// pAvgRes->sum.dsum = val;
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
double* plist = (double*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.dsum += plist[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
_avg_over:
|
||||
// data in the check operation are all null, not output
|
||||
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void avgTransferInfo(SAvgRes* pInput, SAvgRes* pOutput) {
|
||||
pOutput->type = pInput->type;
|
||||
if (IS_SIGNED_NUMERIC_TYPE(pOutput->type)) {
|
||||
pOutput->sum.isum += pInput->sum.isum;
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(pOutput->type)) {
|
||||
pOutput->sum.usum += pInput->sum.usum;
|
||||
} else {
|
||||
pOutput->sum.dsum += pInput->sum.dsum;
|
||||
}
|
||||
|
||||
pOutput->count += pInput->count;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
||||
|
||||
SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
|
||||
int32_t start = pInput->startRowIndex;
|
||||
|
||||
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
|
||||
char* data = colDataGetData(pCol, i);
|
||||
SAvgRes* pInputInfo = (SAvgRes*)varDataVal(data);
|
||||
avgTransferInfo(pInputInfo, pInfo);
|
||||
}
|
||||
|
||||
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t avgInvertFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t numOfElem = 0;
|
||||
|
||||
// Only the pre-computing information loaded and actual data does not loaded
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
int32_t type = pInput->pData[0]->info.type;
|
||||
|
||||
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
|
||||
// computing based on the true data block
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
|
||||
int32_t start = pInput->startRowIndex;
|
||||
int32_t numOfRows = pInput->numOfRows;
|
||||
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
LIST_AVG_N(pAvgRes->sum.isum, int8_t);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
LIST_AVG_N(pAvgRes->sum.isum, int16_t);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
LIST_AVG_N(pAvgRes->sum.isum, int32_t);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
LIST_AVG_N(pAvgRes->sum.isum, int64_t);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_UTINYINT: {
|
||||
LIST_AVG_N(pAvgRes->sum.usum, uint8_t);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_USMALLINT: {
|
||||
LIST_AVG_N(pAvgRes->sum.usum, uint16_t);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_UINT: {
|
||||
LIST_AVG_N(pAvgRes->sum.usum, uint32_t);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_UBIGINT: {
|
||||
LIST_AVG_N(pAvgRes->sum.usum, uint64_t);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
LIST_AVG_N(pAvgRes->sum.dsum, float);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
LIST_AVG_N(pAvgRes->sum.dsum, double);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
// data in the check operation are all null, not output
|
||||
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
SAvgRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||
|
||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
SAvgRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
int16_t type = pDBuf->type == TSDB_DATA_TYPE_NULL ? pSBuf->type : pDBuf->type;
|
||||
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||
pDBuf->sum.isum += pSBuf->sum.isum;
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||
pDBuf->sum.usum += pSBuf->sum.usum;
|
||||
} else {
|
||||
pDBuf->sum.dsum += pSBuf->sum.dsum;
|
||||
}
|
||||
pDBuf->count += pSBuf->count;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
|
||||
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
int32_t type = pAvgRes->type;
|
||||
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||
pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count);
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||
pAvgRes->result = pAvgRes->sum.usum / ((double)pAvgRes->count);
|
||||
} else {
|
||||
pAvgRes->result = pAvgRes->sum.dsum / ((double)pAvgRes->count);
|
||||
}
|
||||
|
||||
// check for overflow
|
||||
if (isinf(pAvgRes->result) || isnan(pAvgRes->result)) {
|
||||
GET_RES_INFO(pCtx)->numOfRes = 0;
|
||||
}
|
||||
|
||||
return functionFinalize(pCtx, pBlock);
|
||||
}
|
||||
|
||||
int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
int32_t resultBytes = getAvgInfoSize();
|
||||
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||
|
||||
memcpy(varDataVal(res), pInfo, resultBytes);
|
||||
varDataSetLen(res, resultBytes);
|
||||
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
colDataAppend(pCol, pBlock->info.rows, res, false);
|
||||
|
||||
taosMemoryFree(res);
|
||||
return pResInfo->numOfRes;
|
||||
}
|
||||
//int32_t getAvgInfoSize() { return (int32_t)sizeof(SAvgRes); }
|
||||
//
|
||||
//bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
// pEnv->calcMemSize = sizeof(SAvgRes);
|
||||
// return true;
|
||||
//}
|
||||
//
|
||||
//bool avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
// if (!functionSetup(pCtx, pResultInfo)) {
|
||||
// return false;
|
||||
// }
|
||||
//
|
||||
// SAvgRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||
// memset(pRes, 0, sizeof(SAvgRes));
|
||||
// return true;
|
||||
//}
|
||||
|
||||
//int32_t avgFunction(SqlFunctionCtx* pCtx) {
|
||||
// int32_t numOfElem = 0;
|
||||
//
|
||||
// SInputColumnInfoData* pInput = &pCtx->input;
|
||||
// SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
|
||||
// int32_t type = pInput->pData[0]->info.type;
|
||||
//
|
||||
// SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
// pAvgRes->type = type;
|
||||
//
|
||||
// // computing based on the true data block
|
||||
// SColumnInfoData* pCol = pInput->pData[0];
|
||||
//
|
||||
// int32_t start = pInput->startRowIndex;
|
||||
// int32_t numOfRows = pInput->numOfRows;
|
||||
//
|
||||
// if (IS_NULL_TYPE(type)) {
|
||||
// numOfElem = 0;
|
||||
// goto _avg_over;
|
||||
// }
|
||||
//
|
||||
// if (pInput->colDataAggIsSet) {
|
||||
// numOfElem = numOfRows - pAgg->numOfNull;
|
||||
// ASSERT(numOfElem >= 0);
|
||||
//
|
||||
// pAvgRes->count += numOfElem;
|
||||
// if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||
// pAvgRes->sum.isum += pAgg->sum;
|
||||
// } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||
// pAvgRes->sum.usum += pAgg->sum;
|
||||
// } else if (IS_FLOAT_TYPE(type)) {
|
||||
// pAvgRes->sum.dsum += GET_DOUBLE_VAL((const char*)&(pAgg->sum));
|
||||
// }
|
||||
// } else { // computing based on the true data block
|
||||
// switch (type) {
|
||||
// case TSDB_DATA_TYPE_TINYINT: {
|
||||
// int8_t* plist = (int8_t*)pCol->pData;
|
||||
// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// numOfElem += 1;
|
||||
// pAvgRes->count += 1;
|
||||
// pAvgRes->sum.isum += plist[i];
|
||||
// }
|
||||
//
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case TSDB_DATA_TYPE_SMALLINT: {
|
||||
// int16_t* plist = (int16_t*)pCol->pData;
|
||||
// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// numOfElem += 1;
|
||||
// pAvgRes->count += 1;
|
||||
// pAvgRes->sum.isum += plist[i];
|
||||
// }
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case TSDB_DATA_TYPE_INT: {
|
||||
// int32_t* plist = (int32_t*)pCol->pData;
|
||||
// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// numOfElem += 1;
|
||||
// pAvgRes->count += 1;
|
||||
// pAvgRes->sum.isum += plist[i];
|
||||
// }
|
||||
//
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case TSDB_DATA_TYPE_BIGINT: {
|
||||
// int64_t* plist = (int64_t*)pCol->pData;
|
||||
// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// numOfElem += 1;
|
||||
// pAvgRes->count += 1;
|
||||
// pAvgRes->sum.isum += plist[i];
|
||||
// }
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case TSDB_DATA_TYPE_UTINYINT: {
|
||||
// uint8_t* plist = (uint8_t*)pCol->pData;
|
||||
// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// numOfElem += 1;
|
||||
// pAvgRes->count += 1;
|
||||
// pAvgRes->sum.usum += plist[i];
|
||||
// }
|
||||
//
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case TSDB_DATA_TYPE_USMALLINT: {
|
||||
// uint16_t* plist = (uint16_t*)pCol->pData;
|
||||
// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// numOfElem += 1;
|
||||
// pAvgRes->count += 1;
|
||||
// pAvgRes->sum.usum += plist[i];
|
||||
// }
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case TSDB_DATA_TYPE_UINT: {
|
||||
// uint32_t* plist = (uint32_t*)pCol->pData;
|
||||
// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// numOfElem += 1;
|
||||
// pAvgRes->count += 1;
|
||||
// pAvgRes->sum.usum += plist[i];
|
||||
// }
|
||||
//
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case TSDB_DATA_TYPE_UBIGINT: {
|
||||
// uint64_t* plist = (uint64_t*)pCol->pData;
|
||||
// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// numOfElem += 1;
|
||||
// pAvgRes->count += 1;
|
||||
// pAvgRes->sum.usum += plist[i];
|
||||
// }
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case TSDB_DATA_TYPE_FLOAT: {
|
||||
// float* plist = (float*)pCol->pData;
|
||||
//// float val = 0;
|
||||
// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// numOfElem += 1;
|
||||
// pAvgRes->count += 1;
|
||||
// pAvgRes->sum.dsum += plist[i];
|
||||
// }
|
||||
//// pAvgRes->sum.dsum = val;
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// case TSDB_DATA_TYPE_DOUBLE: {
|
||||
// double* plist = (double*)pCol->pData;
|
||||
// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// numOfElem += 1;
|
||||
// pAvgRes->count += 1;
|
||||
// pAvgRes->sum.dsum += plist[i];
|
||||
// }
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// default:
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
//_avg_over:
|
||||
// // data in the check operation are all null, not output
|
||||
// SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
//}
|
||||
|
||||
//static void avgTransferInfo(SAvgRes* pInput, SAvgRes* pOutput) {
|
||||
// pOutput->type = pInput->type;
|
||||
// if (IS_SIGNED_NUMERIC_TYPE(pOutput->type)) {
|
||||
// pOutput->sum.isum += pInput->sum.isum;
|
||||
// } else if (IS_UNSIGNED_NUMERIC_TYPE(pOutput->type)) {
|
||||
// pOutput->sum.usum += pInput->sum.usum;
|
||||
// } else {
|
||||
// pOutput->sum.dsum += pInput->sum.dsum;
|
||||
// }
|
||||
//
|
||||
// pOutput->count += pInput->count;
|
||||
//
|
||||
// return;
|
||||
//}
|
||||
//
|
||||
//int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||
// SInputColumnInfoData* pInput = &pCtx->input;
|
||||
// SColumnInfoData* pCol = pInput->pData[0];
|
||||
// ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
||||
//
|
||||
// SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
//
|
||||
// int32_t start = pInput->startRowIndex;
|
||||
//
|
||||
// for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
|
||||
// char* data = colDataGetData(pCol, i);
|
||||
// SAvgRes* pInputInfo = (SAvgRes*)varDataVal(data);
|
||||
// avgTransferInfo(pInputInfo, pInfo);
|
||||
// }
|
||||
//
|
||||
// SET_VAL(GET_RES_INFO(pCtx), 1, 1);
|
||||
//
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
//}
|
||||
//
|
||||
//int32_t avgInvertFunction(SqlFunctionCtx* pCtx) {
|
||||
// int32_t numOfElem = 0;
|
||||
//
|
||||
// // Only the pre-computing information loaded and actual data does not loaded
|
||||
// SInputColumnInfoData* pInput = &pCtx->input;
|
||||
// int32_t type = pInput->pData[0]->info.type;
|
||||
//
|
||||
// SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
//
|
||||
// // computing based on the true data block
|
||||
// SColumnInfoData* pCol = pInput->pData[0];
|
||||
//
|
||||
// int32_t start = pInput->startRowIndex;
|
||||
// int32_t numOfRows = pInput->numOfRows;
|
||||
//
|
||||
// switch (type) {
|
||||
// case TSDB_DATA_TYPE_TINYINT: {
|
||||
// LIST_AVG_N(pAvgRes->sum.isum, int8_t);
|
||||
// break;
|
||||
// }
|
||||
// case TSDB_DATA_TYPE_SMALLINT: {
|
||||
// LIST_AVG_N(pAvgRes->sum.isum, int16_t);
|
||||
// break;
|
||||
// }
|
||||
// case TSDB_DATA_TYPE_INT: {
|
||||
// LIST_AVG_N(pAvgRes->sum.isum, int32_t);
|
||||
// break;
|
||||
// }
|
||||
// case TSDB_DATA_TYPE_BIGINT: {
|
||||
// LIST_AVG_N(pAvgRes->sum.isum, int64_t);
|
||||
// break;
|
||||
// }
|
||||
// case TSDB_DATA_TYPE_UTINYINT: {
|
||||
// LIST_AVG_N(pAvgRes->sum.usum, uint8_t);
|
||||
// break;
|
||||
// }
|
||||
// case TSDB_DATA_TYPE_USMALLINT: {
|
||||
// LIST_AVG_N(pAvgRes->sum.usum, uint16_t);
|
||||
// break;
|
||||
// }
|
||||
// case TSDB_DATA_TYPE_UINT: {
|
||||
// LIST_AVG_N(pAvgRes->sum.usum, uint32_t);
|
||||
// break;
|
||||
// }
|
||||
// case TSDB_DATA_TYPE_UBIGINT: {
|
||||
// LIST_AVG_N(pAvgRes->sum.usum, uint64_t);
|
||||
// break;
|
||||
// }
|
||||
// case TSDB_DATA_TYPE_FLOAT: {
|
||||
// LIST_AVG_N(pAvgRes->sum.dsum, float);
|
||||
// break;
|
||||
// }
|
||||
// case TSDB_DATA_TYPE_DOUBLE: {
|
||||
// LIST_AVG_N(pAvgRes->sum.dsum, double);
|
||||
// break;
|
||||
// }
|
||||
// default:
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// // data in the check operation are all null, not output
|
||||
// SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
//}
|
||||
//
|
||||
//int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
// SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
// SAvgRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||
//
|
||||
// SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
// SAvgRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
// int16_t type = pDBuf->type == TSDB_DATA_TYPE_NULL ? pSBuf->type : pDBuf->type;
|
||||
//
|
||||
// if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||
// pDBuf->sum.isum += pSBuf->sum.isum;
|
||||
// } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||
// pDBuf->sum.usum += pSBuf->sum.usum;
|
||||
// } else {
|
||||
// pDBuf->sum.dsum += pSBuf->sum.dsum;
|
||||
// }
|
||||
// pDBuf->count += pSBuf->count;
|
||||
//
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
//}
|
||||
//
|
||||
//int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
// SInputColumnInfoData* pInput = &pCtx->input;
|
||||
//
|
||||
// SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
// int32_t type = pAvgRes->type;
|
||||
//
|
||||
// if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||
// pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count);
|
||||
// } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||
// pAvgRes->result = pAvgRes->sum.usum / ((double)pAvgRes->count);
|
||||
// } else {
|
||||
// pAvgRes->result = pAvgRes->sum.dsum / ((double)pAvgRes->count);
|
||||
// }
|
||||
//
|
||||
// // check for overflow
|
||||
// if (isinf(pAvgRes->result) || isnan(pAvgRes->result)) {
|
||||
// GET_RES_INFO(pCtx)->numOfRes = 0;
|
||||
// }
|
||||
//
|
||||
// return functionFinalize(pCtx, pBlock);
|
||||
//}
|
||||
//
|
||||
//int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
// SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
// SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
// int32_t resultBytes = getAvgInfoSize();
|
||||
// char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||
//
|
||||
// memcpy(varDataVal(res), pInfo, resultBytes);
|
||||
// varDataSetLen(res, resultBytes);
|
||||
//
|
||||
// int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
// SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
//
|
||||
// colDataAppend(pCol, pBlock->info.rows, res, false);
|
||||
//
|
||||
// taosMemoryFree(res);
|
||||
// return pResInfo->numOfRes;
|
||||
//}
|
||||
|
||||
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
||||
return FUNC_DATA_REQUIRED_SMA_LOAD;
|
||||
|
@ -3117,8 +3108,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
int32_t round = pInput->numOfRows >> 2;
|
||||
int32_t reminder = pInput->numOfRows & 0x03;
|
||||
|
||||
int32_t tick = 0;
|
||||
for (int32_t i = pInput->startRowIndex; tick < round; i += 4, tick += 1) {
|
||||
for (int32_t i = pInput->startRowIndex, tick = 0; tick < round; i += 4, tick += 1) {
|
||||
int64_t cts = pts[i];
|
||||
int32_t chosen = i;
|
||||
|
||||
|
@ -3153,7 +3143,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
} else {
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
|
||||
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
|
||||
if (colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,482 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <immintrin.h>
|
||||
#include "builtinsimpl.h"
|
||||
#include "function.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tfunctionInt.h"
|
||||
#include "tglobal.h"
|
||||
|
||||
#define SET_VAL(_info, numOfElem, res) \
|
||||
do { \
|
||||
if ((numOfElem) <= 0) { \
|
||||
break; \
|
||||
} \
|
||||
(_info)->numOfRes = (res); \
|
||||
} while (0)
|
||||
|
||||
#define LIST_AVG_N(sumT, T) \
|
||||
do { \
|
||||
T* plist = (T*)pCol->pData; \
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { \
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { \
|
||||
continue; \
|
||||
} \
|
||||
\
|
||||
numOfElem += 1; \
|
||||
pAvgRes->count -= 1; \
|
||||
sumT -= plist[i]; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
typedef struct SAvgRes {
|
||||
double result;
|
||||
SSumRes sum;
|
||||
int64_t count;
|
||||
int16_t type; // store the original input type, used in merge function
|
||||
} SAvgRes;
|
||||
|
||||
static int32_t handleFloatCols(const SColumnInfoData* pCol, const SInputColumnInfoData* pInput, SAvgRes* pRes) {
|
||||
int32_t numOfElems = 0;
|
||||
float* plist = (float*)pCol->pData;
|
||||
|
||||
if (pCol->hasNull || pInput->numOfRows < 8) {
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElems += 1;
|
||||
pRes->count += 1;
|
||||
pRes->sum.dsum += plist[i];
|
||||
}
|
||||
} else { // no null values exist
|
||||
numOfElems = pInput->numOfRows;
|
||||
pRes->count += pInput->numOfRows;
|
||||
|
||||
// 1. an software version to speedup the process by using loop unwinding.
|
||||
|
||||
|
||||
|
||||
// 2. if both the CPU and OS support SSE4.2, let's try the faster version by using SSE4.2 SIMD
|
||||
|
||||
|
||||
|
||||
// 3. If both the CPU and OS support AVX, let's employ AVX instruction to speedup this loop
|
||||
// 3.1 find the start position that are aligned to 32bytes address in memory
|
||||
int32_t startElem = 0;//((uint64_t)plist) & ((1<<8u)-1);
|
||||
int32_t i = 0;
|
||||
|
||||
int32_t bitWidth = 8;
|
||||
|
||||
int32_t remain = (pInput->numOfRows - startElem) % bitWidth;
|
||||
int32_t rounds = (pInput->numOfRows - startElem) / bitWidth;
|
||||
const float* p = &plist[startElem];
|
||||
|
||||
__m256 loadVal;
|
||||
__m256 sum = _mm256_setzero_ps();
|
||||
|
||||
for(; i < rounds; ++i) {
|
||||
loadVal = _mm256_loadu_ps(p);
|
||||
sum = _mm256_add_ps(sum, loadVal);
|
||||
p += bitWidth;
|
||||
}
|
||||
|
||||
// let sum up the final results
|
||||
const float* q = (const float*)∑
|
||||
pRes->sum.dsum += q[0] + q[1] + q[2] + q[3] + q[4] + q[5] + q[6] + q[7];
|
||||
|
||||
// calculate the front and the reminder items in array list
|
||||
for(int32_t j = 0; j < startElem; ++j) {
|
||||
pRes->sum.dsum += plist[j];
|
||||
}
|
||||
|
||||
startElem += rounds * bitWidth;
|
||||
for(int32_t j = 0; j < remain; ++j) {
|
||||
pRes->sum.dsum += plist[j + startElem];
|
||||
}
|
||||
}
|
||||
|
||||
return numOfElems;
|
||||
}
|
||||
|
||||
int32_t getAvgInfoSize() { return (int32_t)sizeof(SAvgRes); }
|
||||
|
||||
bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SAvgRes);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
if (!functionSetup(pCtx, pResultInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SAvgRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||
memset(pRes, 0, sizeof(SAvgRes));
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t avgFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t numOfElem = 0;
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
|
||||
int32_t type = pInput->pData[0]->info.type;
|
||||
|
||||
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
pAvgRes->type = type;
|
||||
|
||||
// computing based on the true data block
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
|
||||
int32_t start = pInput->startRowIndex;
|
||||
int32_t numOfRows = pInput->numOfRows;
|
||||
|
||||
if (IS_NULL_TYPE(type)) {
|
||||
numOfElem = 0;
|
||||
goto _avg_over;
|
||||
}
|
||||
|
||||
if (pInput->colDataAggIsSet) {
|
||||
numOfElem = numOfRows - pAgg->numOfNull;
|
||||
ASSERT(numOfElem >= 0);
|
||||
|
||||
pAvgRes->count += numOfElem;
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||
pAvgRes->sum.isum += pAgg->sum;
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||
pAvgRes->sum.usum += pAgg->sum;
|
||||
} else if (IS_FLOAT_TYPE(type)) {
|
||||
pAvgRes->sum.dsum += GET_DOUBLE_VAL((const char*)&(pAgg->sum));
|
||||
}
|
||||
} else { // computing based on the true data block
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
int8_t* plist = (int8_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.isum += plist[i];
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
int16_t* plist = (int16_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.isum += plist[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
int32_t* plist = (int32_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.isum += plist[i];
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
int64_t* plist = (int64_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.isum += plist[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_UTINYINT: {
|
||||
uint8_t* plist = (uint8_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.usum += plist[i];
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_USMALLINT: {
|
||||
uint16_t* plist = (uint16_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.usum += plist[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_UINT: {
|
||||
uint32_t* plist = (uint32_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.usum += plist[i];
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_UBIGINT: {
|
||||
uint64_t* plist = (uint64_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.usum += plist[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
numOfElem = handleFloatCols(pCol, pInput, pAvgRes);
|
||||
// float* plist = (float*)pCol->pData;
|
||||
// // float val = 0;
|
||||
// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// numOfElem += 1;
|
||||
// pAvgRes->count += 1;
|
||||
// pAvgRes->sum.dsum += plist[i];
|
||||
// }
|
||||
// pAvgRes->sum.dsum = val;
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
double* plist = (double*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.dsum += plist[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
_avg_over:
|
||||
// data in the check operation are all null, not output
|
||||
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void avgTransferInfo(SAvgRes* pInput, SAvgRes* pOutput) {
|
||||
pOutput->type = pInput->type;
|
||||
if (IS_SIGNED_NUMERIC_TYPE(pOutput->type)) {
|
||||
pOutput->sum.isum += pInput->sum.isum;
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(pOutput->type)) {
|
||||
pOutput->sum.usum += pInput->sum.usum;
|
||||
} else {
|
||||
pOutput->sum.dsum += pInput->sum.dsum;
|
||||
}
|
||||
|
||||
pOutput->count += pInput->count;
|
||||
}
|
||||
|
||||
int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
||||
|
||||
SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
|
||||
int32_t start = pInput->startRowIndex;
|
||||
|
||||
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
|
||||
char* data = colDataGetData(pCol, i);
|
||||
SAvgRes* pInputInfo = (SAvgRes*)varDataVal(data);
|
||||
avgTransferInfo(pInputInfo, pInfo);
|
||||
}
|
||||
|
||||
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t avgInvertFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t numOfElem = 0;
|
||||
|
||||
// Only the pre-computing information loaded and actual data does not loaded
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
int32_t type = pInput->pData[0]->info.type;
|
||||
|
||||
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
|
||||
// computing based on the true data block
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
|
||||
int32_t start = pInput->startRowIndex;
|
||||
int32_t numOfRows = pInput->numOfRows;
|
||||
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
LIST_AVG_N(pAvgRes->sum.isum, int8_t);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
LIST_AVG_N(pAvgRes->sum.isum, int16_t);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
LIST_AVG_N(pAvgRes->sum.isum, int32_t);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
LIST_AVG_N(pAvgRes->sum.isum, int64_t);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_UTINYINT: {
|
||||
LIST_AVG_N(pAvgRes->sum.usum, uint8_t);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_USMALLINT: {
|
||||
LIST_AVG_N(pAvgRes->sum.usum, uint16_t);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_UINT: {
|
||||
LIST_AVG_N(pAvgRes->sum.usum, uint32_t);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_UBIGINT: {
|
||||
LIST_AVG_N(pAvgRes->sum.usum, uint64_t);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
LIST_AVG_N(pAvgRes->sum.dsum, float);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
LIST_AVG_N(pAvgRes->sum.dsum, double);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
// data in the check operation are all null, not output
|
||||
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
SAvgRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||
|
||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
SAvgRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
int16_t type = pDBuf->type == TSDB_DATA_TYPE_NULL ? pSBuf->type : pDBuf->type;
|
||||
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||
pDBuf->sum.isum += pSBuf->sum.isum;
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||
pDBuf->sum.usum += pSBuf->sum.usum;
|
||||
} else {
|
||||
pDBuf->sum.dsum += pSBuf->sum.dsum;
|
||||
}
|
||||
pDBuf->count += pSBuf->count;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
|
||||
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
int32_t type = pAvgRes->type;
|
||||
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||
pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count);
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||
pAvgRes->result = pAvgRes->sum.usum / ((double)pAvgRes->count);
|
||||
} else {
|
||||
pAvgRes->result = pAvgRes->sum.dsum / ((double)pAvgRes->count);
|
||||
}
|
||||
|
||||
// check for overflow
|
||||
if (isinf(pAvgRes->result) || isnan(pAvgRes->result)) {
|
||||
GET_RES_INFO(pCtx)->numOfRes = 0;
|
||||
}
|
||||
|
||||
return functionFinalize(pCtx, pBlock);
|
||||
}
|
||||
|
||||
int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
int32_t resultBytes = getAvgInfoSize();
|
||||
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||
|
||||
memcpy(varDataVal(res), pInfo, resultBytes);
|
||||
varDataSetLen(res, resultBytes);
|
||||
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
colDataAppend(pCol, pBlock->info.rows, res, false);
|
||||
|
||||
taosMemoryFree(res);
|
||||
return pResInfo->numOfRes;
|
||||
}
|
Loading…
Reference in New Issue