feat: add irate distributed execution

This commit is contained in:
Ganlin Zhao 2023-08-15 12:59:30 +08:00
parent f9f74eaaf4
commit 01a7dfbc34
4 changed files with 149 additions and 0 deletions

View File

@ -157,6 +157,8 @@ typedef enum EFunctionType {
FUNCTION_TYPE_AVG_MERGE,
FUNCTION_TYPE_STDDEV_PARTIAL,
FUNCTION_TYPE_STDDEV_MERGE,
FUNCTION_TYPE_IRATE_PARTIAL,
FUNCTION_TYPE_IRATE_MERGE,
// geometry functions
FUNCTION_TYPE_GEOM_FROM_TEXT = 4250,

View File

@ -127,7 +127,10 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx);
bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool irateFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo);
int32_t irateFunction(SqlFunctionCtx* pCtx);
int32_t irateFunctionMerge(SqlFunctionCtx* pCtx);
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t iratePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t getIrateInfoSize();
int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx);

View File

@ -1567,6 +1567,43 @@ static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return TSDB_CODE_SUCCESS;
}
static int32_t translateIrateImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
if (3 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (isPartial) {
if (!IS_NUMERIC_TYPE(colType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = getIrateInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
} else {
if (TSDB_DATA_TYPE_BINARY != colType) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
// add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision;
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateIratePartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateIrateImpl(pFunc, pErrBuf, len, true);
}
static int32_t translateIrateMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateIrateImpl(pFunc, pErrBuf, len, false);
}
static int32_t translateInterp(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
uint8_t dbPrec = pFunc->node.resType.precision;
@ -2604,6 +2641,32 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = irateFuncSetup,
.processFunc = irateFunction,
.sprocessFunc = irateScalarFunction,
.finalizeFunc = irateFinalize,
.pPartialFunc = "_irate_partial",
.pMergeFunc = "_irate_merge"
},
{
.name = "_irate_partial",
.type = FUNCTION_TYPE_IRATE_PARTIAL,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC |
FUNC_MGT_FORBID_SYSTABLE_FUNC,
.translateFunc = translateIratePartial,
.getEnvFunc = getIrateFuncEnv,
.initFunc = irateFuncSetup,
.processFunc = irateFunction,
.sprocessFunc = irateScalarFunction,
.finalizeFunc = iratePartialFinalize
},
{
.name = "_irate_merge",
.type = FUNCTION_TYPE_IRATE_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC |
FUNC_MGT_FORBID_SYSTABLE_FUNC,
.translateFunc = translateIrateMerge,
.getEnvFunc = getIrateFuncEnv,
.initFunc = irateFuncSetup,
.processFunc = irateFunctionMerge,
.sprocessFunc = irateScalarFunction,
.finalizeFunc = irateFinalize
},
{

View File

@ -5768,6 +5768,8 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
int32_t getIrateInfoSize() { return (int32_t)sizeof(SRateInfo); }
bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SRateInfo);
return true;
@ -5868,6 +5870,85 @@ static double doCalcRate(const SRateInfo* pRateInfo, double tickPerSec) {
return (duration > 0) ? ((double)diff) / (duration / tickPerSec) : 0.0;
}
static void irateTransferInfoImpl(TSKEY inputKey, SRateInfo* pInput, SRateInfo* pOutput) {
if (inputKey > pOutput->lastKey) {
pOutput->firstKey = pOutput->lastKey;
pOutput->lastKey = pInput->firstKey;
pOutput->firstValue = pOutput->lastValue;
pOutput->lastValue = pInput->firstValue;
} else if ((inputKey < pOutput->lastKey) && (inputKey > pOutput->firstKey)) {
pOutput->firstKey = pOutput->lastKey;
pOutput->firstValue = pOutput->lastValue;
} else {
// inputKey < pOutput->firstKey
}
}
static int32_t irateTransferInfo(SRateInfo* pInput, SRateInfo* pOutput) {
pOutput->hasResult = pInput->hasResult;
if (pInput->firstKey == pOutput->firstKey || pInput->firstKey == pOutput->lastKey ||
pInput->lastKey == pOutput->firstKey || pInput->lastKey == pOutput->lastKey) {
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
}
if (pInput->firstKey != INT64_MIN) {
irateTransferInfoImpl(pInput->firstKey, pInput, pOutput);
}
if (pInput->lastKey != INT64_MIN) {
irateTransferInfoImpl(pInput->lastKey, pInput, pOutput);
}
return TSDB_CODE_SUCCESS;
}
int32_t irateFunctionMerge(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
if (pCol->info.type != TSDB_DATA_TYPE_BINARY) {
return TSDB_CODE_FUNC_FUNTION_PARA_TYPE;
}
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i);
SRateInfo* pInputInfo = (SRateInfo*)varDataVal(data);
if (pInputInfo->hasResult) {
int32_t code = irateTransferInfo(pInputInfo, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
if (pInfo->hasResult) {
GET_RES_INFO(pCtx)->numOfRes = 1;
}
return TSDB_CODE_SUCCESS;
}
int32_t iratePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = getIrateInfoSize();
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
memcpy(varDataVal(res), pInfo, resultBytes);
varDataSetLen(res, resultBytes);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
colDataSetVal(pCol, pBlock->info.rows, res, false);
taosMemoryFree(res);
return pResInfo->numOfRes;
}
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);