Merge remote-tracking branch 'origin/feat/TS-4243-3.0' into feat/TS-4243-3.0

This commit is contained in:
Haojun Liao 2024-03-26 09:50:20 +08:00
commit cedca30416
3 changed files with 64 additions and 27 deletions

View File

@ -144,7 +144,7 @@ int32_t irateFunction(SqlFunctionCtx* pCtx);
int32_t irateFunctionMerge(SqlFunctionCtx* pCtx); int32_t irateFunctionMerge(SqlFunctionCtx* pCtx);
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t iratePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t iratePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t getIrateInfoSize(); int32_t getIrateInfoSize(int32_t pkBytes);
int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx); int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx);

View File

@ -1638,7 +1638,8 @@ static int32_t translateIrateImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t l
if (!IS_NUMERIC_TYPE(colType)) { if (!IS_NUMERIC_TYPE(colType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
} }
pFunc->node.resType = (SDataType){.bytes = getIrateInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; int32_t pkBytes = (pFunc->hasPk) ? pFunc->pkBytes : 0;
pFunc->node.resType = (SDataType){.bytes = getIrateInfoSize(pkBytes) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
} else { } else {
if (1 != LIST_LENGTH(pFunc->pParameterList)) { if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);

View File

@ -269,6 +269,12 @@ typedef struct SRateInfo {
double lastValue; double lastValue;
TSKEY lastKey; TSKEY lastKey;
int8_t hasResult; // flag to denote has value int8_t hasResult; // flag to denote has value
char* firstPk;
char* lastPk;
int8_t pkType;
int32_t pkBytes;
char pkData[];
} SRateInfo; } SRateInfo;
typedef struct SGroupKeyInfo { typedef struct SGroupKeyInfo {
@ -6165,10 +6171,11 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getIrateInfoSize() { return (int32_t)sizeof(SRateInfo); } int32_t getIrateInfoSize(int32_t pkBytes) { return (int32_t)sizeof(SRateInfo) + 2 * pkBytes; }
bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SRateInfo); int32_t pkBytes = (pFunc->hasPk) ? pFunc->pkBytes : 0;
pEnv->calcMemSize = getIrateInfoSize(pkBytes);
return true; return true;
} }
@ -6188,6 +6195,36 @@ bool irateFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
return true; return true;
} }
static void doSaveRateInfo(SRateInfo* pRateInfo, bool isFirst, int64_t ts, char* pk, double v) {
if (isFirst) {
pRateInfo->firstValue = v;
pRateInfo->firstKey = ts;
if (pRateInfo->firstPk) {
int32_t pkBytes = IS_VAR_DATA_TYPE(pRateInfo->pkType) ? varDataTLen(pk) : pRateInfo->pkBytes;
memcpy(pRateInfo->firstPk, pk, pkBytes);
}
} else {
pRateInfo->lastValue = v;
pRateInfo->lastKey = ts;
if (pRateInfo->lastPk) {
int32_t pkBytes = IS_VAR_DATA_TYPE(pRateInfo->pkType) ? varDataTLen(pk) : pRateInfo->pkBytes;
memcpy(pRateInfo->lastPk, pk, pkBytes);
}
}
}
static void initializeRateInfo(SqlFunctionCtx* pCtx, SRateInfo* pRateInfo) {
if (pCtx->hasPrimaryKey) {
pRateInfo->pkType = pCtx->input.pPrimaryKey->info.type;
pRateInfo->pkBytes = pCtx->input.pPrimaryKey->info.bytes;
pRateInfo->firstPk = pRateInfo->pkData;
pRateInfo->lastPk = pRateInfo->pkData + pRateInfo->pkBytes;
} else {
pRateInfo->firstPk = NULL;
pRateInfo->lastPk = NULL;
}
}
int32_t irateFunction(SqlFunctionCtx* pCtx) { int32_t irateFunction(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SRateInfo* pRateInfo = GET_ROWCELL_INTERBUF(pResInfo); SRateInfo* pRateInfo = GET_ROWCELL_INTERBUF(pResInfo);
@ -6198,6 +6235,8 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) {
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
funcInputUpdate(pCtx); funcInputUpdate(pCtx);
initializeRateInfo(pCtx, pRateInfo);
int32_t numOfElems = 0; int32_t numOfElems = 0;
int32_t type = pInputCol->info.type; int32_t type = pInputCol->info.type;
@ -6212,21 +6251,16 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) {
GET_TYPED_DATA(v, double, type, data); GET_TYPED_DATA(v, double, type, data);
if (INT64_MIN == pRateInfo->lastKey) { if (INT64_MIN == pRateInfo->lastKey) {
pRateInfo->lastValue = v; doSaveRateInfo(pRateInfo, false, row.ts, row.pPk, v);
pRateInfo->lastKey = row.ts;
pRateInfo->hasResult = 1; pRateInfo->hasResult = 1;
continue; continue;
} }
if (row.ts > pRateInfo->lastKey) { if (row.ts > pRateInfo->lastKey) {
if ((INT64_MIN == pRateInfo->firstKey) || pRateInfo->lastKey > pRateInfo->firstKey) { if ((INT64_MIN == pRateInfo->firstKey) || pRateInfo->lastKey > pRateInfo->firstKey) {
pRateInfo->firstValue = pRateInfo->lastValue; doSaveRateInfo(pRateInfo, true, pRateInfo->lastKey, pRateInfo->lastPk, pRateInfo->lastValue);
pRateInfo->firstKey = pRateInfo->lastKey;
} }
doSaveRateInfo(pRateInfo, false, row.ts, row.pPk, v);
pRateInfo->lastValue = v;
pRateInfo->lastKey = row.ts;
continue; continue;
} else if (row.ts == pRateInfo->lastKey) { } else if (row.ts == pRateInfo->lastKey) {
return TSDB_CODE_FUNC_DUP_TIMESTAMP; return TSDB_CODE_FUNC_DUP_TIMESTAMP;
@ -6234,8 +6268,7 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) {
if ((INT64_MIN == pRateInfo->firstKey) || row.ts > pRateInfo->firstKey) { if ((INT64_MIN == pRateInfo->firstKey) || row.ts > pRateInfo->firstKey) {
pRateInfo->firstValue = v; doSaveRateInfo(pRateInfo, true, row.ts, row.pPk, v);
pRateInfo->firstKey = row.ts;
} else if (row.ts == pRateInfo->firstKey) { } else if (row.ts == pRateInfo->firstKey) {
return TSDB_CODE_FUNC_DUP_TIMESTAMP; return TSDB_CODE_FUNC_DUP_TIMESTAMP;
} }
@ -6271,25 +6304,26 @@ static double doCalcRate(const SRateInfo* pRateInfo, double tickPerSec) {
static void irateTransferInfoImpl(TSKEY inputKey, SRateInfo* pInput, SRateInfo* pOutput, bool isFirstKey) { static void irateTransferInfoImpl(TSKEY inputKey, SRateInfo* pInput, SRateInfo* pOutput, bool isFirstKey) {
if (inputKey > pOutput->lastKey) { if (inputKey > pOutput->lastKey) {
pOutput->firstKey = pOutput->lastKey; doSaveRateInfo(pOutput, true, pOutput->lastKey, pOutput->lastPk, pOutput->lastValue);
pOutput->firstValue = pOutput->lastValue; if (isFirstKey) {
doSaveRateInfo(pOutput, false, pInput->firstKey, pInput->firstPk, pInput->firstValue);
pOutput->lastKey = isFirstKey ? pInput->firstKey : pInput->lastKey; } else {
pOutput->lastValue = isFirstKey ? pInput->firstValue : pInput->lastValue; doSaveRateInfo(pOutput, false, pInput->lastKey, pInput->lastPk, pInput->lastValue);
}
} else if ((inputKey < pOutput->lastKey) && (inputKey > pOutput->firstKey)) { } else if ((inputKey < pOutput->lastKey) && (inputKey > pOutput->firstKey)) {
pOutput->firstKey = isFirstKey ? pInput->firstKey : pInput->lastKey; if (isFirstKey) {
pOutput->firstValue = isFirstKey ? pInput->firstValue : pInput->lastValue; doSaveRateInfo(pOutput, true, pInput->firstKey, pInput->firstPk, pInput->firstValue);
} else {
doSaveRateInfo(pOutput, true, pInput->lastKey, pInput->lastPk, pInput->lastValue);
}
} else { } else {
// inputKey < pOutput->firstKey // inputKey < pOutput->firstKey
} }
} }
static void irateCopyInfo(SRateInfo* pInput, SRateInfo* pOutput) { static void irateCopyInfo(SRateInfo* pInput, SRateInfo* pOutput) {
pOutput->firstKey = pInput->firstKey; doSaveRateInfo(pOutput, true, pInput->firstKey, pInput->firstPk, pInput->firstValue);
pOutput->lastKey = pInput->lastKey; doSaveRateInfo(pOutput, false, pInput->lastKey, pInput->lastPk, pInput->lastValue);
pOutput->firstValue = pInput->firstValue;
pOutput->lastValue = pInput->lastValue;
} }
static int32_t irateTransferInfo(SRateInfo* pInput, SRateInfo* pOutput) { static int32_t irateTransferInfo(SRateInfo* pInput, SRateInfo* pOutput) {
@ -6324,11 +6358,13 @@ int32_t irateFunctionMerge(SqlFunctionCtx* pCtx) {
} }
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SRateInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
initializeRateInfo(pCtx, pInfo);
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < start + pInput->numOfRows; ++i) { for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i); char* data = colDataGetData(pCol, i);
SRateInfo* pInputInfo = (SRateInfo*)varDataVal(data); SRateInfo* pInputInfo = (SRateInfo*)varDataVal(data);
initializeRateInfo(pCtx, pInfo);
if (pInputInfo->hasResult) { if (pInputInfo->hasResult) {
int32_t code = irateTransferInfo(pInputInfo, pInfo); int32_t code = irateTransferInfo(pInputInfo, pInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -6347,7 +6383,7 @@ int32_t irateFunctionMerge(SqlFunctionCtx* pCtx) {
int32_t iratePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t iratePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SRateInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = getIrateInfoSize(); int32_t resultBytes = getIrateInfoSize(pInfo->pkBytes);
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
memcpy(varDataVal(res), pInfo, resultBytes); memcpy(varDataVal(res), pInfo, resultBytes);