add spread split partial/merge function
This commit is contained in:
parent
183fcac013
commit
622b4f8d47
|
@ -2190,19 +2190,19 @@ int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
int32_t bytesHist = (int32_t)(sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
|
int32_t bytesHist = (int32_t)(sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
|
||||||
int32_t bytesDigest = (int32_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
|
int32_t bytesDigest = (int32_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
|
||||||
int32_t resultBytes = TMAX(bytesHist, bytesDigest);
|
int32_t resultBytes = TMAX(bytesHist, bytesDigest);
|
||||||
char *tmp = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||||
|
|
||||||
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
||||||
if (pInfo->pTDigest->size > 0) {
|
if (pInfo->pTDigest->size > 0) {
|
||||||
memcpy(varDataVal(tmp), pInfo, resultBytes);
|
memcpy(varDataVal(res), pInfo, resultBytes);
|
||||||
varDataSetLen(tmp, resultBytes);
|
varDataSetLen(res, resultBytes);
|
||||||
} else {
|
} else {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (pInfo->pHisto->numOfElems > 0) {
|
if (pInfo->pHisto->numOfElems > 0) {
|
||||||
memcpy(varDataVal(tmp), pInfo, resultBytes);
|
memcpy(varDataVal(res), pInfo, resultBytes);
|
||||||
varDataSetLen(tmp, resultBytes);
|
varDataSetLen(res, resultBytes);
|
||||||
} else {
|
} else {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2211,9 +2211,9 @@ int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
colDataAppend(pCol, pBlock->info.rows, tmp, false);
|
colDataAppend(pCol, pBlock->info.rows, res, false);
|
||||||
|
|
||||||
taosMemoryFree(tmp);
|
taosMemoryFree(res);
|
||||||
return pResInfo->numOfRes;
|
return pResInfo->numOfRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2957,6 +2957,31 @@ _spread_over:
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t spreadFunctionMerge(SqlFunctionCtx *pCtx) {
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
||||||
|
|
||||||
|
SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
SSpreadInfo* pInputInfo;
|
||||||
|
|
||||||
|
int32_t start = pInput->startRowIndex;
|
||||||
|
char* data = colDataGetData(pCol, start);
|
||||||
|
pInputInfo = (SSpreadInfo *)varDataVal(data);
|
||||||
|
|
||||||
|
if (pInputInfo->max > pInfo->max) {
|
||||||
|
pInfo->max =pInputInfo->max;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInputInfo->min < pInfo->min) {
|
||||||
|
pInfo->min =pInputInfo->min;
|
||||||
|
}
|
||||||
|
|
||||||
|
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
if (pInfo->hasResult == true) {
|
if (pInfo->hasResult == true) {
|
||||||
|
@ -2965,6 +2990,24 @@ int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return functionFinalize(pCtx, pBlock);
|
return functionFinalize(pCtx, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
int32_t resultBytes = (int32_t)sizeof(SSpreadInfo);
|
||||||
|
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||||
|
|
||||||
|
memcpy(varDataVal(res), pInfo, resultBytes);
|
||||||
|
varDataSetLen(res, resultBytes);
|
||||||
|
|
||||||
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
|
colDataAppend(pCol, pBlock->info.rows, res, false);
|
||||||
|
|
||||||
|
taosMemoryFree(res);
|
||||||
|
return pResInfo->numOfRes;
|
||||||
|
}
|
||||||
|
|
||||||
bool getElapsedFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
bool getElapsedFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(SElapsedInfo);
|
pEnv->calcMemSize = sizeof(SElapsedInfo);
|
||||||
return true;
|
return true;
|
||||||
|
|
Loading…
Reference in New Issue