avg function add partial/merge
This commit is contained in:
parent
4113472e89
commit
e82289542d
|
@ -55,7 +55,9 @@ int32_t maxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t avgFunction(SqlFunctionCtx* pCtx);
|
int32_t avgFunction(SqlFunctionCtx* pCtx);
|
||||||
|
int32_t avgFunctionMerge(SqlFunctionCtx* pCtx);
|
||||||
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
int32_t avgInvertFunction(SqlFunctionCtx* pCtx);
|
int32_t avgInvertFunction(SqlFunctionCtx* pCtx);
|
||||||
int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
int32_t getAvgInfoSize();
|
int32_t getAvgInfoSize();
|
||||||
|
|
|
@ -51,6 +51,7 @@ typedef struct SAvgRes {
|
||||||
double result;
|
double result;
|
||||||
SSumRes sum;
|
SSumRes sum;
|
||||||
int64_t count;
|
int64_t count;
|
||||||
|
int16_t type; // store the original input type, used in merge function
|
||||||
} SAvgRes;
|
} SAvgRes;
|
||||||
|
|
||||||
typedef struct STuplePos {
|
typedef struct STuplePos {
|
||||||
|
@ -649,6 +650,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
|
||||||
int32_t type = pInput->pData[0]->info.type;
|
int32_t type = pInput->pData[0]->info.type;
|
||||||
|
|
||||||
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
pAvgRes->type = type;
|
||||||
|
|
||||||
// computing based on the true data block
|
// computing based on the true data block
|
||||||
SColumnInfoData* pCol = pInput->pData[0];
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
@ -771,6 +773,38 @@ _avg_over:
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void avgTransferInfo(SAvgRes* pInput, SAvgRes* pOutput) {
|
||||||
|
int16_t type = pInput->type;
|
||||||
|
if (IS_INTEGER_TYPE(type)) {
|
||||||
|
pOutput->sum.isum += pInput->sum.isum;
|
||||||
|
} else {
|
||||||
|
pOutput->sum.dsum += pInput->sum.dsum;
|
||||||
|
}
|
||||||
|
|
||||||
|
pOutput->count += pInput->count;
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
||||||
|
|
||||||
|
SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
SAvgRes* pInputInfo;
|
||||||
|
|
||||||
|
int32_t start = pInput->startRowIndex;
|
||||||
|
char* data = colDataGetData(pCol, start);
|
||||||
|
pInputInfo = (SAvgRes*)varDataVal(data);
|
||||||
|
|
||||||
|
avgTransferInfo(pInputInfo, pInfo);
|
||||||
|
|
||||||
|
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
#define LIST_AVG_N(sumT, T) \
|
#define LIST_AVG_N(sumT, T) \
|
||||||
do { \
|
do { \
|
||||||
T* plist = (T*)pCol->pData; \
|
T* plist = (T*)pCol->pData; \
|
||||||
|
@ -872,6 +906,24 @@ int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return functionFinalize(pCtx, pBlock);
|
return functionFinalize(pCtx, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
int32_t resultBytes = getAvgInfoSize();
|
||||||
|
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||||
|
|
||||||
|
memcpy(varDataVal(res), pInfo, resultBytes);
|
||||||
|
varDataSetLen(res, resultBytes);
|
||||||
|
|
||||||
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
|
colDataAppend(pCol, pBlock->info.rows, res, false);
|
||||||
|
|
||||||
|
taosMemoryFree(res);
|
||||||
|
return pResInfo->numOfRes;
|
||||||
|
}
|
||||||
|
|
||||||
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
||||||
return FUNC_DATA_REQUIRED_STATIS_LOAD;
|
return FUNC_DATA_REQUIRED_STATIS_LOAD;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue