Merge pull request #14309 from taosdata/feat/mode_function
feat(query): support mode function
This commit is contained in:
commit
75da5ec070
|
@ -25,7 +25,7 @@ extern "C" {
|
|||
// clang-format off
|
||||
|
||||
#define TAOS_DEF_ERROR_CODE(mod, code) ((int32_t)((0x80000000 | ((mod)<<16) | (code))))
|
||||
|
||||
|
||||
#define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code))
|
||||
#define TAOS_SUCCEEDED(err) ((err) >= 0)
|
||||
#define TAOS_FAILED(err) ((err) < 0)
|
||||
|
@ -35,7 +35,7 @@ const char* terrstr();
|
|||
|
||||
int32_t* taosGetErrno();
|
||||
#define terrno (*taosGetErrno())
|
||||
|
||||
|
||||
#define TSDB_CODE_SUCCESS 0
|
||||
#define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error
|
||||
|
||||
|
|
|
@ -191,6 +191,11 @@ bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
|||
bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t uniqueFunction(SqlFunctionCtx *pCtx);
|
||||
|
||||
bool getModeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool modeFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t modeFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
|
||||
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool twaFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t twaFunction(SqlFunctionCtx *pCtx);
|
||||
|
|
|
@ -1045,20 +1045,28 @@ static int32_t translateFirstLastMerge(SFunctionNode* pFunc, char* pErrBuf, int3
|
|||
return translateFirstLastImpl(pFunc, pErrBuf, len, false);
|
||||
}
|
||||
|
||||
static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
static int32_t translateUniqueMode(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isUnique) {
|
||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (!nodesExprHasColumn(pPara)) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of UNIQUE must contain columns");
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of %s must contain columns", isUnique ? "UNIQUE" : "MODE");
|
||||
}
|
||||
|
||||
pFunc->node.resType = ((SExprNode*)pPara)->resType;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
return translateUniqueMode(pFunc, pErrBuf, len, true);
|
||||
}
|
||||
|
||||
static int32_t translateMode(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
return translateUniqueMode(pFunc, pErrBuf, len, false);
|
||||
}
|
||||
|
||||
static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||
if (numOfParams == 0 || numOfParams > 2) {
|
||||
|
@ -2109,7 +2117,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "unique",
|
||||
.type = FUNCTION_TYPE_UNIQUE,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC |
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC |
|
||||
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_WINDOW_FUNC | FUNC_MGT_FORBID_GROUP_BY_FUNC,
|
||||
.translateFunc = translateUnique,
|
||||
.getEnvFunc = getUniqueFuncEnv,
|
||||
|
@ -2117,6 +2125,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.processFunc = uniqueFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "mode",
|
||||
.type = FUNCTION_TYPE_MODE,
|
||||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.translateFunc = translateMode,
|
||||
.getEnvFunc = getModeFuncEnv,
|
||||
.initFunc = modeFunctionSetup,
|
||||
.processFunc = modeFunction,
|
||||
.finalizeFunc = modeFinalize,
|
||||
},
|
||||
{
|
||||
.name = "abs",
|
||||
.type = FUNCTION_TYPE_ABS,
|
||||
|
|
|
@ -32,6 +32,7 @@
|
|||
#define TAIL_MAX_OFFSET 100
|
||||
|
||||
#define UNIQUE_MAX_RESULT_SIZE (1024 * 1024 * 10)
|
||||
#define MODE_MAX_RESULT_SIZE UNIQUE_MAX_RESULT_SIZE
|
||||
|
||||
#define HLL_BUCKET_BITS 14 // The bits of the bucket
|
||||
#define HLL_DATA_BITS (64 - HLL_BUCKET_BITS)
|
||||
|
@ -246,6 +247,19 @@ typedef struct SUniqueInfo {
|
|||
char pItems[];
|
||||
} SUniqueInfo;
|
||||
|
||||
typedef struct SModeItem {
|
||||
int64_t count;
|
||||
char data[];
|
||||
} SModeItem;
|
||||
|
||||
typedef struct SModeInfo {
|
||||
int32_t numOfPoints;
|
||||
uint8_t colType;
|
||||
int16_t colBytes;
|
||||
SHashObj* pHash;
|
||||
char pItems[];
|
||||
} SModeInfo;
|
||||
|
||||
typedef struct SDerivInfo {
|
||||
double prevValue; // previous value
|
||||
TSKEY prevTs; // previous timestamp
|
||||
|
@ -4694,21 +4708,100 @@ int32_t uniqueFunction(SqlFunctionCtx* pCtx) {
|
|||
return pInfo->numOfPoints;
|
||||
}
|
||||
|
||||
int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
bool getModeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SModeInfo) + MODE_MAX_RESULT_SIZE;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
|
||||
if (!functionSetup(pCtx, pResInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
pInfo->numOfPoints = 0;
|
||||
pInfo->colType = pCtx->resDataInfo.type;
|
||||
pInfo->colBytes = pCtx->resDataInfo.bytes;
|
||||
if (pInfo->pHash != NULL) {
|
||||
taosHashClear(pInfo->pHash);
|
||||
} else {
|
||||
pInfo->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static void doModeAdd(SModeInfo* pInfo, char* data, bool isNull) {
|
||||
// ignore null elements
|
||||
if (isNull) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t hashKeyBytes = IS_VAR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes;
|
||||
SModeItem** pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes);
|
||||
if (pHashItem == NULL) {
|
||||
int32_t size = sizeof(SModeItem) + pInfo->colBytes;
|
||||
SModeItem* pItem = (SModeItem*)(pInfo->pItems + pInfo->numOfPoints * size);
|
||||
memcpy(pItem->data, data, pInfo->colBytes);
|
||||
pItem->count += 1;
|
||||
|
||||
taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*));
|
||||
pInfo->numOfPoints++;
|
||||
} else {
|
||||
(*pHashItem)->count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t modeFunction(SqlFunctionCtx* pCtx) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
|
||||
int32_t startOffset = pCtx->offset;
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
doModeAdd(pInfo, data, colDataIsNull_s(pInputCol, i));
|
||||
|
||||
if (sizeof(SModeInfo) + pInfo->numOfPoints * (sizeof(SModeItem) + pInfo->colBytes) >= MODE_MAX_RESULT_SIZE) {
|
||||
taosHashCleanup(pInfo->pHash);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
SET_VAL(pResInfo, 1, 1);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
int32_t currentRow = pBlock->info.rows;
|
||||
|
||||
for (int32_t i = 0; i < pResInfo->numOfRes; ++i) {
|
||||
SUniqueItem* pItem = (SUniqueItem*)(pInfo->pItems + i * (sizeof(SUniqueItem) + pInfo->colBytes));
|
||||
colDataAppend(pCol, i, pItem->data, false);
|
||||
// TODO: handle ts output
|
||||
int32_t resIndex;
|
||||
int32_t maxCount = 0;
|
||||
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
|
||||
SModeItem* pItem = (SModeItem*)(pInfo->pItems + i * (sizeof(SModeItem) + pInfo->colBytes));
|
||||
if (pItem->count > maxCount) {
|
||||
maxCount = pItem->count;
|
||||
resIndex = i;
|
||||
} else if (pItem->count == maxCount) {
|
||||
resIndex = -1;
|
||||
}
|
||||
}
|
||||
|
||||
SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes));
|
||||
colDataAppend(pCol, currentRow, pResItem->data, (resIndex == -1) ? true : false);
|
||||
|
||||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
|
||||
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(STwaInfo);
|
||||
return true;
|
||||
|
|
Loading…
Reference in New Issue