fix(query): add mode function
This commit is contained in:
parent
515109088f
commit
f9e80a8e2b
|
@ -2130,10 +2130,10 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.type = FUNCTION_TYPE_MODE,
|
||||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.translateFunc = translateMode,
|
||||
.getEnvFunc = getUniqueFuncEnv,
|
||||
.initFunc = uniqueFunctionSetup,
|
||||
.processFunc = uniqueFunction,
|
||||
.finalizeFunc = functionFinalize,
|
||||
.getEnvFunc = getModeFuncEnv,
|
||||
.initFunc = modeFunctionSetup,
|
||||
.processFunc = modeFunction,
|
||||
.finalizeFunc = modeFinalize,
|
||||
},
|
||||
{
|
||||
.name = "abs",
|
||||
|
|
|
@ -32,6 +32,7 @@
|
|||
#define TAIL_MAX_OFFSET 100
|
||||
|
||||
// 1024 * 1024 * 10 bytes (10 MiB).
#define UNIQUE_MAX_RESULT_SIZE (1024 * 1024 * 10)
|
||||
// Bytes requested after the SModeInfo header for mode() item records (see getModeFuncEnv);
// defined as the same value as UNIQUE_MAX_RESULT_SIZE.
#define MODE_MAX_RESULT_SIZE UNIQUE_MAX_RESULT_SIZE
|
||||
|
||||
#define HLL_BUCKET_BITS 14 // The bits of the bucket
|
||||
#define HLL_DATA_BITS (64 - HLL_BUCKET_BITS)
|
||||
|
@ -246,6 +247,19 @@ typedef struct SUniqueInfo {
|
|||
char pItems[];
|
||||
} SUniqueInfo;
|
||||
|
||||
// One record per distinct input value seen by mode(). Records are stored back to back in
// SModeInfo.pItems, each occupying sizeof(SModeItem) + SModeInfo.colBytes bytes.
typedef struct SModeItem {
|
||||
int64_t count;  // number of times this value has been counted (bumped in doModeAdd)
|
||||
char data[];  // copy of the value bytes, written by memcpy in doModeAdd
|
||||
} SModeItem;
|
||||
|
||||
// Per-result accumulator state for mode(); lives in the row-cell intermediate buffer
// (GET_ROWCELL_INTERBUF) sized by getModeFuncEnv.
typedef struct SModeInfo {
|
||||
int32_t numOfPoints;  // number of SModeItem records currently stored in pItems
|
||||
uint8_t colType;  // copied from pCtx->resDataInfo.type in modeFunctionSetup
|
||||
int16_t colBytes;  // copied from pCtx->resDataInfo.bytes; payload size of each record
|
||||
SHashObj* pHash;  // lookup keyed by the value bytes, used to find an existing record
|
||||
char pItems[];  // packed SModeItem records, stride sizeof(SModeItem) + colBytes
|
||||
} SModeInfo;
|
||||
|
||||
typedef struct SDerivInfo {
|
||||
double prevValue; // previous value
|
||||
TSKEY prevTs; // previous timestamp
|
||||
|
@ -4694,21 +4708,99 @@ int32_t uniqueFunction(SqlFunctionCtx* pCtx) {
|
|||
return pInfo->numOfPoints;
|
||||
}
|
||||
|
||||
int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
/**
 * Report the intermediate-buffer size mode() needs: the SModeInfo header
 * followed by MODE_MAX_RESULT_SIZE bytes of item storage.
 *
 * @param pFunc  function node (not consulted)
 * @param pEnv   receives the required size in calcMemSize
 * @return always true
 */
bool getModeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  const size_t headerBytes = sizeof(SModeInfo);

  pEnv->calcMemSize = headerBytes + MODE_MAX_RESULT_SIZE;
  return true;
}
|
||||
|
||||
/**
 * Prepare the mode() accumulator for a new result row.
 *
 * Runs the generic functionSetup(), resets the item count, records the result
 * column's type and width, and makes sure the value lookup hash is empty: an
 * existing hash is cleared, otherwise a new one is created.
 *
 * @param pCtx      function context; resDataInfo supplies type and bytes
 * @param pResInfo  result entry whose intermediate buffer holds the SModeInfo
 * @return false when functionSetup() fails or the hash cannot be created,
 *         true otherwise
 */
bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
  if (!functionSetup(pCtx, pResInfo)) {
    return false;
  }

  SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
  pInfo->numOfPoints = 0;
  pInfo->colType = pCtx->resDataInfo.type;
  pInfo->colBytes = pCtx->resDataInfo.bytes;

  if (pInfo->pHash != NULL) {
    taosHashClear(pInfo->pHash);
    return true;
  }

  pInfo->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  // Without a hash every later lookup would miss, so report the failure
  // instead of claiming the accumulator is ready.
  return pInfo->pHash != NULL;
}
|
||||
|
||||
/**
 * Count one input value in the mode() accumulator.
 *
 * Every distinct value owns one SModeItem record in pInfo->pItems (stride
 * sizeof(SModeItem) + pInfo->colBytes). pInfo->pHash maps the value bytes to
 * the address of that record, so a repeated value increments the same counter
 * that is later scanned from pItems.
 *
 * @param pInfo   accumulator state with an initialised pHash
 * @param data    value bytes; var-length types are measured with varDataTLen()
 * @param isNull  true when the row is NULL; such rows are ignored
 */
