enh(query): refactor function merge code to provide common interface
This commit is contained in:
parent
797cde53d2
commit
be88d8027f
|
@ -2103,8 +2103,49 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void apercentileTransferInfo(SAPercentileInfo* pInput, SAPercentileInfo* pOutput) {
|
||||||
|
pOutput->percent = pInput->percent;
|
||||||
|
pOutput->algo = pInput->algo;
|
||||||
|
if (pOutput->algo == APERCT_ALGO_TDIGEST) {
|
||||||
|
buildTDigestInfo(pInput);
|
||||||
|
tdigestAutoFill(pInput->pTDigest, COMPRESSION);
|
||||||
|
|
||||||
|
if(pInput->pTDigest->num_centroids == 0 && pInput->pTDigest->num_buffered_pts == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
buildTDigestInfo(pOutput);
|
||||||
|
TDigest *pTDigest = pOutput->pTDigest;
|
||||||
|
|
||||||
|
if(pTDigest->num_centroids <= 0) {
|
||||||
|
memcpy(pTDigest, pInput->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION));
|
||||||
|
tdigestAutoFill(pTDigest, COMPRESSION);
|
||||||
|
} else {
|
||||||
|
tdigestMerge(pTDigest, pInput->pTDigest);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
buildHistogramInfo(pInput);
|
||||||
|
if (pInput->pHisto->numOfElems <= 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
buildHistogramInfo(pOutput);
|
||||||
|
SHistogramInfo *pHisto = pOutput->pHisto;
|
||||||
|
|
||||||
|
if (pHisto->numOfElems <= 0) {
|
||||||
|
memcpy(pHisto, pInput->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
|
||||||
|
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
|
||||||
|
} else {
|
||||||
|
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
|
||||||
|
SHistogramInfo *pRes = tHistogramMerge(pHisto, pInput->pHisto, MAX_HISTOGRAM_BIN);
|
||||||
|
memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN);
|
||||||
|
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
|
||||||
|
tHistogramDestroy(&pRes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) {
|
int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||||
int32_t numOfElems = 0;
|
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
@ -2113,60 +2154,14 @@ int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||||
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
||||||
|
|
||||||
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
SAPercentileInfo* pInputInfo;
|
|
||||||
|
|
||||||
int32_t start = pInput->startRowIndex;
|
int32_t start = pInput->startRowIndex;
|
||||||
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
char* data = colDataGetData(pCol, start);
|
||||||
//if (colDataIsNull_s(pCol, i)) {
|
SAPercentileInfo* pInputInfo = (SAPercentileInfo *)varDataVal(data);
|
||||||
// continue;
|
|
||||||
//}
|
|
||||||
numOfElems += 1;
|
|
||||||
char* data = colDataGetData(pCol, i);
|
|
||||||
|
|
||||||
pInputInfo = (SAPercentileInfo *)varDataVal(data);
|
apercentileTransferInfo(pInputInfo, pInfo);
|
||||||
}
|
|
||||||
|
|
||||||
pInfo->percent = pInputInfo->percent;
|
SET_VAL(pResInfo, 1, 1);
|
||||||
pInfo->algo = pInputInfo->algo;
|
|
||||||
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
|
||||||
buildTDigestInfo(pInputInfo);
|
|
||||||
tdigestAutoFill(pInputInfo->pTDigest, COMPRESSION);
|
|
||||||
|
|
||||||
if(pInputInfo->pTDigest->num_centroids == 0 && pInputInfo->pTDigest->num_buffered_pts == 0) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
buildTDigestInfo(pInfo);
|
|
||||||
TDigest *pTDigest = pInfo->pTDigest;
|
|
||||||
|
|
||||||
if(pTDigest->num_centroids <= 0) {
|
|
||||||
memcpy(pTDigest, pInputInfo->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION));
|
|
||||||
tdigestAutoFill(pTDigest, COMPRESSION);
|
|
||||||
} else {
|
|
||||||
tdigestMerge(pTDigest, pInputInfo->pTDigest);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
buildHistogramInfo(pInputInfo);
|
|
||||||
if (pInputInfo->pHisto->numOfElems <= 0) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
buildHistogramInfo(pInfo);
|
|
||||||
SHistogramInfo *pHisto = pInfo->pHisto;
|
|
||||||
|
|
||||||
if (pHisto->numOfElems <= 0) {
|
|
||||||
memcpy(pHisto, pInputInfo->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
|
|
||||||
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
|
|
||||||
} else {
|
|
||||||
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
|
|
||||||
SHistogramInfo *pRes = tHistogramMerge(pHisto, pInputInfo->pHisto, MAX_HISTOGRAM_BIN);
|
|
||||||
memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN);
|
|
||||||
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
|
|
||||||
tHistogramDestroy(&pRes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SET_VAL(pResInfo, numOfElems, 1);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3049,6 +3044,17 @@ _spread_over:
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void spreadTransferInfo(SSpreadInfo* pInput, SSpreadInfo* pOutput) {
|
||||||
|
pOutput->hasResult = pInput->hasResult;
|
||||||
|
if (pInput->max > pOutput->max) {
|
||||||
|
pOutput->max = pInput->max;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInput->min < pOutput->min) {
|
||||||
|
pOutput->min = pInput->min;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t spreadFunctionMerge(SqlFunctionCtx *pCtx) {
|
int32_t spreadFunctionMerge(SqlFunctionCtx *pCtx) {
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
SColumnInfoData* pCol = pInput->pData[0];
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
@ -3061,14 +3067,7 @@ int32_t spreadFunctionMerge(SqlFunctionCtx *pCtx) {
|
||||||
char* data = colDataGetData(pCol, start);
|
char* data = colDataGetData(pCol, start);
|
||||||
pInputInfo = (SSpreadInfo *)varDataVal(data);
|
pInputInfo = (SSpreadInfo *)varDataVal(data);
|
||||||
|
|
||||||
pInfo->hasResult = pInputInfo->hasResult;
|
spreadTransferInfo(pInputInfo, pInfo);
|
||||||
if (pInputInfo->max > pInfo->max) {
|
|
||||||
pInfo->max = pInputInfo->max;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pInputInfo->min < pInfo->min) {
|
|
||||||
pInfo->min = pInputInfo->min;
|
|
||||||
}
|
|
||||||
|
|
||||||
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
|
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
|
||||||
|
|
||||||
|
@ -3206,6 +3205,17 @@ _elapsed_over:
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void elapsedTransferInfo(SElapsedInfo* pInput, SElapsedInfo* pOutput) {
|
||||||
|
pOutput->timeUnit = pInput->timeUnit;
|
||||||
|
if (pOutput->min > pInput->min) {
|
||||||
|
pOutput->min = pInput->min;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pOutput->max < pInput->max) {
|
||||||
|
pOutput->max = pInput->max;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t elapsedFunctionMerge(SqlFunctionCtx *pCtx) {
|
int32_t elapsedFunctionMerge(SqlFunctionCtx *pCtx) {
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
SColumnInfoData* pCol = pInput->pData[0];
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
@ -3217,14 +3227,7 @@ int32_t elapsedFunctionMerge(SqlFunctionCtx *pCtx) {
|
||||||
char* data = colDataGetData(pCol, start);
|
char* data = colDataGetData(pCol, start);
|
||||||
SElapsedInfo* pInputInfo = (SElapsedInfo *)varDataVal(data);
|
SElapsedInfo* pInputInfo = (SElapsedInfo *)varDataVal(data);
|
||||||
|
|
||||||
pInfo->timeUnit = pInputInfo->timeUnit;
|
elapsedTransferInfo(pInputInfo, pInfo);
|
||||||
if (pInfo->min > pInputInfo->min) {
|
|
||||||
pInfo->min = pInputInfo->min;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pInfo->max < pInputInfo->max) {
|
|
||||||
pInfo->max = pInputInfo->max;
|
|
||||||
}
|
|
||||||
|
|
||||||
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
|
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -3470,6 +3473,17 @@ int32_t histogramFunction(SqlFunctionCtx *pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void histogramTransferInfo(SHistoFuncInfo* pInput, SHistoFuncInfo* pOutput) {
|
||||||
|
pOutput->normalized = pInput->normalized;
|
||||||
|
pOutput->numOfBins = pInput->numOfBins;
|
||||||
|
pOutput->totalCount += pInput->totalCount;
|
||||||
|
for (int32_t k = 0; k < pOutput->numOfBins; ++k) {
|
||||||
|
pOutput->bins[k].lower = pInput->bins[k].lower;
|
||||||
|
pOutput->bins[k].upper = pInput->bins[k].upper;
|
||||||
|
pOutput->bins[k].count += pInput->bins[k].count;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t histogramFunctionMerge(SqlFunctionCtx *pCtx) {
|
int32_t histogramFunctionMerge(SqlFunctionCtx *pCtx) {
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
SColumnInfoData* pCol = pInput->pData[0];
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
@ -3481,14 +3495,7 @@ int32_t histogramFunctionMerge(SqlFunctionCtx *pCtx) {
|
||||||
char* data = colDataGetData(pCol, start);
|
char* data = colDataGetData(pCol, start);
|
||||||
SHistoFuncInfo* pInputInfo = (SHistoFuncInfo *)varDataVal(data);
|
SHistoFuncInfo* pInputInfo = (SHistoFuncInfo *)varDataVal(data);
|
||||||
|
|
||||||
pInfo->normalized = pInputInfo->normalized;
|
histogramTransferInfo(pInputInfo, pInfo);
|
||||||
pInfo->numOfBins = pInputInfo->numOfBins;
|
|
||||||
pInfo->totalCount += pInputInfo->totalCount;
|
|
||||||
for (int32_t k = 0; k < pInfo->numOfBins; ++k) {
|
|
||||||
pInfo->bins[k].lower = pInputInfo->bins[k].lower;
|
|
||||||
pInfo->bins[k].upper = pInputInfo->bins[k].upper;
|
|
||||||
pInfo->bins[k].count += pInputInfo->bins[k].count;
|
|
||||||
}
|
|
||||||
|
|
||||||
SET_VAL(GET_RES_INFO(pCtx), pInfo->numOfBins, pInfo->numOfBins);
|
SET_VAL(GET_RES_INFO(pCtx), pInfo->numOfBins, pInfo->numOfBins);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -3676,6 +3683,14 @@ int32_t hllFunction(SqlFunctionCtx *pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void hllTransferInfo(SHLLInfo* pInput, SHLLInfo* pOutput) {
|
||||||
|
for (int32_t k = 0; k < HLL_BUCKETS; ++k) {
|
||||||
|
if (pOutput->buckets[k] < pInput->buckets[k]) {
|
||||||
|
pOutput->buckets[k] = pInput->buckets[k];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t hllFunctionMerge(SqlFunctionCtx *pCtx) {
|
int32_t hllFunctionMerge(SqlFunctionCtx *pCtx) {
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
SColumnInfoData* pCol = pInput->pData[0];
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
@ -3687,11 +3702,7 @@ int32_t hllFunctionMerge(SqlFunctionCtx *pCtx) {
|
||||||
char* data = colDataGetData(pCol, start);
|
char* data = colDataGetData(pCol, start);
|
||||||
SHLLInfo* pInputInfo = (SHLLInfo *)varDataVal(data);
|
SHLLInfo* pInputInfo = (SHLLInfo *)varDataVal(data);
|
||||||
|
|
||||||
for (int32_t k = 0; k < HLL_BUCKETS; ++k) {
|
hllTransferInfo(pInputInfo, pInfo);
|
||||||
if (pInfo->buckets[k] < pInputInfo->buckets[k]) {
|
|
||||||
pInfo->buckets[k] = pInputInfo->buckets[k];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
|
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
Loading…
Reference in New Issue