fix: fix invalid dist count
This commit is contained in:
parent
7d75939fae
commit
e8527ee9de
|
@ -205,7 +205,7 @@ typedef struct SDataBlockInfo {
|
|||
STimeWindow calWin; // used for stream, do not serialize
|
||||
TSKEY watermark; // used for stream
|
||||
|
||||
char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition
|
||||
char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition
|
||||
} SDataBlockInfo;
|
||||
|
||||
typedef struct SSDataBlock {
|
||||
|
@ -291,7 +291,6 @@ typedef struct STableBlockDistInfo {
|
|||
uint16_t numOfFiles;
|
||||
uint32_t numOfTables;
|
||||
uint32_t numOfBlocks;
|
||||
uint32_t numOfVgroups;
|
||||
uint64_t totalSize;
|
||||
uint64_t totalRows;
|
||||
int32_t maxRows;
|
||||
|
@ -301,6 +300,7 @@ typedef struct STableBlockDistInfo {
|
|||
int32_t firstSeekTimeUs;
|
||||
uint32_t numOfInmemRows;
|
||||
uint32_t numOfSmallBlocks;
|
||||
uint32_t numOfVgroups;
|
||||
int32_t blockRowsHisto[20];
|
||||
} STableBlockDistInfo;
|
||||
|
||||
|
@ -341,7 +341,7 @@ typedef struct SExprInfo {
|
|||
|
||||
typedef struct {
|
||||
const char* key;
|
||||
size_t keyLen;
|
||||
size_t keyLen;
|
||||
uint8_t type;
|
||||
union {
|
||||
const char* value;
|
||||
|
@ -385,9 +385,9 @@ typedef struct STUidTagInfo {
|
|||
#define TABLE_NAME_COLUMN_INDEX 6
|
||||
|
||||
// stream create table block column
|
||||
#define UD_TABLE_NAME_COLUMN_INDEX 0
|
||||
#define UD_GROUPID_COLUMN_INDEX 1
|
||||
#define UD_TAG_COLUMN_INDEX 2
|
||||
#define UD_TABLE_NAME_COLUMN_INDEX 0
|
||||
#define UD_GROUPID_COLUMN_INDEX 1
|
||||
#define UD_TAG_COLUMN_INDEX 2
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -775,13 +775,13 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
static int32_t setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex);
|
||||
static int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos,
|
||||
int32_t rowIndex);
|
||||
int32_t rowIndex);
|
||||
|
||||
int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||
SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
int32_t currentRow = pBlock->info.rows;
|
||||
|
@ -795,7 +795,7 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
((int64_t*)pCol->pData)[currentRow] = pRes->v;
|
||||
// colDataAppendInt64(pCol, currentRow, &pRes->v);
|
||||
// colDataAppendInt64(pCol, currentRow, &pRes->v);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
|
@ -920,9 +920,7 @@ void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos)
|
|||
}
|
||||
}
|
||||
|
||||
void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) {
|
||||
*pDestPos = *pSourcePos;
|
||||
}
|
||||
void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) { *pDestPos = *pSourcePos; }
|
||||
|
||||
int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t isMinFunc) {
|
||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
|
@ -1686,7 +1684,7 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
SPercentileInfo* ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
int32_t code = 0;
|
||||
double v = 0;
|
||||
double v = 0;
|
||||
|
||||
tMemBucket* pMemBucket = ppInfo->pMemBucket;
|
||||
if (pMemBucket == NULL || pMemBucket->total == 0) { // check for null
|
||||
|
@ -1717,7 +1715,7 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
varDataSetLen(buf, len);
|
||||
colDataAppend(pCol, pBlock->info.rows, buf, false);
|
||||
|
@ -1742,7 +1740,6 @@ _fin_error:
|
|||
|
||||
tMemBucketDestroy(pMemBucket);
|
||||
return code;
|
||||
|
||||
}
|
||||
|
||||
bool getApercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
|
@ -2103,7 +2100,7 @@ static void prepareBuf(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
static int32_t firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx,
|
||||
SFirstLastRes* pInfo) {
|
||||
SFirstLastRes* pInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
if (pCtx->subsidiaries.num <= 0) {
|
||||
|
@ -2156,7 +2153,8 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
// All null data column, return directly.
|
||||
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && pInputCol->hasNull == true) {
|
||||
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) &&
|
||||
pInputCol->hasNull == true) {
|
||||
// save selectivity value for column consisted of all null values
|
||||
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2272,7 +2270,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
// All null data column, return directly.
|
||||
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && pInputCol->hasNull == true) {
|
||||
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) &&
|
||||
pInputCol->hasNull == true) {
|
||||
// save selectivity value for column consisted of all null values
|
||||
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2366,7 +2365,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
||||
char* data = colDataGetData(pInputCol, chosen);
|
||||
char* data = colDataGetData(pInputCol, chosen);
|
||||
int32_t code = doSaveCurrentVal(pCtx, i, cts, type, data);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -2377,7 +2376,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
|
||||
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -2394,7 +2393,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
numOfElems++;
|
||||
|
||||
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -2441,7 +2440,7 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p
|
|||
}
|
||||
|
||||
static int32_t firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst,
|
||||
int32_t rowIndex) {
|
||||
int32_t rowIndex) {
|
||||
if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pInput, pOutput, isFirst)) {
|
||||
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2468,7 +2467,7 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer
|
|||
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
|
||||
char* data = colDataGetData(pCol, i);
|
||||
SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data);
|
||||
int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i);
|
||||
int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -2700,7 +2699,7 @@ static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
|
|||
}
|
||||
|
||||
static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos,
|
||||
int32_t order, int64_t ts) {
|
||||
int32_t order, int64_t ts) {
|
||||
int32_t factor = (order == TSDB_ORDER_ASC) ? 1 : -1;
|
||||
pDiffInfo->prevTs = ts;
|
||||
switch (type) {
|
||||
|
@ -2908,8 +2907,8 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
|
|||
return pRes;
|
||||
}
|
||||
|
||||
static int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
||||
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery);
|
||||
static int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock,
|
||||
uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery);
|
||||
|
||||
static void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery);
|
||||
|
||||
|
@ -2930,7 +2929,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
numOfElems++;
|
||||
char* data = colDataGetData(pCol, i);
|
||||
char* data = colDataGetData(pCol, i);
|
||||
int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -2964,7 +2963,7 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
numOfElems++;
|
||||
char* data = colDataGetData(pCol, i);
|
||||
char* data = colDataGetData(pCol, i);
|
||||
int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -3016,7 +3015,7 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par
|
|||
}
|
||||
|
||||
int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
||||
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) {
|
||||
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) {
|
||||
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
|
||||
|
||||
SVariant val = {0};
|
||||
|
@ -3207,7 +3206,7 @@ static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPo
|
|||
if (pPage == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
char* p = pPage->data + pPos->offset;
|
||||
char* p = pPage->data + pPos->offset;
|
||||
releaseBufPage(pHandle->pBuf, pPage);
|
||||
return p;
|
||||
} else {
|
||||
|
@ -4668,7 +4667,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
|
|||
continue;
|
||||
}
|
||||
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
int32_t code = doReservoirSample(pCtx, pInfo, data, i);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -4688,7 +4687,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
|
||||
SSampleInfo* pInfo = getSampleOutputInfo(pCtx);
|
||||
|
@ -5022,7 +5021,7 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
numOfElems++;
|
||||
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
int32_t code = doModeAdd(pInfo, i, pCtx, data);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -5428,7 +5427,6 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
|
|||
pDistInfo->numOfBlocks += p1.numOfBlocks;
|
||||
pDistInfo->numOfTables += p1.numOfTables;
|
||||
pDistInfo->numOfInmemRows += p1.numOfInmemRows;
|
||||
pDistInfo->numOfVgroups += p1.numOfVgroups;
|
||||
pDistInfo->totalSize += p1.totalSize;
|
||||
pDistInfo->totalRows += p1.totalRows;
|
||||
pDistInfo->numOfFiles += p1.numOfFiles;
|
||||
|
@ -5445,6 +5443,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
|
|||
pDistInfo->maxRows = p1.maxRows;
|
||||
}
|
||||
|
||||
pDistInfo->numOfVgroups += (p1.numOfTables != 0 ? 1 : 0);
|
||||
for (int32_t i = 0; i < tListLen(pDistInfo->blockRowsHisto); ++i) {
|
||||
pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i];
|
||||
}
|
||||
|
@ -5463,7 +5462,6 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist
|
|||
if (tEncodeU16(&encoder, pInfo->numOfFiles) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pInfo->numOfBlocks) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pInfo->numOfTables) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pInfo->numOfVgroups) < 0) return -1;
|
||||
|
||||
if (tEncodeU64(&encoder, pInfo->totalSize) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, pInfo->totalRows) < 0) return -1;
|
||||
|
@ -5473,6 +5471,7 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist
|
|||
if (tEncodeI32(&encoder, pInfo->defMinRows) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pInfo->numOfInmemRows) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pInfo->numOfSmallBlocks) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pInfo->numOfVgroups) < 0) return -1;
|
||||
|
||||
for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
|
||||
if (tEncodeI32(&encoder, pInfo->blockRowsHisto[i]) < 0) return -1;
|
||||
|
@ -5495,7 +5494,6 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo
|
|||
if (tDecodeU16(&decoder, &pInfo->numOfFiles) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &pInfo->numOfBlocks) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &pInfo->numOfTables) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &pInfo->numOfVgroups) < 0) return -1;
|
||||
|
||||
if (tDecodeU64(&decoder, &pInfo->totalSize) < 0) return -1;
|
||||
if (tDecodeU64(&decoder, &pInfo->totalRows) < 0) return -1;
|
||||
|
@ -5505,6 +5503,7 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo
|
|||
if (tDecodeI32(&decoder, &pInfo->defMinRows) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &pInfo->numOfInmemRows) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &pInfo->numOfSmallBlocks) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &pInfo->numOfVgroups) < 0) return -1;
|
||||
|
||||
for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
|
||||
if (tDecodeI32(&decoder, &pInfo->blockRowsHisto[i]) < 0) return -1;
|
||||
|
|
Loading…
Reference in New Issue