static void doModeAdd(SModeInfo* pInfo, char* data, bool isNull) {
  // ignore null elements
  if (isNull) {
    return;
  }

  int32_t hashKeyBytes = IS_VAR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes;

  // The hash stores a copy of the bytes given to taosHashPut(). What is put
  // in is the record's address, so a lookup yields a pointer to that stored
  // address and the increment lands on the record inside pItems.
  SModeItem** ppHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes);
  if (ppHashItem != NULL) {
    (*ppHashItem)->count += 1;
    return;
  }

  // First occurrence: claim the next record slot.
  size_t     size = sizeof(SModeItem) + pInfo->colBytes;
  SModeItem* pItem = (SModeItem*)(pInfo->pItems + (size_t)pInfo->numOfPoints * size);

  // Copy only the bytes this value actually has; a var-length value can be
  // shorter than the column width and must not be read past its end.
  memcpy(pItem->data, data, hashKeyBytes);
  // Set rather than increment so the result does not depend on the slot
  // having been zeroed beforehand.
  pItem->count = 1;

  taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*));
  pInfo->numOfPoints++;
}
|
||||
|
||||
/**
 * Feed the current input rows of column 0 into the mode() accumulator.
 *
 * NULL rows are skipped. After each value the function verifies that the
 * item area (MODE_MAX_RESULT_SIZE bytes following the SModeInfo header) can
 * still hold one more record, so a later insertion can never write past the
 * buffer requested by getModeFuncEnv().
 *
 * @param pCtx  function context carrying the input block and result entry
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY once the item area
 *         has no room left for another distinct value
 */
int32_t modeFunction(SqlFunctionCtx* pCtx) {
  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  SModeInfo*           pInfo = GET_ROWCELL_INTERBUF(pResInfo);

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData*      pInputCol = pInput->pData[0];

  const size_t  itemBytes = sizeof(SModeItem) + pInfo->colBytes;
  const int32_t endRow = pInput->startRowIndex + pInput->numOfRows;

  for (int32_t i = pInput->startRowIndex; i < endRow; ++i) {
    if (colDataIsNull_s(pInputCol, i)) {
      continue;
    }

    char* data = colDataGetData(pInputCol, i);
    doModeAdd(pInfo, data, false);

    // Only the item area counts against the budget; the header is allocated
    // on top of it. Require space for the *next* record before continuing.
    if (((size_t)pInfo->numOfPoints + 1) * itemBytes > MODE_MAX_RESULT_SIZE) {
      taosHashCleanup(pInfo->pHash);
      // Do not leave a dangling handle behind: setup tests pHash against NULL.
      pInfo->pHash = NULL;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}
|
||||
|
||||
/**
 * Emit the mode() result for the current result row.
 *
 * Scans every record in pInfo->pItems for the highest count. The value with
 * the strictly highest count is appended to the output column; NULL is
 * appended when no value was collected or when the highest count is shared
 * by more than one value.
 *
 * @param pCtx    function context; pExpr supplies the output slot id
 * @param pBlock  output block; the result is written at row pBlock->info.rows
 * @return pResInfo->numOfRes
 */
int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  SModeInfo*           pInfo = GET_ROWCELL_INTERBUF(pResInfo);
  int32_t              slotId = pCtx->pExpr->base.resSchema.slotId;
  SColumnInfoData*     pCol = taosArrayGet(pBlock->pDataBlock, slotId);
  int32_t              currentRow = pBlock->info.rows;

  const size_t itemBytes = sizeof(SModeItem) + pInfo->colBytes;

  int32_t resIndex = -1;   // -1: nothing selected yet
  int64_t maxCount = 0;    // same width as SModeItem.count
  bool    tied = false;    // true while the current maximum is shared

  for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
    SModeItem* pItem = (SModeItem*)(pInfo->pItems + (size_t)i * itemBytes);
    if (pItem->count > maxCount) {
      // A new strict maximum cancels any tie seen at a lower count.
      maxCount = pItem->count;
      resIndex = i;
      tied = false;
    } else if (pItem->count == maxCount) {
      tied = true;
    }
  }

  if (resIndex < 0 || tied) {
    colDataAppendNULL(pCol, currentRow);
  } else {
    SModeItem* pResItem = (SModeItem*)(pInfo->pItems + (size_t)resIndex * itemBytes);
    colDataAppend(pCol, currentRow, pResItem->data, false);
  }

  return pResInfo->numOfRes;
}
|
||||
|
||||
|
||||
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(STwaInfo);
|
||||
return true;
|
||||
|
|
Loading…
Reference in New Issue