diff --git a/include/common/tcommon.h b/include/common/tcommon.h index dd55722513..0c4e534734 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -45,6 +45,12 @@ enum { STREAM_TRIGGER__BY_EVENT_TIME, }; +typedef enum EStreamType { + STREAM_NORMAL = 1, + STREAM_INVERT, + STREAM_INVALID, +} EStreamType; + typedef struct { uint32_t numOfTables; SArray* pGroupList; @@ -71,6 +77,7 @@ typedef struct SDataBlockInfo { int16_t numOfCols; int16_t hasVarCol; int32_t capacity; + EStreamType type; } SDataBlockInfo; typedef struct SSDataBlock { diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 1fa9db9927..4a37283ee5 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -168,6 +168,9 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet); int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet); int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet); +int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet); +int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet); +bool fmIsInvertible(int32_t funcId); #ifdef __cplusplus } diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h index 5911759fe3..398851a09f 100644 --- a/include/libs/stream/tstreamUpdate.h +++ b/include/libs/stream/tstreamUpdate.h @@ -36,7 +36,7 @@ typedef struct SUpdateInfo { SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark); SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark); -bool isUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts); +bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts); void updateInfoDestroy(SUpdateInfo *pInfo); #ifdef __cplusplus diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 3ed65f4a05..cd7f5237bb 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -38,6 +38,7 @@ extern "C" { #include "tlockfree.h" #include "tmsg.h" #include "tpagedbuf.h" +#include "tstreamUpdate.h" #include "vnode.h" #include "executorInt.h" @@ -386,6 +387,9 @@ typedef struct SStreamBlockScanInfo { void* readerHandle; // stream block reader handle SArray* pColMatchInfo; // SNode* pCondition; + SArray* tsArray; + SUpdateInfo* pUpdateInfo; + int32_t primaryTsIndex; // primary time stamp slot id } SStreamBlockScanInfo; typedef struct SSysTableScanInfo { @@ -446,6 +450,7 @@ typedef struct SIntervalAggOperatorInfo { SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator. STimeWindowAggSupp twAggSup; struct SFillInfo* pFillInfo; // fill info + bool invertible; } SIntervalAggOperatorInfo; typedef struct SAggOperatorInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c0ea54ce4a..a1f150d3bf 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -198,6 +198,7 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { pBlock->info.blockId = pNode->dataBlockId; pBlock->info.rowSize = pNode->totalRowSize; // todo ?? + pBlock->info.type = STREAM_INVALID; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData idata = {{0}}; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2b94c5fdce..72f285231b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -512,6 +512,25 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { taosArrayClear(pInfo->pBlockLists); } +static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo) { + SColumnInfoData* pColDataInfo = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsIndex); + TSKEY* ts = (TSKEY*)pColDataInfo->pData; + for (int32_t i = 0; i < pInfo->pRes->info.rows; i++) { + if (updateInfoIsUpdated(pInfo->pUpdateInfo, pInfo->pRes->info.uid, ts[i])) { + taosArrayPush(pInfo->tsArray, ts+i); + } + } + if (taosArrayGetSize(pInfo->tsArray) > 0) { + //TODO(liuyao) get from tsdb + // SSDataBlock* p = createOneDataBlock(pInfo->pRes, true); + // p->info.type = STREAM_INVERT; + // taosArrayClear(pInfo->tsArray); + // return p; + return NULL; + } + return NULL; +} + static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -523,8 +542,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { return NULL; } + size_t total = taosArrayGetSize(pInfo->pBlockLists); if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) { - size_t total = taosArrayGetSize(pInfo->pBlockLists); if (pInfo->validBlockIndex >= total) { doClearBufferedBlocks(pInfo); pOperator->status = OP_EXEC_DONE; @@ -534,6 +553,14 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { int32_t current = pInfo->validBlockIndex++; return taosArrayGetP(pInfo->pBlockLists, current); } else { + if (total > 0) { + ASSERT(total == 2); + SSDataBlock* pRes = taosArrayGetP(pInfo->pBlockLists, 0); + SSDataBlock* pUpRes = taosArrayGetP(pInfo->pBlockLists, 1); + blockDataDestroy(pUpRes); + taosArrayClear(pInfo->pBlockLists); + return pRes; + } SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; blockDataCleanup(pInfo->pRes); @@ -554,6 +581,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { pInfo->pRes->info.groupId = groupId; pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.uid = uid; + pInfo->pRes->info.type = STREAM_NORMAL; int32_t numOfCols = pInfo->pRes->info.numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { @@ -598,6 +626,13 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { if (rows == 0) { pOperator->status = OP_EXEC_DONE; + } else { + SSDataBlock* upRes = getUpdateDataBlock(pInfo); + if (upRes) { + taosArrayPush(pInfo->pBlockLists, &(pInfo->pRes)); + taosArrayPush(pInfo->pBlockLists, &upRes); + return upRes; + } } return (rows == 0) ? NULL : pInfo->pRes; @@ -636,6 +671,21 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* goto _error; } + pInfo->tsArray = taosArrayInit(4, sizeof(TSKEY)); + if (pInfo->tsArray == NULL) { + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + return NULL; + } + + pInfo->primaryTsIndex = 0; //TODO(liuyao) get it from physical plan + pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); //TODO(liuyao) get it from physical plan + if (pInfo->pUpdateInfo == NULL) { + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + return NULL; + } + pInfo->readerHandle = streamReadHandle; pInfo->pRes = pResBlock; pInfo->pCondition = pCondition; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 224f3db912..5c609303ff 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1,6 +1,7 @@ #include "ttime.h" #include "tdatablock.h" #include "executorimpl.h" +#include "functionMgt.h" typedef enum SResultTsInterpType { RESULT_ROW_START_INTERP = 1, @@ -979,6 +980,15 @@ static void finalizeUpdatedResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SArr releaseBufPage(pBuf, bufPage); } } +static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) { + for ( int i = 0; i < num; i++) { + if (type == STREAM_INVERT) { + fmSetInvertFunc(pCtx[i].functionId, &(pCtx[i].fpSet)); + } else if (type == STREAM_NORMAL){ + fmSetNormalFunc(pCtx[i].functionId, &(pCtx[i].fpSet)); + } + } +} static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SIntervalAggOperatorInfo* pInfo = pOperator->info; @@ -1016,6 +1026,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true); + if (pInfo->invertible) { + setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type); + } pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); } @@ -1043,6 +1056,15 @@ void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) { cleanupAggSup(&pInfo->aggSup); } +bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) { + for (int32_t i = 0; i < numOfCols; i++) { + if (!fmIsInvertible(pFCtx[i].functionId)) { + return false; + } + } + return true; +} + SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, @@ -1068,6 +1090,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); + pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols); // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { diff --git a/source/libs/function/inc/builtins.h b/source/libs/function/inc/builtins.h index 54a68715a8..3a753325bd 100644 --- a/source/libs/function/inc/builtins.h +++ b/source/libs/function/inc/builtins.h @@ -36,6 +36,7 @@ typedef struct SBuiltinFuncDefinition { FExecProcess processFunc; FScalarExecProcess sprocessFunc; FExecFinalize finalizeFunc; + FExecProcess invertFunc; } SBuiltinFuncDefinition; extern const SBuiltinFuncDefinition funcMgtBuiltins[]; diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index abb9525cc5..1f2ad0797d 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -30,10 +30,12 @@ int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow); bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t countFunction(SqlFunctionCtx *pCtx); +int32_t countInvertFunction(SqlFunctionCtx *pCtx); EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow); bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t sumFunction(SqlFunctionCtx *pCtx); +int32_t sumInvertFunction(SqlFunctionCtx *pCtx); bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); @@ -45,11 +47,13 @@ bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t avgFunction(SqlFunctionCtx* pCtx); int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t avgInvertFunction(SqlFunctionCtx* pCtx); bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t stddevFunction(SqlFunctionCtx* pCtx); int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t stddevInvertFunction(SqlFunctionCtx* pCtx); bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index b22a9866c3..9e2bdea7d5 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -473,7 +473,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getCountFuncEnv, .initFunc = functionSetup, .processFunc = countFunction, - .finalizeFunc = functionFinalize + .finalizeFunc = functionFinalize, + .invertFunc = countInvertFunction }, { .name = "sum", @@ -484,7 +485,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getSumFuncEnv, .initFunc = functionSetup, .processFunc = sumFunction, - .finalizeFunc = functionFinalize + .finalizeFunc = functionFinalize, + .invertFunc = sumInvertFunction }, { .name = "min", @@ -516,7 +518,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getStddevFuncEnv, .initFunc = stddevFunctionSetup, .processFunc = stddevFunction, - .finalizeFunc = stddevFinalize + .finalizeFunc = stddevFinalize, + .invertFunc = stddevInvertFunction }, { .name = "avg", @@ -526,7 +529,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getAvgFuncEnv, .initFunc = avgFunctionSetup, .processFunc = avgFunction, - .finalizeFunc = avgFinalize + .finalizeFunc = avgFinalize, + .invertFunc = avgInvertFunction }, { .name = "percentile", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 0808ec58b0..008f83e935 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -209,11 +209,7 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -/* - * count function does need the finalize, if data is missing, the default value, which is 0, is used - * count function does not use the pCtx->interResBuf to keep the intermediate buffer - */ -int32_t countFunction(SqlFunctionCtx* pCtx) { +static FORCE_INLINE int32_t getNumofElem(SqlFunctionCtx* pCtx) { int32_t numOfElem = 0; /* @@ -240,7 +236,14 @@ int32_t countFunction(SqlFunctionCtx* pCtx) { numOfElem = pInput->numOfRows; } } - + return numOfElem; +} +/* + * count function does need the finalize, if data is missing, the default value, which is 0, is used + * count function does not use the pCtx->interResBuf to keep the intermediate buffer + */ +int32_t countFunction(SqlFunctionCtx* pCtx) { + int32_t numOfElem = getNumofElem(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); char* buf = GET_ROWCELL_INTERBUF(pResInfo); *((int64_t*)buf) += numOfElem; @@ -249,6 +252,17 @@ int32_t countFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } +int32_t countInvertFunction(SqlFunctionCtx* pCtx) { + int32_t numOfElem = getNumofElem(pCtx); + + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + char* buf = GET_ROWCELL_INTERBUF(pResInfo); + *((int64_t*)buf) -= numOfElem; + + SET_VAL(pResInfo, *((int64_t*)buf), 1); + return TSDB_CODE_SUCCESS; +} + #define LIST_ADD_N(_res, _col, _start, _rows, _t, numOfElem) \ do { \ _t* d = (_t*)(_col->pData); \ @@ -261,6 +275,18 @@ int32_t countFunction(SqlFunctionCtx* pCtx) { } \ } while (0) +#define LIST_SUB_N(_res, _col, _start, _rows, _t, numOfElem) \ + do { \ + _t* d = (_t*)(_col->pData); \ + for (int32_t i = (_start); i < (_rows) + (_start); ++i) { \ + if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \ + continue; \ + }; \ + (_res) -= (d)[i]; \ + (numOfElem)++; \ + } \ + } while (0) + int32_t sumFunction(SqlFunctionCtx* pCtx) { int32_t numOfElem = 0; @@ -320,6 +346,65 @@ int32_t sumFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } +int32_t sumInvertFunction(SqlFunctionCtx* pCtx) { + int32_t numOfElem = 0; + + // Only the pre-computing information loaded and actual data does not loaded + SInputColumnInfoData* pInput = &pCtx->input; + SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0]; + int32_t type = pInput->pData[0]->info.type; + + SSumRes* pSumRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + if (pInput->colDataAggIsSet) { + numOfElem = pInput->numOfRows - pAgg->numOfNull; + ASSERT(numOfElem >= 0); + + if (IS_SIGNED_NUMERIC_TYPE(type)) { + pSumRes->isum -= pAgg->sum; + } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { + pSumRes->usum -= pAgg->sum; + } else if (IS_FLOAT_TYPE(type)) { + pSumRes->dsum -= GET_DOUBLE_VAL((const char*)&(pAgg->sum)); + } + } else { // computing based on the true data block + SColumnInfoData* pCol = pInput->pData[0]; + + int32_t start = pInput->startRowIndex; + int32_t numOfRows = pInput->numOfRows; + + if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) { + if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) { + LIST_SUB_N(pSumRes->isum, pCol, start, numOfRows, int8_t, numOfElem); + } else if (type == TSDB_DATA_TYPE_SMALLINT) { + LIST_SUB_N(pSumRes->isum, pCol, start, numOfRows, int16_t, numOfElem); + } else if (type == TSDB_DATA_TYPE_INT) { + LIST_SUB_N(pSumRes->isum, pCol, start, numOfRows, int32_t, numOfElem); + } else if (type == TSDB_DATA_TYPE_BIGINT) { + LIST_SUB_N(pSumRes->isum, pCol, start, numOfRows, int64_t, numOfElem); + } + } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { + if (type == TSDB_DATA_TYPE_UTINYINT) { + LIST_SUB_N(pSumRes->usum, pCol, start, numOfRows, uint8_t, numOfElem); + } else if (type == TSDB_DATA_TYPE_USMALLINT) { + LIST_SUB_N(pSumRes->usum, pCol, start, numOfRows, uint16_t, numOfElem); + } else if (type == TSDB_DATA_TYPE_UINT) { + LIST_SUB_N(pSumRes->usum, pCol, start, numOfRows, uint32_t, numOfElem); + } else if (type == TSDB_DATA_TYPE_UBIGINT) { + LIST_SUB_N(pSumRes->usum, pCol, start, numOfRows, uint64_t, numOfElem); + } + } else if (type == TSDB_DATA_TYPE_DOUBLE) { + LIST_SUB_N(pSumRes->dsum, pCol, start, numOfRows, double, numOfElem); + } else if (type == TSDB_DATA_TYPE_FLOAT) { + LIST_SUB_N(pSumRes->dsum, pCol, start, numOfRows, float, numOfElem); + } + } + + // data in the check operation are all null, not output + SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); + return TSDB_CODE_SUCCESS; +} + bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SSumRes); return true; @@ -451,6 +536,69 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } +#define LIST_AVG_N(sumT, T) \ + do { \ + T* plist = (T*)pCol->pData; \ + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { \ + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { \ + continue; \ + } \ + \ + numOfElem += 1; \ + pAvgRes->count -= 1; \ + sumT -= plist[i]; \ + } \ + } while (0) + +int32_t avgInvertFunction(SqlFunctionCtx* pCtx) { + int32_t numOfElem = 0; + + // Only the pre-computing information loaded and actual data does not loaded + SInputColumnInfoData* pInput = &pCtx->input; + int32_t type = pInput->pData[0]->info.type; + + SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + // computing based on the true data block + SColumnInfoData* pCol = pInput->pData[0]; + + int32_t start = pInput->startRowIndex; + int32_t numOfRows = pInput->numOfRows; + + switch (type) { + case TSDB_DATA_TYPE_TINYINT: { + LIST_AVG_N(pAvgRes->sum.isum, int8_t); + break; + } + case TSDB_DATA_TYPE_SMALLINT: { + LIST_AVG_N(pAvgRes->sum.isum, int16_t); + break; + } + case TSDB_DATA_TYPE_INT: { + LIST_AVG_N(pAvgRes->sum.isum, int32_t); + break; + } + case TSDB_DATA_TYPE_BIGINT: { + LIST_AVG_N(pAvgRes->sum.isum, int64_t); + break; + } + case TSDB_DATA_TYPE_FLOAT: { + LIST_AVG_N(pAvgRes->sum.dsum, float); + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + LIST_AVG_N(pAvgRes->sum.dsum, double); + break; + } + default: + break; + } + + // data in the check operation are all null, not output + SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); + return TSDB_CODE_SUCCESS; +} + int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SInputColumnInfoData* pInput = &pCtx->input; int32_t type = pInput->pData[0]->info.type; @@ -885,6 +1033,69 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } +#define LIST_STDDEV_SUB_N(sumT, T) \ + do { \ + T* plist = (T*)pCol->pData; \ + for (int32_t i = start; i < numOfRows + start; ++i) { \ + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { \ + continue; \ + } \ + numOfElem += 1; \ + pStddevRes->count -= 1; \ + sumT -= plist[i]; \ + pStddevRes->quadraticISum -= plist[i] * plist[i]; \ + } \ + } while (0) + +int32_t stddevInvertFunction(SqlFunctionCtx* pCtx) { + int32_t numOfElem = 0; + + // Only the pre-computing information loaded and actual data does not loaded + SInputColumnInfoData* pInput = &pCtx->input; + int32_t type = pInput->pData[0]->info.type; + + SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + // computing based on the true data block + SColumnInfoData* pCol = pInput->pData[0]; + + int32_t start = pInput->startRowIndex; + int32_t numOfRows = pInput->numOfRows; + + switch (type) { + case TSDB_DATA_TYPE_TINYINT: { + LIST_STDDEV_SUB_N(pStddevRes->isum, int8_t); + break; + } + case TSDB_DATA_TYPE_SMALLINT: { + LIST_STDDEV_SUB_N(pStddevRes->isum, int16_t); + break; + } + case TSDB_DATA_TYPE_INT: { + LIST_STDDEV_SUB_N(pStddevRes->isum, int32_t); + break; + } + case TSDB_DATA_TYPE_BIGINT: { + LIST_STDDEV_SUB_N(pStddevRes->isum, int64_t); + break; + } + case TSDB_DATA_TYPE_FLOAT: { + LIST_STDDEV_SUB_N(pStddevRes->dsum, float); + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + LIST_STDDEV_SUB_N(pStddevRes->dsum, double); + break; + } + default: + break; + } + + // data in the check operation are all null, not output + SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); + return TSDB_CODE_SUCCESS; +} + int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SInputColumnInfoData* pInput = &pCtx->input; int32_t type = pInput->pData[0]->info.type; diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index f8ef0f7d20..46bbb33aa7 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -177,3 +177,35 @@ void fmFuncMgtDestroy() { taosHashCleanup(m); } } + +int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet) { + if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) { + return TSDB_CODE_FAILED; + } + pFpSet->process = funcMgtBuiltins[funcId].invertFunc; + return TSDB_CODE_SUCCESS; +} + +int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet) { + if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) { + return TSDB_CODE_FAILED; + } + pFpSet->process = funcMgtBuiltins[funcId].processFunc; + return TSDB_CODE_SUCCESS; +} + +bool fmIsInvertible(int32_t funcId) { + bool res = false; + switch (funcMgtBuiltins[funcId].type) { + case FUNCTION_TYPE_COUNT: + case FUNCTION_TYPE_SUM: + case FUNCTION_TYPE_STDDEV: + case FUNCTION_TYPE_AVG: + res = true; + break; + default: + break; + } + return res; +} + diff --git a/source/libs/stream/src/tstreamUpdate.c b/source/libs/stream/src/tstreamUpdate.c index 850a17b6dd..1197b6100a 100644 --- a/source/libs/stream/src/tstreamUpdate.c +++ b/source/libs/stream/src/tstreamUpdate.c @@ -138,7 +138,7 @@ static SScalableBf* getSBf(SUpdateInfo *pInfo, TSKEY ts) { return res; } -bool isUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) { +bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) { int32_t res = TSDB_CODE_FAILED; uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets; SScalableBf* pSBf = getSBf(pInfo, ts); diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp index 32ed974f72..c1e4e2bec1 100644 --- a/source/libs/stream/test/tstreamUpdateTest.cpp +++ b/source/libs/stream/test/tstreamUpdateTest.cpp @@ -9,40 +9,40 @@ TEST(TD_STREAM_UPDATE_TEST, update) { int64_t interval = 20 * 1000; int64_t watermark = 10 * 60 * 1000; SUpdateInfo *pSU = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark); - GTEST_ASSERT_EQ(isUpdated(pSU,1, 0), true); - GTEST_ASSERT_EQ(isUpdated(pSU,1, -1), true); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,1, 0), true); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,1, -1), true); for(int i=0; i < 1024; i++) { - GTEST_ASSERT_EQ(isUpdated(pSU,i, 1), false); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,i, 1), false); } for(int i=0; i < 1024; i++) { - GTEST_ASSERT_EQ(isUpdated(pSU,i, 1), true); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,i, 1), true); } for(int i=0; i < 1024; i++) { - GTEST_ASSERT_EQ(isUpdated(pSU,i, 2), false); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,i, 2), false); } for(int i=0; i < 1024; i++) { - GTEST_ASSERT_EQ(isUpdated(pSU,i, 2), true); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,i, 2), true); } for(int i=0; i < 1024; i++) { - GTEST_ASSERT_EQ(isUpdated(pSU,i, 1), true); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,i, 1), true); } for(int i=3; i < 1024; i++) { - GTEST_ASSERT_EQ(isUpdated(pSU,0, i), false); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,0, i), false); } GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU->pTsBuckets,0), 1023); for(int i=3; i < 1024; i++) { - GTEST_ASSERT_EQ(isUpdated(pSU,0, i), true); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,0, i), true); } GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU->pTsBuckets,0), 1023); SUpdateInfo *pSU1 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark); for(int i=1; i <= watermark / interval; i++) { - GTEST_ASSERT_EQ(isUpdated(pSU1, 1, i * interval + 5), false); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU1, 1, i * interval + 5), false); GTEST_ASSERT_EQ(pSU1->minTS, interval); GTEST_ASSERT_EQ(pSU1->numSBFs, watermark / interval); } @@ -53,7 +53,7 @@ TEST(TD_STREAM_UPDATE_TEST, update) { } for(int i= watermark / interval + 1, j = 2 ; i <= watermark / interval + 10; i++,j++) { - GTEST_ASSERT_EQ(isUpdated(pSU1, 1, i * interval + 5), false); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU1, 1, i * interval + 5), false); GTEST_ASSERT_EQ(pSU1->minTS, interval*j); GTEST_ASSERT_EQ(pSU1->numSBFs, watermark / interval); SScalableBf *pSBF = (SScalableBf *)taosArrayGetP(pSU1->pTsSBFs, pSU1->numSBFs - 1); @@ -62,16 +62,16 @@ TEST(TD_STREAM_UPDATE_TEST, update) { } for(int i= watermark / interval * 100, j = 0; j < 10; i+= (watermark / interval * 2), j++) { - GTEST_ASSERT_EQ(isUpdated(pSU1, 1, i * interval + 5), false); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU1, 1, i * interval + 5), false); GTEST_ASSERT_EQ(pSU1->minTS, (i-(pSU1->numSBFs-1))*interval); GTEST_ASSERT_EQ(pSU1->numSBFs, watermark / interval); } SUpdateInfo *pSU2 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark); - GTEST_ASSERT_EQ(isUpdated(pSU2, 1, 1 * interval + 5), false); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU2, 1, 1 * interval + 5), false); GTEST_ASSERT_EQ(pSU2->minTS, interval); for(int i= watermark / interval * 100, j = 0; j < 10; i+= (watermark / interval * 10), j++) { - GTEST_ASSERT_EQ(isUpdated(pSU2, 1, i * interval + 5), false); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU2, 1, i * interval + 5), false); GTEST_ASSERT_EQ(pSU2->minTS, (i-(pSU2->numSBFs-1))*interval); GTEST_ASSERT_EQ(pSU2->numSBFs, watermark / interval); GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU2->pTsBuckets,1), i * interval + 5); @@ -80,7 +80,7 @@ TEST(TD_STREAM_UPDATE_TEST, update) { SUpdateInfo *pSU3 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark); for(int j = 1; j < 100; j++) { for(int i = 0; i < pSU3->numSBFs; i++) { - GTEST_ASSERT_EQ(isUpdated(pSU3, i, i * interval + 5 * j), false); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU3, i, i * interval + 5 * j), false); GTEST_ASSERT_EQ(pSU3->minTS, 0); GTEST_ASSERT_EQ(pSU3->numSBFs, watermark / interval); GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU3->pTsBuckets, i), i * interval + 5 * j);