Merge pull request #20073 from taosdata/fix/TD-22635
fix: add vgroup count
This commit is contained in:
commit
4902e9a644
|
@ -195,7 +195,7 @@ typedef struct SDataBlockInfo {
|
||||||
uint32_t capacity;
|
uint32_t capacity;
|
||||||
SBlockID id;
|
SBlockID id;
|
||||||
int16_t hasVarCol;
|
int16_t hasVarCol;
|
||||||
int16_t dataLoad; // denote if the data is loaded or not
|
int16_t dataLoad; // denote if the data is loaded or not
|
||||||
|
|
||||||
// TODO: optimize and remove following
|
// TODO: optimize and remove following
|
||||||
int64_t version; // used for stream, and need serialization
|
int64_t version; // used for stream, and need serialization
|
||||||
|
@ -204,8 +204,8 @@ typedef struct SDataBlockInfo {
|
||||||
STimeWindow calWin; // used for stream, do not serialize
|
STimeWindow calWin; // used for stream, do not serialize
|
||||||
TSKEY watermark; // used for stream
|
TSKEY watermark; // used for stream
|
||||||
|
|
||||||
char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition
|
char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition
|
||||||
STag* pTag; // used for stream partition
|
STag* pTag; // used for stream partition
|
||||||
} SDataBlockInfo;
|
} SDataBlockInfo;
|
||||||
|
|
||||||
typedef struct SSDataBlock {
|
typedef struct SSDataBlock {
|
||||||
|
@ -239,22 +239,22 @@ typedef struct SVarColAttr {
|
||||||
// pBlockAgg->numOfNull == info.rows, all data are null
|
// pBlockAgg->numOfNull == info.rows, all data are null
|
||||||
// pBlockAgg->numOfNull == 0, no data are null.
|
// pBlockAgg->numOfNull == 0, no data are null.
|
||||||
typedef struct SColumnInfoData {
|
typedef struct SColumnInfoData {
|
||||||
char* pData; // the corresponding block data in memory
|
char* pData; // the corresponding block data in memory
|
||||||
union {
|
union {
|
||||||
char* nullbitmap; // bitmap, one bit for each item in the list
|
char* nullbitmap; // bitmap, one bit for each item in the list
|
||||||
SVarColAttr varmeta;
|
SVarColAttr varmeta;
|
||||||
};
|
};
|
||||||
SColumnInfo info; // column info
|
SColumnInfo info; // column info
|
||||||
bool hasNull; // if current column data has null value.
|
bool hasNull; // if current column data has null value.
|
||||||
} SColumnInfoData;
|
} SColumnInfoData;
|
||||||
|
|
||||||
typedef struct SQueryTableDataCond {
|
typedef struct SQueryTableDataCond {
|
||||||
uint64_t suid;
|
uint64_t suid;
|
||||||
int32_t order; // desc|asc order to iterate the data block
|
int32_t order; // desc|asc order to iterate the data block
|
||||||
int32_t numOfCols;
|
int32_t numOfCols;
|
||||||
SColumnInfo* colList;
|
SColumnInfo* colList;
|
||||||
int32_t* pSlotList; // the column output destation slot, and it may be null
|
int32_t* pSlotList; // the column output destation slot, and it may be null
|
||||||
int32_t type; // data block load type:
|
int32_t type; // data block load type:
|
||||||
STimeWindow twindows;
|
STimeWindow twindows;
|
||||||
int64_t startVersion;
|
int64_t startVersion;
|
||||||
int64_t endVersion;
|
int64_t endVersion;
|
||||||
|
@ -300,6 +300,7 @@ typedef struct STableBlockDistInfo {
|
||||||
int32_t firstSeekTimeUs;
|
int32_t firstSeekTimeUs;
|
||||||
uint32_t numOfInmemRows;
|
uint32_t numOfInmemRows;
|
||||||
uint32_t numOfSmallBlocks;
|
uint32_t numOfSmallBlocks;
|
||||||
|
uint32_t numOfVgroups;
|
||||||
int32_t blockRowsHisto[20];
|
int32_t blockRowsHisto[20];
|
||||||
} STableBlockDistInfo;
|
} STableBlockDistInfo;
|
||||||
|
|
||||||
|
|
|
@ -81,10 +81,10 @@ typedef struct MergeIndex {
|
||||||
} MergeIndex;
|
} MergeIndex;
|
||||||
|
|
||||||
typedef struct SBlockDistInfo {
|
typedef struct SBlockDistInfo {
|
||||||
SSDataBlock* pResBlock;
|
SSDataBlock* pResBlock;
|
||||||
STsdbReader* pHandle;
|
STsdbReader* pHandle;
|
||||||
SReadHandle readHandle;
|
SReadHandle readHandle;
|
||||||
uint64_t uid; // table uid
|
uint64_t uid; // table uid
|
||||||
} SBlockDistInfo;
|
} SBlockDistInfo;
|
||||||
|
|
||||||
static int32_t sysChkFilter__Comm(SNode* pNode);
|
static int32_t sysChkFilter__Comm(SNode* pNode);
|
||||||
|
@ -129,10 +129,10 @@ static char* SYSTABLE_IDX_COLUMN[] = {"table_name", "db_name", "create_time"
|
||||||
|
|
||||||
static char* SYSTABLE_SPECIAL_COL[] = {"db_name", "vgroup_id"};
|
static char* SYSTABLE_SPECIAL_COL[] = {"db_name", "vgroup_id"};
|
||||||
|
|
||||||
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
|
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
|
||||||
static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName);
|
static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName);
|
||||||
static void destroySysScanOperator(void* param);
|
static void destroySysScanOperator(void* param);
|
||||||
static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code);
|
static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code);
|
||||||
static __optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse);
|
static __optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse);
|
||||||
|
|
||||||
static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrSuperTable,
|
static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrSuperTable,
|
||||||
|
@ -199,11 +199,11 @@ int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result) {
|
||||||
if (func == NULL) return -1;
|
if (func == NULL) return -1;
|
||||||
|
|
||||||
SMetaFltParam param = {.suid = 0,
|
SMetaFltParam param = {.suid = 0,
|
||||||
.cid = 0,
|
.cid = 0,
|
||||||
.type = TSDB_DATA_TYPE_VARCHAR,
|
.type = TSDB_DATA_TYPE_VARCHAR,
|
||||||
.val = pVal->datum.p,
|
.val = pVal->datum.p,
|
||||||
.reverse = reverse,
|
.reverse = reverse,
|
||||||
.filterFunc = func};
|
.filterFunc = func};
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -218,11 +218,11 @@ int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result) {
|
||||||
if (func == NULL) return -1;
|
if (func == NULL) return -1;
|
||||||
|
|
||||||
SMetaFltParam param = {.suid = 0,
|
SMetaFltParam param = {.suid = 0,
|
||||||
.cid = 0,
|
.cid = 0,
|
||||||
.type = TSDB_DATA_TYPE_BIGINT,
|
.type = TSDB_DATA_TYPE_BIGINT,
|
||||||
.val = &pVal->datum.i,
|
.val = &pVal->datum.i,
|
||||||
.reverse = reverse,
|
.reverse = reverse,
|
||||||
.filterFunc = func};
|
.filterFunc = func};
|
||||||
|
|
||||||
int32_t ret = metaFilterCreateTime(pMeta, ¶m, result);
|
int32_t ret = metaFilterCreateTime(pMeta, ¶m, result);
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -350,9 +350,9 @@ static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt);
|
||||||
static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name,
|
static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name,
|
||||||
SExecTaskInfo* pTaskInfo);
|
SExecTaskInfo* pTaskInfo);
|
||||||
void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNode);
|
void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNode);
|
||||||
static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo,
|
static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name,
|
||||||
const char* name, SSDataBlock* pBlock);
|
SSDataBlock* pBlock);
|
||||||
__optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse) {
|
__optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse) {
|
||||||
if (ctype == OP_TYPE_LOWER_EQUAL || ctype == OP_TYPE_LOWER_THAN) {
|
if (ctype == OP_TYPE_LOWER_EQUAL || ctype == OP_TYPE_LOWER_THAN) {
|
||||||
*reverse = true;
|
*reverse = true;
|
||||||
}
|
}
|
||||||
|
@ -516,9 +516,10 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
metaTbCursorPrev(pInfo->pCur);
|
metaTbCursorPrev(pInfo->pCur);
|
||||||
blockFull = true;
|
blockFull = true;
|
||||||
} else {
|
} else {
|
||||||
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows, dataBlock);
|
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows,
|
||||||
|
dataBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
metaReaderClear(&smrSuperTable);
|
metaReaderClear(&smrSuperTable);
|
||||||
|
|
||||||
if (blockFull || numOfRows >= pOperator->resultInfo.capacity) {
|
if (blockFull || numOfRows >= pOperator->resultInfo.capacity) {
|
||||||
|
@ -1343,7 +1344,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
|
||||||
return pBlock->info.rows > 0? pBlock:NULL;
|
return pBlock->info.rows > 0 ? pBlock : NULL;
|
||||||
} else {
|
} else {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -1357,7 +1358,7 @@ static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScan
|
||||||
|
|
||||||
if (pInfo->tbnameSlotId != -1) {
|
if (pInfo->tbnameSlotId != -1) {
|
||||||
SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId);
|
SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId);
|
||||||
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
|
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
|
||||||
memcpy(varDataVal(varTbName), name, strlen(name));
|
memcpy(varDataVal(varTbName), name, strlen(name));
|
||||||
varDataSetLen(varTbName, strlen(name));
|
varDataSetLen(varTbName, strlen(name));
|
||||||
|
|
||||||
|
@ -1489,10 +1490,11 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
|
||||||
setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED,
|
setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED,
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL);
|
pOperator->fpSet =
|
||||||
|
createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
if (pInfo != NULL) {
|
if (pInfo != NULL) {
|
||||||
destroySysScanOperator(pInfo);
|
destroySysScanOperator(pInfo);
|
||||||
}
|
}
|
||||||
|
@ -1927,7 +1929,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
|
||||||
// make the valgrind happy that all memory buffer has been initialized already.
|
// make the valgrind happy that all memory buffer has been initialized already.
|
||||||
if (slotId != 0) {
|
if (slotId != 0) {
|
||||||
SColumnInfoData* p1 = taosArrayGet(pBlock->pDataBlock, 0);
|
SColumnInfoData* p1 = taosArrayGet(pBlock->pDataBlock, 0);
|
||||||
int64_t v = 0;
|
int64_t v = 0;
|
||||||
colDataAppendInt64(p1, 0, &v);
|
colDataAppendInt64(p1, 0, &v);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1937,10 +1939,10 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroyBlockDistScanOperatorInfo(void* param) {
|
static void destroyBlockDistScanOperatorInfo(void* param) {
|
||||||
SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
|
SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
|
||||||
blockDataDestroy(pDistInfo->pResBlock);
|
blockDataDestroy(pDistInfo->pResBlock);
|
||||||
tsdbReaderClose(pDistInfo->pHandle);
|
tsdbReaderClose(pDistInfo->pHandle);
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
|
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
|
||||||
|
@ -2001,7 +2003,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->readHandle = *readHandle;
|
pInfo->readHandle = *readHandle;
|
||||||
pInfo->uid = (pBlockScanNode->suid != 0)? pBlockScanNode->suid:pBlockScanNode->uid;
|
pInfo->uid = (pBlockScanNode->suid != 0) ? pBlockScanNode->suid : pBlockScanNode->uid;
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
|
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
|
||||||
|
@ -2012,8 +2014,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
|
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
|
||||||
OP_NOT_OPENED, pInfo, pTaskInfo);
|
OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
pOperator->fpSet =
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo,
|
||||||
createOperatorFpSet(optrDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, optrDefaultBufFn, NULL);
|
optrDefaultBufFn, NULL);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
|
|
|
@ -775,13 +775,13 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
static int32_t setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex);
|
static int32_t setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex);
|
||||||
static int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos,
|
static int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos,
|
||||||
int32_t rowIndex);
|
int32_t rowIndex);
|
||||||
|
|
||||||
int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||||
SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||||
|
|
||||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
int32_t currentRow = pBlock->info.rows;
|
int32_t currentRow = pBlock->info.rows;
|
||||||
|
@ -795,7 +795,7 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
case TSDB_DATA_TYPE_UBIGINT:
|
case TSDB_DATA_TYPE_UBIGINT:
|
||||||
case TSDB_DATA_TYPE_BIGINT:
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
((int64_t*)pCol->pData)[currentRow] = pRes->v;
|
((int64_t*)pCol->pData)[currentRow] = pRes->v;
|
||||||
// colDataAppendInt64(pCol, currentRow, &pRes->v);
|
// colDataAppendInt64(pCol, currentRow, &pRes->v);
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_UINT:
|
case TSDB_DATA_TYPE_UINT:
|
||||||
case TSDB_DATA_TYPE_INT:
|
case TSDB_DATA_TYPE_INT:
|
||||||
|
@ -1691,7 +1691,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SVariant* pVal = &pCtx->param[1].param;
|
SVariant* pVal = &pCtx->param[1].param;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
double v = 0;
|
double v = 0;
|
||||||
|
|
||||||
GET_TYPED_DATA(v, double, pVal->nType, &pVal->i);
|
GET_TYPED_DATA(v, double, pVal->nType, &pVal->i);
|
||||||
|
@ -2070,7 +2070,7 @@ static void prepareBuf(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx,
|
static int32_t firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx,
|
||||||
SFirstLastRes* pInfo) {
|
SFirstLastRes* pInfo) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
if (pCtx->subsidiaries.num <= 0) {
|
if (pCtx->subsidiaries.num <= 0) {
|
||||||
|
@ -2123,7 +2123,8 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// All null data column, return directly.
|
// All null data column, return directly.
|
||||||
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && pInputCol->hasNull == true) {
|
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) &&
|
||||||
|
pInputCol->hasNull == true) {
|
||||||
// save selectivity value for column consisted of all null values
|
// save selectivity value for column consisted of all null values
|
||||||
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2239,7 +2240,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// All null data column, return directly.
|
// All null data column, return directly.
|
||||||
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && pInputCol->hasNull == true) {
|
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) &&
|
||||||
|
pInputCol->hasNull == true) {
|
||||||
// save selectivity value for column consisted of all null values
|
// save selectivity value for column consisted of all null values
|
||||||
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2333,7 +2335,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
||||||
char* data = colDataGetData(pInputCol, chosen);
|
char* data = colDataGetData(pInputCol, chosen);
|
||||||
int32_t code = doSaveCurrentVal(pCtx, i, cts, type, data);
|
int32_t code = doSaveCurrentVal(pCtx, i, cts, type, data);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -2344,7 +2346,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
|
for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
|
||||||
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
|
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data);
|
int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -2361,7 +2363,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
|
|
||||||
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
|
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data);
|
int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -2408,7 +2410,7 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst,
|
static int32_t firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst,
|
||||||
int32_t rowIndex) {
|
int32_t rowIndex) {
|
||||||
if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pInput, pOutput, isFirst)) {
|
if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pInput, pOutput, isFirst)) {
|
||||||
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput);
|
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2435,7 +2437,7 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer
|
||||||
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
|
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
|
||||||
char* data = colDataGetData(pCol, i);
|
char* data = colDataGetData(pCol, i);
|
||||||
SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data);
|
SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data);
|
||||||
int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i);
|
int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2667,7 +2669,7 @@ static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos,
|
static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos,
|
||||||
int32_t order, int64_t ts) {
|
int32_t order, int64_t ts) {
|
||||||
int32_t factor = (order == TSDB_ORDER_ASC) ? 1 : -1;
|
int32_t factor = (order == TSDB_ORDER_ASC) ? 1 : -1;
|
||||||
pDiffInfo->prevTs = ts;
|
pDiffInfo->prevTs = ts;
|
||||||
switch (type) {
|
switch (type) {
|
||||||
|
@ -2875,8 +2877,8 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
|
||||||
return pRes;
|
return pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
static int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock,
|
||||||
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery);
|
uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery);
|
||||||
|
|
||||||
static void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery);
|
static void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery);
|
||||||
|
|
||||||
|
@ -2897,7 +2899,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
char* data = colDataGetData(pCol, i);
|
char* data = colDataGetData(pCol, i);
|
||||||
int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true);
|
int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -2931,7 +2933,7 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
char* data = colDataGetData(pCol, i);
|
char* data = colDataGetData(pCol, i);
|
||||||
int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false);
|
int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -2983,7 +2985,7 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
||||||
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) {
|
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) {
|
||||||
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
|
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
|
||||||
|
|
||||||
SVariant val = {0};
|
SVariant val = {0};
|
||||||
|
@ -3174,7 +3176,7 @@ static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPo
|
||||||
if (pPage == NULL) {
|
if (pPage == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
char* p = pPage->data + pPos->offset;
|
char* p = pPage->data + pPos->offset;
|
||||||
releaseBufPage(pHandle->pBuf, pPage);
|
releaseBufPage(pHandle->pBuf, pPage);
|
||||||
return p;
|
return p;
|
||||||
} else {
|
} else {
|
||||||
|
@ -4635,7 +4637,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
int32_t code = doReservoirSample(pCtx, pInfo, data, i);
|
int32_t code = doReservoirSample(pCtx, pInfo, data, i);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -4655,7 +4657,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
SSampleInfo* pInfo = getSampleOutputInfo(pCtx);
|
SSampleInfo* pInfo = getSampleOutputInfo(pCtx);
|
||||||
|
@ -4989,7 +4991,7 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
|
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
int32_t code = doModeAdd(pInfo, i, pCtx, data);
|
int32_t code = doModeAdd(pInfo, i, pCtx, data);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -5410,6 +5412,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (pDistInfo->maxRows < p1.maxRows) {
|
if (pDistInfo->maxRows < p1.maxRows) {
|
||||||
pDistInfo->maxRows = p1.maxRows;
|
pDistInfo->maxRows = p1.maxRows;
|
||||||
}
|
}
|
||||||
|
pDistInfo->numOfVgroups += (p1.numOfTables != 0 ? 1 : 0);
|
||||||
|
|
||||||
for (int32_t i = 0; i < tListLen(pDistInfo->blockRowsHisto); ++i) {
|
for (int32_t i = 0; i < tListLen(pDistInfo->blockRowsHisto); ++i) {
|
||||||
pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i];
|
pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i];
|
||||||
|
@ -5438,6 +5441,7 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist
|
||||||
if (tEncodeI32(&encoder, pInfo->defMinRows) < 0) return -1;
|
if (tEncodeI32(&encoder, pInfo->defMinRows) < 0) return -1;
|
||||||
if (tEncodeU32(&encoder, pInfo->numOfInmemRows) < 0) return -1;
|
if (tEncodeU32(&encoder, pInfo->numOfInmemRows) < 0) return -1;
|
||||||
if (tEncodeU32(&encoder, pInfo->numOfSmallBlocks) < 0) return -1;
|
if (tEncodeU32(&encoder, pInfo->numOfSmallBlocks) < 0) return -1;
|
||||||
|
if (tEncodeU32(&encoder, pInfo->numOfVgroups) < 0) return -1;
|
||||||
|
|
||||||
for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
|
for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
|
||||||
if (tEncodeI32(&encoder, pInfo->blockRowsHisto[i]) < 0) return -1;
|
if (tEncodeI32(&encoder, pInfo->blockRowsHisto[i]) < 0) return -1;
|
||||||
|
@ -5469,6 +5473,7 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo
|
||||||
if (tDecodeI32(&decoder, &pInfo->defMinRows) < 0) return -1;
|
if (tDecodeI32(&decoder, &pInfo->defMinRows) < 0) return -1;
|
||||||
if (tDecodeU32(&decoder, &pInfo->numOfInmemRows) < 0) return -1;
|
if (tDecodeU32(&decoder, &pInfo->numOfInmemRows) < 0) return -1;
|
||||||
if (tDecodeU32(&decoder, &pInfo->numOfSmallBlocks) < 0) return -1;
|
if (tDecodeU32(&decoder, &pInfo->numOfSmallBlocks) < 0) return -1;
|
||||||
|
if (tDecodeU32(&decoder, &pInfo->numOfVgroups) < 0) return -1;
|
||||||
|
|
||||||
for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
|
for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
|
||||||
if (tDecodeI32(&decoder, &pInfo->blockRowsHisto[i]) < 0) return -1;
|
if (tDecodeI32(&decoder, &pInfo->blockRowsHisto[i]) < 0) return -1;
|
||||||
|
@ -5520,7 +5525,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
colDataAppend(pColInfo, row++, st, false);
|
colDataAppend(pColInfo, row++, st, false);
|
||||||
|
|
||||||
len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", pData->numOfTables,
|
len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", pData->numOfTables,
|
||||||
pData->numOfFiles, 0);
|
pData->numOfFiles, pData->numOfVgroups);
|
||||||
|
|
||||||
varDataSetLen(st, len);
|
varDataSetLen(st, len);
|
||||||
colDataAppend(pColInfo, row++, st, false);
|
colDataAppend(pColInfo, row++, st, false);
|
||||||
|
|
Loading…
Reference in New Issue