enh(query): block distribution query is supported.
This commit is contained in:
parent
5bb16c3073
commit
c55aae2c0c
|
@ -159,19 +159,25 @@ typedef struct SColumn {
|
||||||
} SColumn;
|
} SColumn;
|
||||||
|
|
||||||
typedef struct STableBlockDistInfo {
|
typedef struct STableBlockDistInfo {
|
||||||
uint16_t rowSize;
|
uint32_t rowSize;
|
||||||
uint16_t numOfFiles;
|
uint16_t numOfFiles;
|
||||||
uint32_t numOfTables;
|
uint32_t numOfTables;
|
||||||
|
uint32_t numOfBlocks;
|
||||||
uint64_t totalSize;
|
uint64_t totalSize;
|
||||||
uint64_t totalRows;
|
uint64_t totalRows;
|
||||||
int32_t maxRows;
|
int32_t maxRows;
|
||||||
int32_t minRows;
|
int32_t minRows;
|
||||||
|
int32_t defMinRows;
|
||||||
|
int32_t defMaxRows;
|
||||||
int32_t firstSeekTimeUs;
|
int32_t firstSeekTimeUs;
|
||||||
uint32_t numOfRowsInMemTable;
|
uint32_t numOfInmemRows;
|
||||||
uint32_t numOfSmallBlocks;
|
uint32_t numOfSmallBlocks;
|
||||||
SArray* dataBlockInfos;
|
int32_t blockRowsHisto[20];
|
||||||
} STableBlockDistInfo;
|
} STableBlockDistInfo;
|
||||||
|
|
||||||
|
int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDistInfo* pInfo);
|
||||||
|
int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo* pInfo);
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
FUNC_PARAM_TYPE_VALUE = 0x1,
|
FUNC_PARAM_TYPE_VALUE = 0x1,
|
||||||
FUNC_PARAM_TYPE_COLUMN = 0x2,
|
FUNC_PARAM_TYPE_COLUMN = 0x2,
|
||||||
|
|
|
@ -58,7 +58,6 @@ typedef struct SFileBlockInfo {
|
||||||
int32_t numBlocksOfStep;
|
int32_t numBlocksOfStep;
|
||||||
} SFileBlockInfo;
|
} SFileBlockInfo;
|
||||||
|
|
||||||
#define TSDB_BLOCK_DIST_STEP_ROWS 8
|
|
||||||
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
|
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
|
||||||
|
|
||||||
#define TOP_BOTTOM_QUERY_LIMIT 100
|
#define TOP_BOTTOM_QUERY_LIMIT 100
|
||||||
|
|
|
@ -121,6 +121,7 @@ typedef enum EFunctionType {
|
||||||
|
|
||||||
// internal function
|
// internal function
|
||||||
FUNCTION_TYPE_SELECT_VALUE,
|
FUNCTION_TYPE_SELECT_VALUE,
|
||||||
|
FUNCTION_TYPE_BLOCK_DIST, // block distribution aggregate function
|
||||||
|
|
||||||
// distributed splitting functions
|
// distributed splitting functions
|
||||||
FUNCTION_TYPE_APERCENTILE_PARTIAL,
|
FUNCTION_TYPE_APERCENTILE_PARTIAL,
|
||||||
|
|
|
@ -2557,6 +2557,10 @@ static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) {
|
||||||
cur->blockCompleted = false;
|
cur->blockCompleted = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
|
||||||
|
return (numOfRows - startRow) / bucketRange;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo) {
|
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo) {
|
||||||
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle;
|
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle;
|
||||||
|
|
||||||
|
@ -2575,16 +2579,20 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
|
||||||
tsdbFSIterSeek(&pTsdbReadHandle->fileIter, fid);
|
tsdbFSIterSeek(&pTsdbReadHandle->fileIter, fid);
|
||||||
tsdbUnLockFS(pFileHandle);
|
tsdbUnLockFS(pFileHandle);
|
||||||
|
|
||||||
|
STsdbCfg* pc = REPO_CFG(pTsdbReadHandle->pTsdb);
|
||||||
|
pTableBlockInfo->defMinRows = pc->minRows;
|
||||||
|
pTableBlockInfo->defMaxRows = pc->maxRows;
|
||||||
|
|
||||||
|
int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0);
|
||||||
|
|
||||||
pTableBlockInfo->numOfFiles += 1;
|
pTableBlockInfo->numOfFiles += 1;
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t numOfBlocks = 0;
|
int32_t numOfBlocks = 0;
|
||||||
int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
|
int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
|
||||||
int defaultRows = 4096; // TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
|
int defaultRows = 4096;
|
||||||
STimeWindow win = TSWINDOW_INITIALIZER;
|
STimeWindow win = TSWINDOW_INITIALIZER;
|
||||||
|
|
||||||
bool ascTraverse = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
|
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
numOfBlocks = 0;
|
numOfBlocks = 0;
|
||||||
tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
|
tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
|
||||||
|
@ -2597,8 +2605,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
|
||||||
tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
|
tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
|
||||||
|
|
||||||
// current file are not overlapped with query time window, ignore remain files
|
// current file are not overlapped with query time window, ignore remain files
|
||||||
if ((ascTraverse && win.skey > pTsdbReadHandle->window.ekey) ||
|
if ((win.skey > pTsdbReadHandle->window.ekey)/* || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)*/) {
|
||||||
(!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)) {
|
|
||||||
tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
|
tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
|
||||||
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
|
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
|
||||||
pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
|
pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
|
||||||
|
@ -2631,15 +2638,19 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pTableBlockInfo->numOfBlocks += numOfBlocks;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||||
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
|
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
|
||||||
|
|
||||||
SBlock* pBlock = pCheckInfo->pCompInfo->blocks;
|
SBlock* pBlock = pCheckInfo->pCompInfo->blocks;
|
||||||
|
|
||||||
for (int32_t j = 0; j < pCheckInfo->numOfBlocks; ++j) {
|
for (int32_t j = 0; j < pCheckInfo->numOfBlocks; ++j) {
|
||||||
pTableBlockInfo->totalSize += pBlock[j].len;
|
pTableBlockInfo->totalSize += pBlock[j].len;
|
||||||
|
|
||||||
int32_t numOfRows = pBlock[j].numOfRows;
|
int32_t numOfRows = pBlock[j].numOfRows;
|
||||||
pTableBlockInfo->totalRows += numOfRows;
|
pTableBlockInfo->totalRows += numOfRows;
|
||||||
|
|
||||||
if (numOfRows > pTableBlockInfo->maxRows) {
|
if (numOfRows > pTableBlockInfo->maxRows) {
|
||||||
pTableBlockInfo->maxRows = numOfRows;
|
pTableBlockInfo->maxRows = numOfRows;
|
||||||
}
|
}
|
||||||
|
@ -2651,13 +2662,14 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
|
||||||
if (numOfRows < defaultRows) {
|
if (numOfRows < defaultRows) {
|
||||||
pTableBlockInfo->numOfSmallBlocks += 1;
|
pTableBlockInfo->numOfSmallBlocks += 1;
|
||||||
}
|
}
|
||||||
// int32_t stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS;
|
|
||||||
// SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex);
|
int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
|
||||||
// blockInfo->numBlocksOfStep++;
|
pTableBlockInfo->blockRowsHisto[bucketIndex]++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pTableBlockInfo->numOfTables = numOfTables;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -449,7 +449,7 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, STaskRuntimeEnv* pRun
|
||||||
// tbufWriteUint64(bw, pDist->totalRows);
|
// tbufWriteUint64(bw, pDist->totalRows);
|
||||||
// tbufWriteInt32(bw, pDist->maxRows);
|
// tbufWriteInt32(bw, pDist->maxRows);
|
||||||
// tbufWriteInt32(bw, pDist->minRows);
|
// tbufWriteInt32(bw, pDist->minRows);
|
||||||
// tbufWriteUint32(bw, pDist->numOfRowsInMemTable);
|
// tbufWriteUint32(bw, pDist->numOfInmemRows);
|
||||||
// tbufWriteUint32(bw, pDist->numOfSmallBlocks);
|
// tbufWriteUint32(bw, pDist->numOfSmallBlocks);
|
||||||
// tbufWriteUint64(bw, taosArrayGetSize(pDist->dataBlockInfos));
|
// tbufWriteUint64(bw, taosArrayGetSize(pDist->dataBlockInfos));
|
||||||
//
|
//
|
||||||
|
@ -488,7 +488,7 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, STaskRuntimeEnv* pRun
|
||||||
// pDist->totalRows = tbufReadUint64(&br);
|
// pDist->totalRows = tbufReadUint64(&br);
|
||||||
// pDist->maxRows = tbufReadInt32(&br);
|
// pDist->maxRows = tbufReadInt32(&br);
|
||||||
// pDist->minRows = tbufReadInt32(&br);
|
// pDist->minRows = tbufReadInt32(&br);
|
||||||
// pDist->numOfRowsInMemTable = tbufReadUint32(&br);
|
// pDist->numOfInmemRows = tbufReadUint32(&br);
|
||||||
// pDist->numOfSmallBlocks = tbufReadUint32(&br);
|
// pDist->numOfSmallBlocks = tbufReadUint32(&br);
|
||||||
// int64_t numSteps = tbufReadUint64(&br);
|
// int64_t numSteps = tbufReadUint64(&br);
|
||||||
//
|
//
|
||||||
|
|
|
@ -641,29 +641,19 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
|
|
||||||
STableBlockDistInfo tableBlockDist = {0};
|
STableBlockDistInfo blockDistInfo = {0};
|
||||||
tableBlockDist.numOfTables = 1; // TODO set the correct number of tables
|
blockDistInfo.maxRows = INT_MIN;
|
||||||
|
blockDistInfo.minRows = INT_MAX;
|
||||||
|
|
||||||
int32_t numRowSteps = TSDB_DEFAULT_MAXROWS_FBLOCK / TSDB_BLOCK_DIST_STEP_ROWS;
|
tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &blockDistInfo);
|
||||||
if (TSDB_DEFAULT_MAXROWS_FBLOCK % TSDB_BLOCK_DIST_STEP_ROWS != 0) {
|
blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader);
|
||||||
++numRowSteps;
|
|
||||||
}
|
|
||||||
|
|
||||||
tableBlockDist.dataBlockInfos = taosArrayInit(numRowSteps, sizeof(SFileBlockInfo));
|
|
||||||
taosArraySetSize(tableBlockDist.dataBlockInfos, numRowSteps);
|
|
||||||
|
|
||||||
tableBlockDist.maxRows = INT_MIN;
|
|
||||||
tableBlockDist.minRows = INT_MAX;
|
|
||||||
|
|
||||||
tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &tableBlockDist);
|
|
||||||
tableBlockDist.numOfRowsInMemTable = (int32_t)tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader);
|
|
||||||
|
|
||||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||||
pBlock->info.rows = 1;
|
pBlock->info.rows = 1;
|
||||||
pBlock->info.numOfCols = 1;
|
pBlock->info.numOfCols = 1;
|
||||||
|
|
||||||
// SBufferWriter bw = tbufInitWriter(NULL, false);
|
// SBufferWriter bw = tbufInitWriter(NULL, false);
|
||||||
// blockDistInfoToBinary(&tableBlockDist, &bw);
|
// blockDistInfoToBinary(&blockDistInfo, &bw);
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
|
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
|
||||||
|
|
||||||
// int32_t len = (int32_t) tbufTell(&bw);
|
// int32_t len = (int32_t) tbufTell(&bw);
|
||||||
|
@ -673,9 +663,6 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
|
||||||
//
|
//
|
||||||
// tbufCloseWriter(&bw);
|
// tbufCloseWriter(&bw);
|
||||||
|
|
||||||
// SArray* g = GET_TABLEGROUP(pOperator->, 0);
|
|
||||||
// pOperator->pRuntimeEnv->current = taosArrayGetP(g, 0);
|
|
||||||
|
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
@ -688,24 +675,22 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo*
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->dataReader = dataReader;
|
pInfo->dataReader = dataReader;
|
||||||
// pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
|
// pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
|
||||||
|
|
||||||
SColumnInfoData infoData = {0};
|
SColumnInfoData infoData = {0};
|
||||||
infoData.info.type = TSDB_DATA_TYPE_BINARY;
|
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
|
||||||
infoData.info.bytes = 1024;
|
infoData.info.bytes = 1024;
|
||||||
infoData.info.colId = 0;
|
|
||||||
// taosArrayPush(pInfo->block.pDataBlock, &infoData);
|
// taosArrayPush(pInfo->block.pDataBlock, &infoData);
|
||||||
|
|
||||||
pOperator->name = "DataBlockInfoScanOperator";
|
pOperator->name = "DataBlockInfoScanOperator";
|
||||||
// pOperator->operatorType = OP_TableBlockInfoScan;
|
// pOperator->operatorType = OP_TableBlockInfoScan;
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->fpSet._openFn = operatorDummyOpenFn;
|
pOperator->info = pInfo;
|
||||||
pOperator->fpSet.getNextFn = doBlockInfoScan;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->info = pInfo;
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, NULL, NULL, NULL, NULL);
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
|
|
|
@ -140,17 +140,14 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx);
|
||||||
bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t sampleFunction(SqlFunctionCtx* pCtx);
|
int32_t sampleFunction(SqlFunctionCtx* pCtx);
|
||||||
//int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
|
||||||
|
|
||||||
bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t tailFunction(SqlFunctionCtx* pCtx);
|
int32_t tailFunction(SqlFunctionCtx* pCtx);
|
||||||
//int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
|
||||||
|
|
||||||
bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t uniqueFunction(SqlFunctionCtx *pCtx);
|
int32_t uniqueFunction(SqlFunctionCtx *pCtx);
|
||||||
//int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
|
||||||
|
|
||||||
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool twaFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool twaFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
|
@ -158,6 +155,8 @@ int32_t twaFunction(SqlFunctionCtx *pCtx);
|
||||||
int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
|
int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
int32_t blockDistFunction(SqlFunctionCtx *pCtx);
|
||||||
|
int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -1137,6 +1137,12 @@ static int32_t translateSelectValue(SFunctionNode* pFunc, char* pErrBuf, int32_t
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateBlockDistFunc(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
pFunc->node.resType = (SDataType) {.bytes = 128, .type = TSDB_DATA_TYPE_VARCHAR};
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// clang-format off
|
// clang-format off
|
||||||
const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
|
@ -1865,6 +1871,15 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = NULL,
|
.processFunc = NULL,
|
||||||
.finalizeFunc = NULL
|
.finalizeFunc = NULL
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "_block_dist",
|
||||||
|
.type = FUNCTION_TYPE_BLOCK_DIST,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.translateFunc = translateBlockDistFunc,
|
||||||
|
.getEnvFunc = NULL,
|
||||||
|
.processFunc = blockDistFunction,
|
||||||
|
.finalizeFunc = blockDistFinalize
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
|
@ -4421,7 +4421,6 @@ int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
||||||
if (pResInfo->numOfRes == 0) {
|
if (pResInfo->numOfRes == 0) {
|
||||||
pResInfo->isNullRes = 1;
|
pResInfo->isNullRes = 1;
|
||||||
} else {
|
} else {
|
||||||
// assert(pInfo->win.ekey == pInfo->p.key && pInfo->hasResult == pResInfo->hasResult);
|
|
||||||
if (pInfo->win.ekey == pInfo->win.skey) {
|
if (pInfo->win.ekey == pInfo->win.skey) {
|
||||||
pInfo->dOutput = pInfo->p.val;
|
pInfo->dOutput = pInfo->p.val;
|
||||||
} else {
|
} else {
|
||||||
|
@ -4434,3 +4433,129 @@ int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
||||||
return functionFinalize(pCtx, pBlock);
|
return functionFinalize(pCtx, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t blockDistFunction(SqlFunctionCtx *pCtx) {
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
|
||||||
|
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
|
char *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
memcpy(pInfo, pInputCol->pData, varDataTLen(pInputCol->pData));
|
||||||
|
pResInfo->numOfRes = 1;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDistInfo* pInfo) {
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeU32(&encoder, pInfo->rowSize) < 0) return -1;
|
||||||
|
|
||||||
|
if (tEncodeU16(&encoder, pInfo->numOfFiles) < 0) return -1;
|
||||||
|
if (tEncodeU32(&encoder, pInfo->rowSize) < 0) return -1;
|
||||||
|
if (tEncodeU32(&encoder, pInfo->numOfTables) < 0) return -1;
|
||||||
|
|
||||||
|
if (tEncodeU64(&encoder, pInfo->totalSize) < 0) return -1;
|
||||||
|
if (tEncodeU64(&encoder, pInfo->totalRows) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pInfo->maxRows) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pInfo->minRows) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pInfo->defMaxRows) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pInfo->defMinRows) < 0) return -1;
|
||||||
|
if (tEncodeU32(&encoder, pInfo->numOfInmemRows) < 0) return -1;
|
||||||
|
if (tEncodeU32(&encoder, pInfo->numOfSmallBlocks) < 0) return -1;
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
|
||||||
|
if (tEncodeI32(&encoder, pInfo->blockRowsHisto[i]) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo* pInfo) {
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeU32(&decoder, &pInfo->rowSize) < 0) return -1;
|
||||||
|
|
||||||
|
if (tDecodeU16(&decoder, &pInfo->numOfFiles) < 0) return -1;
|
||||||
|
if (tDecodeU32(&decoder, &pInfo->rowSize) < 0) return -1;
|
||||||
|
if (tDecodeU32(&decoder, &pInfo->numOfTables) < 0) return -1;
|
||||||
|
|
||||||
|
if (tDecodeU64(&decoder, &pInfo->totalSize) < 0) return -1;
|
||||||
|
if (tDecodeU64(&decoder, &pInfo->totalRows) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pInfo->maxRows) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pInfo->minRows) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pInfo->defMaxRows) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pInfo->defMinRows) < 0) return -1;
|
||||||
|
if (tDecodeU32(&decoder, &pInfo->numOfInmemRows) < 0) return -1;
|
||||||
|
if (tDecodeU32(&decoder, &pInfo->numOfSmallBlocks) < 0) return -1;
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
|
||||||
|
if (tDecodeI32(&decoder, &pInfo->blockRowsHisto[i]) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
|
STableBlockDistInfo info = {0};
|
||||||
|
char *pData = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
|
tDeserializeBlockDistInfo(varDataVal(pData), varDataLen(pData), &info);
|
||||||
|
|
||||||
|
int32_t step = (info.defMaxRows - info.defMinRows) / 50;
|
||||||
|
|
||||||
|
// convert to string results
|
||||||
|
char st[128] = {0};
|
||||||
|
sprintf(st, "Blocks=[%d] Size=[%.3fKb] Average_Block_size=[%.3fKb] Compression_Ratio=[%.3f]", info.numOfBlocks,
|
||||||
|
info.totalSize/1024.0,
|
||||||
|
info.totalSize/(info.numOfBlocks*1024.0),
|
||||||
|
info.totalSize/(info.totalRows*info.rowSize*1.0)
|
||||||
|
);
|
||||||
|
|
||||||
|
sprintf(st, "Total_Rows=[%"PRId64"] MinRows=[%d] MaxRows=[%d] Averge_Rows=[%"PRId64"] Inmem_Rows=[%d]",
|
||||||
|
info.totalRows,
|
||||||
|
info.minRows,
|
||||||
|
info.maxRows,
|
||||||
|
info.totalRows/info.numOfBlocks,
|
||||||
|
info.numOfInmemRows
|
||||||
|
);
|
||||||
|
|
||||||
|
sprintf(st, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", info.numOfTables, info.numOfFiles, 0);
|
||||||
|
sprintf(st, "----------------------------------------------------------------------");
|
||||||
|
|
||||||
|
int32_t maxVal = 0;
|
||||||
|
int32_t minVal = INT32_MAX;
|
||||||
|
for(int32_t i = 0; i < 100; ++i) {
|
||||||
|
if (maxVal < info.blockRowsHisto[i]) {
|
||||||
|
maxVal = info.blockRowsHisto[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (minVal > info.blockRowsHisto[i]) {
|
||||||
|
minVal = info.blockRowsHisto[i];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < 100; ++i) {
|
||||||
|
int32_t len = sprintf(st, "%d |", info.defMinRows + step);
|
||||||
|
int32_t num = (info.blockRowsHisto[i] + step - 1) / step;
|
||||||
|
for(int32_t j = 0; j < num; ++j) {
|
||||||
|
int32_t x = sprintf(st + len, "%c", '|');
|
||||||
|
len += x;
|
||||||
|
}
|
||||||
|
|
||||||
|
double v = info.blockRowsHisto[i]*1.0 / info.numOfBlocks;
|
||||||
|
sprintf(st + len, " %d (%.3f)\n", info.blockRowsHisto[i], v);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 100;
|
||||||
|
}
|
||||||
|
|
|
@ -3684,7 +3684,7 @@ static void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDi
|
||||||
pDist->totalRows = tbufReadUint64(&br);
|
pDist->totalRows = tbufReadUint64(&br);
|
||||||
pDist->maxRows = tbufReadInt32(&br);
|
pDist->maxRows = tbufReadInt32(&br);
|
||||||
pDist->minRows = tbufReadInt32(&br);
|
pDist->minRows = tbufReadInt32(&br);
|
||||||
pDist->numOfRowsInMemTable = tbufReadUint32(&br);
|
pDist->numOfInmemRows = tbufReadUint32(&br);
|
||||||
pDist->numOfSmallBlocks = tbufReadUint32(&br);
|
pDist->numOfSmallBlocks = tbufReadUint32(&br);
|
||||||
int64_t numSteps = tbufReadUint64(&br);
|
int64_t numSteps = tbufReadUint64(&br);
|
||||||
|
|
||||||
|
@ -3732,7 +3732,7 @@ static void mergeTableBlockDist(SResultRowEntryInfo* pResInfo, const STableBlock
|
||||||
assert(pDist != NULL && pSrc != NULL);
|
assert(pDist != NULL && pSrc != NULL);
|
||||||
|
|
||||||
pDist->numOfTables += pSrc->numOfTables;
|
pDist->numOfTables += pSrc->numOfTables;
|
||||||
pDist->numOfRowsInMemTable += pSrc->numOfRowsInMemTable;
|
pDist->numOfInmemRows += pSrc->numOfInmemRows;
|
||||||
pDist->numOfSmallBlocks += pSrc->numOfSmallBlocks;
|
pDist->numOfSmallBlocks += pSrc->numOfSmallBlocks;
|
||||||
pDist->numOfFiles += pSrc->numOfFiles;
|
pDist->numOfFiles += pSrc->numOfFiles;
|
||||||
pDist->totalSize += pSrc->totalSize;
|
pDist->totalSize += pSrc->totalSize;
|
||||||
|
@ -3862,7 +3862,7 @@ void generateBlockDistResult(STableBlockDistInfo *pTableBlockDist, char* result)
|
||||||
percentiles[6], percentiles[7], percentiles[8], percentiles[9], percentiles[10], percentiles[11],
|
percentiles[6], percentiles[7], percentiles[8], percentiles[9], percentiles[10], percentiles[11],
|
||||||
min, max, avg, stdDev,
|
min, max, avg, stdDev,
|
||||||
totalRows, totalBlocks, smallBlocks, totalLen/1024.0, compRatio,
|
totalRows, totalBlocks, smallBlocks, totalLen/1024.0, compRatio,
|
||||||
pTableBlockDist->numOfRowsInMemTable);
|
pTableBlockDist->numOfInmemRows);
|
||||||
varDataSetLen(result, sz);
|
varDataSetLen(result, sz);
|
||||||
UNUSED(sz);
|
UNUSED(sz);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue