add histogram partial/merge function
This commit is contained in:
parent
081a8a62b2
commit
c07bea7ff5
|
@ -2187,9 +2187,7 @@ int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
int32_t bytesHist = (int32_t)(sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
|
int32_t resultBytes = getApercentileMaxSize();
|
||||||
int32_t bytesDigest = (int32_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
|
|
||||||
int32_t resultBytes = TMAX(bytesHist, bytesDigest);
|
|
||||||
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||||
|
|
||||||
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
||||||
|
@ -3332,6 +3330,29 @@ int32_t histogramFunction(SqlFunctionCtx *pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t histogramFunctionMerge(SqlFunctionCtx *pCtx) {
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
||||||
|
|
||||||
|
SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
|
||||||
|
int32_t start = pInput->startRowIndex;
|
||||||
|
char* data = colDataGetData(pCol, start);
|
||||||
|
SHistoFuncInfo* pInputInfo = (SHistoFuncInfo *)varDataVal(data);
|
||||||
|
|
||||||
|
pInfo->normalized = pInputInfo->normalized;
|
||||||
|
pInfo->numOfBins = pInputInfo->numOfBins;
|
||||||
|
pInfo->totalCount += pInputInfo->totalCount;
|
||||||
|
for (int32_t k = 0; k < pInfo->numOfBins; ++k) {
|
||||||
|
pInfo->bins[k].count += pInputInfo->bins[k].count;
|
||||||
|
}
|
||||||
|
|
||||||
|
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
@ -3368,6 +3389,24 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return pResInfo->numOfRes;
|
return pResInfo->numOfRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
int32_t resultBytes = getHistogramInfoSize();
|
||||||
|
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||||
|
|
||||||
|
memcpy(varDataVal(res), pInfo, resultBytes);
|
||||||
|
varDataSetLen(res, resultBytes);
|
||||||
|
|
||||||
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
|
colDataAppend(pCol, pBlock->info.rows, res, false);
|
||||||
|
|
||||||
|
taosMemoryFree(res);
|
||||||
|
return pResInfo->numOfRes;
|
||||||
|
}
|
||||||
|
|
||||||
bool getHLLFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
bool getHLLFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(SHLLInfo);
|
pEnv->calcMemSize = sizeof(SHLLInfo);
|
||||||
return true;
|
return true;
|
||||||
|
|
Loading…
Reference in New Issue