add quick cmd
This commit is contained in:
parent
f876bfd6c9
commit
eddc7918d3
|
@ -299,6 +299,26 @@ typedef struct STableBlockDistInfo {
|
||||||
int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDistInfo* pInfo);
|
int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDistInfo* pInfo);
|
||||||
int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo* pInfo);
|
int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo* pInfo);
|
||||||
|
|
||||||
|
typedef struct SDBBlockUsageInfo {
|
||||||
|
uint32_t rowSize;
|
||||||
|
uint16_t numOfFiles;
|
||||||
|
uint32_t numOfTables;
|
||||||
|
uint32_t numOfBlocks;
|
||||||
|
uint64_t totalSize;
|
||||||
|
uint64_t totalRows;
|
||||||
|
int32_t maxRows;
|
||||||
|
int32_t minRows;
|
||||||
|
int32_t defMinRows;
|
||||||
|
int32_t defMaxRows;
|
||||||
|
int32_t firstSeekTimeUs;
|
||||||
|
uint32_t numOfInmemRows;
|
||||||
|
uint32_t numOfSttRows;
|
||||||
|
uint32_t numOfVgroups;
|
||||||
|
} SDBBlockUsageInfo;
|
||||||
|
|
||||||
|
int32_t tSerializeBlockDbUsage(void* buf, int32_t bufLen, const SDBBlockUsageInfo* pInfo);
|
||||||
|
int32_t tDeserializeBlockDbUsage(void* buf, int32_t bufLen, SDBBlockUsageInfo* pInfo);
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
FUNC_PARAM_TYPE_VALUE = 0x1,
|
FUNC_PARAM_TYPE_VALUE = 0x1,
|
||||||
FUNC_PARAM_TYPE_COLUMN = 0x2,
|
FUNC_PARAM_TYPE_COLUMN = 0x2,
|
||||||
|
|
|
@ -218,6 +218,9 @@ typedef enum EFunctionType {
|
||||||
FUNCTION_TYPE_CONTAINS,
|
FUNCTION_TYPE_CONTAINS,
|
||||||
FUNCTION_TYPE_CONTAINS_PROPERLY,
|
FUNCTION_TYPE_CONTAINS_PROPERLY,
|
||||||
|
|
||||||
|
FUNCTION_TYPE_DB_USAGE = 4300,
|
||||||
|
FUNCTION_TYPE_DB_USAGE_INFO,
|
||||||
|
|
||||||
// user defined funcion
|
// user defined funcion
|
||||||
FUNCTION_TYPE_UDF = 10000
|
FUNCTION_TYPE_UDF = 10000
|
||||||
} EFunctionType;
|
} EFunctionType;
|
||||||
|
@ -289,11 +292,13 @@ bool fmIsPrimaryKeyFunc(int32_t funcId);
|
||||||
bool fmIsProcessByRowFunc(int32_t funcId);
|
bool fmIsProcessByRowFunc(int32_t funcId);
|
||||||
bool fmisSelectGroupConstValueFunc(int32_t funcId);
|
bool fmisSelectGroupConstValueFunc(int32_t funcId);
|
||||||
bool fmIsElapsedFunc(int32_t funcId);
|
bool fmIsElapsedFunc(int32_t funcId);
|
||||||
|
bool fmIsDBUsageFunc(int32_t funcId);
|
||||||
|
|
||||||
void getLastCacheDataType(SDataType* pType, int32_t pkBytes);
|
void getLastCacheDataType(SDataType* pType, int32_t pkBytes);
|
||||||
int32_t createFunction(const char* pName, SNodeList* pParameterList, SFunctionNode** pFunc);
|
int32_t createFunction(const char* pName, SNodeList* pParameterList, SFunctionNode** pFunc);
|
||||||
|
|
||||||
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMidFunc, SFunctionNode** pMergeFunc);
|
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMidFunc,
|
||||||
|
SFunctionNode** pMergeFunc);
|
||||||
|
|
||||||
typedef enum EFuncDataRequired {
|
typedef enum EFuncDataRequired {
|
||||||
FUNC_DATA_REQUIRED_DATA_LOAD = 1,
|
FUNC_DATA_REQUIRED_DATA_LOAD = 1,
|
||||||
|
|
|
@ -375,6 +375,11 @@ typedef struct SShowTableDistributedStmt {
|
||||||
char tableName[TSDB_TABLE_NAME_LEN];
|
char tableName[TSDB_TABLE_NAME_LEN];
|
||||||
} SShowTableDistributedStmt;
|
} SShowTableDistributedStmt;
|
||||||
|
|
||||||
|
typedef struct SShowDBUsageStmt {
|
||||||
|
ENodeType type;
|
||||||
|
char dbName[TSDB_DB_NAME_LEN];
|
||||||
|
} SShowDBUsageStmt;
|
||||||
|
|
||||||
typedef struct SShowDnodeVariablesStmt {
|
typedef struct SShowDnodeVariablesStmt {
|
||||||
ENodeType type;
|
ENodeType type;
|
||||||
SNode* pDnodeId;
|
SNode* pDnodeId;
|
||||||
|
|
|
@ -77,7 +77,8 @@ typedef enum EScanType {
|
||||||
SCAN_TYPE_TABLE_MERGE,
|
SCAN_TYPE_TABLE_MERGE,
|
||||||
SCAN_TYPE_BLOCK_INFO,
|
SCAN_TYPE_BLOCK_INFO,
|
||||||
SCAN_TYPE_LAST_ROW,
|
SCAN_TYPE_LAST_ROW,
|
||||||
SCAN_TYPE_TABLE_COUNT
|
SCAN_TYPE_TABLE_COUNT,
|
||||||
|
SCAN_TYPE_DB_DISK_USAGE,
|
||||||
} EScanType;
|
} EScanType;
|
||||||
|
|
||||||
typedef struct SScanLogicNode {
|
typedef struct SScanLogicNode {
|
||||||
|
|
|
@ -242,6 +242,10 @@ int32_t groupKeyCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
int32_t groupConstValueFunction(SqlFunctionCtx* pCtx);
|
int32_t groupConstValueFunction(SqlFunctionCtx* pCtx);
|
||||||
int32_t groupConstValueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t groupConstValueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
|
int32_t blockDBUsageSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
|
int32_t blockDBUsageFunction(SqlFunctionCtx* pCtx);
|
||||||
|
int32_t blockDBUsageFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -694,14 +694,12 @@ static int32_t checkRangeValue(SNode *pNode, SParamRange range, bool *isMatch) {
|
||||||
if (pNode->type == QUERY_NODE_VALUE) {
|
if (pNode->type == QUERY_NODE_VALUE) {
|
||||||
SValueNode* pVal = (SValueNode*)pNode;
|
SValueNode* pVal = (SValueNode*)pNode;
|
||||||
if (IS_INTEGER_TYPE(getSDataTypeFromNode(pNode)->type)) {
|
if (IS_INTEGER_TYPE(getSDataTypeFromNode(pNode)->type)) {
|
||||||
if (pVal->datum.i < range.iMinVal ||
|
if (pVal->datum.i < range.iMinVal || pVal->datum.i > range.iMaxVal) {
|
||||||
pVal->datum.i > range.iMaxVal) {
|
|
||||||
code = TSDB_CODE_FUNC_FUNTION_PARA_RANGE;
|
code = TSDB_CODE_FUNC_FUNTION_PARA_RANGE;
|
||||||
*isMatch = false;
|
*isMatch = false;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if ((int64_t)pVal->datum.d < range.iMinVal ||
|
if ((int64_t)pVal->datum.d < range.iMinVal || (int64_t)pVal->datum.d > range.iMaxVal) {
|
||||||
(int64_t)pVal->datum.d > range.iMaxVal) {
|
|
||||||
code = TSDB_CODE_FUNC_FUNTION_PARA_RANGE;
|
code = TSDB_CODE_FUNC_FUNTION_PARA_RANGE;
|
||||||
*isMatch = false;
|
*isMatch = false;
|
||||||
}
|
}
|
||||||
|
@ -715,8 +713,7 @@ static int32_t checkRangeValue(SNode *pNode, SParamRange range, bool *isMatch) {
|
||||||
static int32_t checkFixedValue(SNode* pNode, const SParamInfo* paramPattern, int32_t paramIdx, bool* isMatch) {
|
static int32_t checkFixedValue(SNode* pNode, const SParamInfo* paramPattern, int32_t paramIdx, bool* isMatch) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
bool checkStr = paramSupportVarBinary(paramPattern->validDataType) ||
|
bool checkStr = paramSupportVarBinary(paramPattern->validDataType) ||
|
||||||
paramSupportVarchar(paramPattern->validDataType) ||
|
paramSupportVarchar(paramPattern->validDataType) || paramSupportNchar(paramPattern->validDataType);
|
||||||
paramSupportNchar(paramPattern->validDataType);
|
|
||||||
if (pNode->type == QUERY_NODE_VALUE) {
|
if (pNode->type == QUERY_NODE_VALUE) {
|
||||||
SValueNode* pVal = (SValueNode*)pNode;
|
SValueNode* pVal = (SValueNode*)pNode;
|
||||||
if (!checkStr) {
|
if (!checkStr) {
|
||||||
|
@ -827,7 +824,8 @@ static int32_t validateParam(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
const SParamInfo* paramPattern = funcMgtBuiltins[pFunc->funcId].parameters.inputParaInfo[i];
|
const SParamInfo* paramPattern = funcMgtBuiltins[pFunc->funcId].parameters.inputParaInfo[i];
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
for (int8_t j = paramPattern[paramIdx].startParam; j <= (paramPattern[paramIdx].endParam == -1 ? INT8_MAX : paramPattern[paramIdx].endParam); j++) {
|
for (int8_t j = paramPattern[paramIdx].startParam;
|
||||||
|
j <= (paramPattern[paramIdx].endParam == -1 ? INT8_MAX : paramPattern[paramIdx].endParam); j++) {
|
||||||
if (j > LIST_LENGTH(paramList)) {
|
if (j > LIST_LENGTH(paramList)) {
|
||||||
code = TSDB_CODE_SUCCESS;
|
code = TSDB_CODE_SUCCESS;
|
||||||
isMatch = true;
|
isMatch = true;
|
||||||
|
@ -911,8 +909,8 @@ static int32_t validateParam(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_RANGE, "Invalid parameter range : %s",
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_RANGE, "Invalid parameter range : %s",
|
||||||
pFunc->functionName);
|
pFunc->functionName);
|
||||||
case TSDB_CODE_FUNC_FUNTION_PARA_PRIMTS:
|
case TSDB_CODE_FUNC_FUNTION_PARA_PRIMTS:
|
||||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_PRIMTS, "Parameter should be primary timestamp : %s",
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_PRIMTS,
|
||||||
pFunc->functionName);
|
"Parameter should be primary timestamp : %s", pFunc->functionName);
|
||||||
case TSDB_CODE_FUNC_FUNTION_PARA_PK:
|
case TSDB_CODE_FUNC_FUNTION_PARA_PK:
|
||||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_PK, "Parameter should be primary key : %s",
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_PK, "Parameter should be primary key : %s",
|
||||||
pFunc->functionName);
|
pFunc->functionName);
|
||||||
|
@ -965,7 +963,6 @@ static int32_t translateOutDouble(SFunctionNode* pFunc, char* pErrBuf, int32_t l
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t translateTrimStr(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isLtrim) {
|
static int32_t translateTrimStr(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isLtrim) {
|
||||||
FUNC_ERR_RET(validateParam(pFunc, pErrBuf, len));
|
FUNC_ERR_RET(validateParam(pFunc, pErrBuf, len));
|
||||||
|
|
||||||
|
@ -1020,7 +1017,8 @@ static int32_t translateSum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
|
||||||
static int32_t translateWduration(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateWduration(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
// pseudo column do not need to check parameters
|
// pseudo column do not need to check parameters
|
||||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT,
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes,
|
||||||
|
.type = TSDB_DATA_TYPE_BIGINT,
|
||||||
.precision = pFunc->node.resType.precision};
|
.precision = pFunc->node.resType.precision};
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1050,8 +1048,7 @@ static int32_t translateRand(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pFunc->node.resType =
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
|
||||||
(SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1065,8 +1062,8 @@ static int32_t translateOutFirstIn(SFunctionNode* pFunc, char* pErrBuf, int32_t
|
||||||
static int32_t translateTimePseudoColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateTimePseudoColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
// pseudo column do not need to check parameters
|
// pseudo column do not need to check parameters
|
||||||
|
|
||||||
pFunc->node.resType =
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes,
|
||||||
(SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP,
|
.type = TSDB_DATA_TYPE_TIMESTAMP,
|
||||||
.precision = pFunc->node.resType.precision};
|
.precision = pFunc->node.resType.precision};
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1097,7 +1094,6 @@ static int32_t translateVgIdColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t reserveFirstMergeParam(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters) {
|
static int32_t reserveFirstMergeParam(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters) {
|
||||||
int32_t code = nodesListMakeAppend(pParameters, pPartialRes);
|
int32_t code = nodesListMakeAppend(pParameters, pPartialRes);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -1126,13 +1122,9 @@ int32_t apercentileCreateMergeParam(SNodeList* pRawParameters, SNode* pPartialRe
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t translateElapsedPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateElapsedPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return 0; }
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t translateElapsedMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateElapsedMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return 0; }
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t translateCsum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateCsum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
FUNC_ERR_RET(validateParam(pFunc, pErrBuf, len));
|
FUNC_ERR_RET(validateParam(pFunc, pErrBuf, len));
|
||||||
|
@ -1160,8 +1152,8 @@ static int32_t translateSampleTail(SFunctionNode* pFunc, char* pErrBuf, int32_t
|
||||||
uint8_t colType = pSDataType->type;
|
uint8_t colType = pSDataType->type;
|
||||||
|
|
||||||
// set result type
|
// set result type
|
||||||
pFunc->node.resType = (SDataType){.bytes = IS_STR_DATA_TYPE(colType) ? pSDataType->bytes : tDataTypes[colType].bytes,
|
pFunc->node.resType =
|
||||||
.type = colType};
|
(SDataType){.bytes = IS_STR_DATA_TYPE(colType) ? pSDataType->bytes : tDataTypes[colType].bytes, .type = colType};
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1327,14 +1319,14 @@ static int32_t translateChar(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
|
||||||
static int32_t translateAscii(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateAscii(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
FUNC_ERR_RET(validateParam(pFunc, pErrBuf, len));
|
FUNC_ERR_RET(validateParam(pFunc, pErrBuf, len));
|
||||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_UTINYINT].bytes, .type = TSDB_DATA_TYPE_UTINYINT};
|
pFunc->node.resType =
|
||||||
|
(SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_UTINYINT].bytes, .type = TSDB_DATA_TYPE_UTINYINT};
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t translateTrim(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateTrim(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
FUNC_ERR_RET(validateParam(pFunc, pErrBuf, len));
|
FUNC_ERR_RET(validateParam(pFunc, pErrBuf, len));
|
||||||
|
|
||||||
|
|
||||||
uint8_t para0Type = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->type;
|
uint8_t para0Type = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->type;
|
||||||
int32_t resLen = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->bytes;
|
int32_t resLen = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->bytes;
|
||||||
uint8_t type = para0Type;
|
uint8_t type = para0Type;
|
||||||
|
@ -1522,8 +1514,7 @@ static int32_t translateAddPrecOutBigint(SFunctionNode* pFunc, char* pErrBuf, in
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
pFunc->node.resType =
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
|
||||||
(SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1535,7 +1526,8 @@ static int32_t translateToJson(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
|
||||||
|
|
||||||
static int32_t translateOutGeom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateOutGeom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
FUNC_ERR_RET(validateParam(pFunc, pErrBuf, len));
|
FUNC_ERR_RET(validateParam(pFunc, pErrBuf, len));
|
||||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_GEOMETRY].bytes, .type = TSDB_DATA_TYPE_GEOMETRY};
|
pFunc->node.resType =
|
||||||
|
(SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_GEOMETRY].bytes, .type = TSDB_DATA_TYPE_GEOMETRY};
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1573,6 +1565,10 @@ static bool getBlockDistFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv
|
||||||
pEnv->calcMemSize = sizeof(STableBlockDistInfo);
|
pEnv->calcMemSize = sizeof(STableBlockDistInfo);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
static bool getBlockDBUsageFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
|
pEnv->calcMemSize = sizeof(SDBBlockUsageInfo);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateGroupKey(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateGroupKey(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||||
|
@ -1582,7 +1578,6 @@ static int32_t translateGroupKey(SFunctionNode* pFunc, char* pErrBuf, int32_t le
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t translateServerStatusFunc(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateServerStatusFunc(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes, .type = TSDB_DATA_TYPE_INT};
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes, .type = TSDB_DATA_TYPE_INT};
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1665,12 +1660,17 @@ static int32_t translateOutVarchar(SFunctionNode* pFunc, char* pErrBuf, int32_t
|
||||||
case FUNCTION_TYPE_FIRST_STATE:
|
case FUNCTION_TYPE_FIRST_STATE:
|
||||||
case FUNCTION_TYPE_LAST_STATE:
|
case FUNCTION_TYPE_LAST_STATE:
|
||||||
bytes = getFirstLastInfoSize(getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->bytes,
|
bytes = getFirstLastInfoSize(getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->bytes,
|
||||||
(pFunc->hasPk) ? pFunc->pkBytes : 0) + VARSTR_HEADER_SIZE;
|
(pFunc->hasPk) ? pFunc->pkBytes : 0) +
|
||||||
|
VARSTR_HEADER_SIZE;
|
||||||
break;
|
break;
|
||||||
case FUNCTION_TYPE_FIRST_STATE_MERGE:
|
case FUNCTION_TYPE_FIRST_STATE_MERGE:
|
||||||
case FUNCTION_TYPE_LAST_STATE_MERGE:
|
case FUNCTION_TYPE_LAST_STATE_MERGE:
|
||||||
bytes = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->bytes;
|
bytes = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->bytes;
|
||||||
break;
|
break;
|
||||||
|
case FUNCTION_TYPE_DB_USAGE:
|
||||||
|
case FUNCTION_TYPE_DB_USAGE_INFO:
|
||||||
|
bytes = 128;
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
bytes = 0;
|
bytes = 0;
|
||||||
break;
|
break;
|
||||||
|
@ -5587,6 +5587,30 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.sprocessFunc = NULL,
|
.sprocessFunc = NULL,
|
||||||
.finalizeFunc = NULL
|
.finalizeFunc = NULL
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
.name = "_db_usage",
|
||||||
|
.type = FUNCTION_TYPE_DB_USAGE,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
|
||||||
|
.parameters = {.minParamNum = 0,
|
||||||
|
.maxParamNum = 0,
|
||||||
|
.paramInfoPattern = 0,
|
||||||
|
.outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE}},
|
||||||
|
.translateFunc = translateOutVarchar,
|
||||||
|
.getEnvFunc = getBlockDBUsageFuncEnv,
|
||||||
|
.initFunc = blockDBUsageSetup,
|
||||||
|
.processFunc = blockDBUsageFunction,
|
||||||
|
.finalizeFunc = blockDBUsageFinalize
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "_db_usage_info",
|
||||||
|
.type = FUNCTION_TYPE_DB_USAGE_INFO,
|
||||||
|
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC,
|
||||||
|
.parameters = {.minParamNum = 0,
|
||||||
|
.maxParamNum = 0,
|
||||||
|
.paramInfoPattern = 0,
|
||||||
|
.outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE}},
|
||||||
|
.translateFunc = translateOutVarchar,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
|
|
|
@ -28,12 +28,8 @@
|
||||||
#include "thistogram.h"
|
#include "thistogram.h"
|
||||||
#include "tpercentile.h"
|
#include "tpercentile.h"
|
||||||
|
|
||||||
bool ignoreNegative(int8_t ignoreOption){
|
bool ignoreNegative(int8_t ignoreOption) { return (ignoreOption & 0x1) == 0x1; }
|
||||||
return (ignoreOption & 0x1) == 0x1;
|
bool ignoreNull(int8_t ignoreOption) { return (ignoreOption & 0x2) == 0x2; }
|
||||||
}
|
|
||||||
bool ignoreNull(int8_t ignoreOption){
|
|
||||||
return (ignoreOption & 0x2) == 0x2;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
APERCT_ALGO_UNKNOWN = 0,
|
APERCT_ALGO_UNKNOWN = 0,
|
||||||
|
@ -312,7 +308,8 @@ int32_t funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow,
|
||||||
qError("out of memory when function get input row.");
|
qError("out of memory when function get input row.");
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
(void)memcpy(pIter->pPrevData, colDataGetData(pIter->pDataCol, pIter->inputEndIndex), pIter->pDataCol->info.bytes);
|
(void)memcpy(pIter->pPrevData, colDataGetData(pIter->pDataCol, pIter->inputEndIndex),
|
||||||
|
pIter->pDataCol->info.bytes);
|
||||||
pIter->pPrevPk = taosMemoryMalloc(pIter->pPkCol->info.bytes);
|
pIter->pPrevPk = taosMemoryMalloc(pIter->pPkCol->info.bytes);
|
||||||
if (NULL == pIter->pPrevPk) {
|
if (NULL == pIter->pPrevPk) {
|
||||||
qError("out of memory when function get input row.");
|
qError("out of memory when function get input row.");
|
||||||
|
@ -448,7 +445,8 @@ int32_t appendSelectivityCols(SqlFunctionCtx* pCtx, SSDataBlock* pSrcBlock, int3
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool funcInputGetNextRowIndex(SInputColumnInfoData* pInput, int32_t from, bool firstOccur, int32_t* pRowIndex, int32_t* nextFrom);
|
bool funcInputGetNextRowIndex(SInputColumnInfoData* pInput, int32_t from, bool firstOccur, int32_t* pRowIndex,
|
||||||
|
int32_t* nextFrom);
|
||||||
|
|
||||||
static bool firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst);
|
static bool firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst);
|
||||||
|
|
||||||
|
@ -1805,7 +1803,8 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
|
||||||
pResInfo->complete = true;
|
pResInfo->complete = true;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
code = tMemBucketCreate(pCol->info.bytes, type, pInfo->minval, pInfo->maxval, pCtx->hasWindowOrGroup, &pInfo->pMemBucket);
|
code = tMemBucketCreate(pCol->info.bytes, type, pInfo->minval, pInfo->maxval, pCtx->hasWindowOrGroup,
|
||||||
|
&pInfo->pMemBucket);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1894,7 +1893,8 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
if ((*pMemBucket) != NULL && (*pMemBucket)->total > 0) { // check for null
|
if ((*pMemBucket) != NULL && (*pMemBucket)->total > 0) { // check for null
|
||||||
if (pCtx->numOfParams > 2) {
|
if (pCtx->numOfParams > 2) {
|
||||||
char buf[3200] = {0};
|
char buf[3200] = {0};
|
||||||
// max length of double num is 317, e.g. use %.6lf to print -1.0e+308, consider the comma and bracket, 3200 is enough.
|
// max length of double num is 317, e.g. use %.6lf to print -1.0e+308, consider the comma and bracket, 3200 is
|
||||||
|
// enough.
|
||||||
size_t len = 1;
|
size_t len = 1;
|
||||||
|
|
||||||
varDataVal(buf)[0] = '[';
|
varDataVal(buf)[0] = '[';
|
||||||
|
@ -2444,7 +2444,8 @@ static int32_t firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowI
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t currentTs, char* pkData, int32_t type, char* pData) {
|
static int32_t doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t currentTs, char* pkData, int32_t type,
|
||||||
|
char* pData) {
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
|
@ -2978,7 +2979,6 @@ static int32_t doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex
|
||||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
SColumnInfoData* pkCol = pInput->pPrimaryKey;
|
SColumnInfoData* pkCol = pInput->pPrimaryKey;
|
||||||
|
|
||||||
|
|
||||||
if (colDataIsNull_s(pInputCol, rowIndex)) {
|
if (colDataIsNull_s(pInputCol, rowIndex)) {
|
||||||
pInfo->isNull = true;
|
pInfo->isNull = true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -3306,7 +3306,8 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
|
||||||
|
|
||||||
// TODO: the primary key compare can be skipped for ordered pk if knonwn before
|
// TODO: the primary key compare can be skipped for ordered pk if knonwn before
|
||||||
// TODO: for desc ordered, pk shall select the smallest one for one ts. if across block boundaries.
|
// TODO: for desc ordered, pk shall select the smallest one for one ts. if across block boundaries.
|
||||||
bool funcInputGetNextRowIndex(SInputColumnInfoData* pInput, int32_t from, bool firstOccur, int32_t* pRowIndex, int32_t* nextFrom) {
|
bool funcInputGetNextRowIndex(SInputColumnInfoData* pInput, int32_t from, bool firstOccur, int32_t* pRowIndex,
|
||||||
|
int32_t* nextFrom) {
|
||||||
if (pInput->pPrimaryKey == NULL) {
|
if (pInput->pPrimaryKey == NULL) {
|
||||||
if (from == -1) {
|
if (from == -1) {
|
||||||
from = pInput->startRowIndex;
|
from = pInput->startRowIndex;
|
||||||
|
@ -3425,9 +3426,7 @@ int32_t setDoDiffResult(SqlFunctionCtx* pCtx, SFuncInputRow* pRow, int32_t pos)
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
int32_t diffFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; }
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t diffFunctionByRow(SArray* pCtxArray) {
|
int32_t diffFunctionByRow(SArray* pCtxArray) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -3866,7 +3865,8 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock*
|
||||||
return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, &key, pPos, pCtx->pStore);
|
return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, &key, pPos, pCtx->pStore);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos, SFunctionStateStore* pStore) {
|
static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos,
|
||||||
|
SFunctionStateStore* pStore) {
|
||||||
if (pHandle->pBuf != NULL) {
|
if (pHandle->pBuf != NULL) {
|
||||||
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
|
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
|
||||||
if (pPage == NULL) {
|
if (pPage == NULL) {
|
||||||
|
@ -3899,7 +3899,8 @@ int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBloc
|
||||||
return doUpdateTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pPos, pCtx->pStore);
|
return doUpdateTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pPos, pCtx->pStore);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPos, SFunctionStateStore* pStore, char** value) {
|
static int32_t doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPos, SFunctionStateStore* pStore,
|
||||||
|
char** value) {
|
||||||
if (pHandle->pBuf != NULL) {
|
if (pHandle->pBuf != NULL) {
|
||||||
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
|
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
|
||||||
if (pPage == NULL) {
|
if (pPage == NULL) {
|
||||||
|
@ -4740,11 +4741,13 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
int32_t len;
|
int32_t len;
|
||||||
char buf[512] = {0};
|
char buf[512] = {0};
|
||||||
if (!pInfo->normalized) {
|
if (!pInfo->normalized) {
|
||||||
len = tsnprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%" PRId64 "}",
|
len = tsnprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE,
|
||||||
pInfo->bins[i].lower, pInfo->bins[i].upper, pInfo->bins[i].count);
|
"{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%" PRId64 "}", pInfo->bins[i].lower,
|
||||||
|
pInfo->bins[i].upper, pInfo->bins[i].count);
|
||||||
} else {
|
} else {
|
||||||
len = tsnprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%lf}", pInfo->bins[i].lower,
|
len = tsnprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE,
|
||||||
pInfo->bins[i].upper, pInfo->bins[i].percentage);
|
"{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%lf}", pInfo->bins[i].lower, pInfo->bins[i].upper,
|
||||||
|
pInfo->bins[i].percentage);
|
||||||
}
|
}
|
||||||
varDataSetLen(buf, len);
|
varDataSetLen(buf, len);
|
||||||
code = colDataSetVal(pCol, currentRow, buf, false);
|
code = colDataSetVal(pCol, currentRow, buf, false);
|
||||||
|
@ -6386,23 +6389,26 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
avgRows = pData->totalRows / pData->numOfBlocks;
|
avgRows = pData->totalRows / pData->numOfBlocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
len = tsnprintf(varDataVal(st), sizeof(st) - VARSTR_HEADER_SIZE, "Block_Rows=[%" PRId64 "] MinRows=[%d] MaxRows=[%d] AvgRows=[%" PRId64 "]",
|
len = tsnprintf(varDataVal(st), sizeof(st) - VARSTR_HEADER_SIZE,
|
||||||
pData->totalRows, pData->minRows, pData->maxRows, avgRows);
|
"Block_Rows=[%" PRId64 "] MinRows=[%d] MaxRows=[%d] AvgRows=[%" PRId64 "]", pData->totalRows,
|
||||||
|
pData->minRows, pData->maxRows, avgRows);
|
||||||
varDataSetLen(st, len);
|
varDataSetLen(st, len);
|
||||||
code = colDataSetVal(pColInfo, row++, st, false);
|
code = colDataSetVal(pColInfo, row++, st, false);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
len = tsnprintf(varDataVal(st), sizeof(st) - VARSTR_HEADER_SIZE, "Inmem_Rows=[%d] Stt_Rows=[%d] ", pData->numOfInmemRows, pData->numOfSttRows);
|
len = tsnprintf(varDataVal(st), sizeof(st) - VARSTR_HEADER_SIZE, "Inmem_Rows=[%d] Stt_Rows=[%d] ",
|
||||||
|
pData->numOfInmemRows, pData->numOfSttRows);
|
||||||
varDataSetLen(st, len);
|
varDataSetLen(st, len);
|
||||||
code = colDataSetVal(pColInfo, row++, st, false);
|
code = colDataSetVal(pColInfo, row++, st, false);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
len = tsnprintf(varDataVal(st), sizeof(st) - VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Filesets=[%d] Total_Vgroups=[%d]", pData->numOfTables,
|
len = tsnprintf(varDataVal(st), sizeof(st) - VARSTR_HEADER_SIZE,
|
||||||
pData->numOfFiles, pData->numOfVgroups);
|
"Total_Tables=[%d] Total_Filesets=[%d] Total_Vgroups=[%d]", pData->numOfTables, pData->numOfFiles,
|
||||||
|
pData->numOfVgroups);
|
||||||
|
|
||||||
varDataSetLen(st, len);
|
varDataSetLen(st, len);
|
||||||
code = colDataSetVal(pColInfo, row++, st, false);
|
code = colDataSetVal(pColInfo, row++, st, false);
|
||||||
|
@ -6437,7 +6443,8 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
int32_t bucketRange = ceil(((double)(pData->defMaxRows - pData->defMinRows)) / numOfBuckets);
|
int32_t bucketRange = ceil(((double)(pData->defMaxRows - pData->defMinRows)) / numOfBuckets);
|
||||||
|
|
||||||
for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) {
|
for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) {
|
||||||
len = tsnprintf(varDataVal(st), sizeof(st) - VARSTR_HEADER_SIZE, "%04d |", pData->defMinRows + bucketRange * (i + 1));
|
len =
|
||||||
|
tsnprintf(varDataVal(st), sizeof(st) - VARSTR_HEADER_SIZE, "%04d |", pData->defMinRows + bucketRange * (i + 1));
|
||||||
|
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
if (pData->blockRowsHisto[i] > 0) {
|
if (pData->blockRowsHisto[i] > 0) {
|
||||||
|
@ -6451,7 +6458,8 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
|
||||||
if (pData->blockRowsHisto[i] > 0) {
|
if (pData->blockRowsHisto[i] > 0) {
|
||||||
double v = pData->blockRowsHisto[i] * 100.0 / pData->numOfBlocks;
|
double v = pData->blockRowsHisto[i] * 100.0 / pData->numOfBlocks;
|
||||||
len += tsnprintf(varDataVal(st) + len, sizeof(st) - VARSTR_HEADER_SIZE - len, " %d (%.2f%c)", pData->blockRowsHisto[i], v, '%');
|
len += tsnprintf(varDataVal(st) + len, sizeof(st) - VARSTR_HEADER_SIZE - len, " %d (%.2f%c)",
|
||||||
|
pData->blockRowsHisto[i], v, '%');
|
||||||
}
|
}
|
||||||
|
|
||||||
varDataSetLen(st, len);
|
varDataSetLen(st, len);
|
||||||
|
@ -6463,6 +6471,168 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
int32_t blockDBUsageSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
|
if (pResultInfo->initialized) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResultInfo)) {
|
||||||
|
return TSDB_CODE_FUNC_SETUP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDBBlockUsageInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
pInfo->minRows = INT32_MAX;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
int32_t blockDBUsageFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
const int32_t BLOCK_DISK_USAGE_RESULT_ROWS = 2;
|
||||||
|
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SDBBlockUsageInfo* pDistInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
|
SDBBlockUsageInfo p1 = {0};
|
||||||
|
if (tDeserializeBlockDbUsage(varDataVal(pInputCol->pData), varDataLen(pInputCol->pData), &p1) < 0) {
|
||||||
|
qError("failed to deserialize block dist info");
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
pDistInfo->numOfBlocks += p1.numOfBlocks;
|
||||||
|
pDistInfo->numOfTables += p1.numOfTables;
|
||||||
|
pDistInfo->numOfInmemRows += p1.numOfInmemRows;
|
||||||
|
pDistInfo->numOfSttRows += p1.numOfSttRows;
|
||||||
|
pDistInfo->totalSize += p1.totalSize;
|
||||||
|
pDistInfo->totalRows += p1.totalRows;
|
||||||
|
pDistInfo->numOfFiles += p1.numOfFiles;
|
||||||
|
|
||||||
|
pDistInfo->defMinRows = p1.defMinRows;
|
||||||
|
pDistInfo->defMaxRows = p1.defMaxRows;
|
||||||
|
pDistInfo->rowSize = p1.rowSize;
|
||||||
|
|
||||||
|
if (pDistInfo->minRows > p1.minRows) {
|
||||||
|
pDistInfo->minRows = p1.minRows;
|
||||||
|
}
|
||||||
|
if (pDistInfo->maxRows < p1.maxRows) {
|
||||||
|
pDistInfo->maxRows = p1.maxRows;
|
||||||
|
}
|
||||||
|
pDistInfo->numOfVgroups += (p1.numOfTables != 0 ? 1 : 0);
|
||||||
|
|
||||||
|
pResInfo->numOfRes = BLOCK_DISK_USAGE_RESULT_ROWS; // default output rows
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeBlockDbUsage(void* buf, int32_t bufLen, const SDBBlockUsageInfo* pInfo) {
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
int32_t tlen;
|
||||||
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
||||||
|
TAOS_CHECK_EXIT(tStartEncode(&encoder));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeU32(&encoder, pInfo->rowSize));
|
||||||
|
|
||||||
|
TAOS_CHECK_EXIT(tEncodeU16(&encoder, pInfo->numOfFiles));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeU32(&encoder, pInfo->numOfBlocks));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeU32(&encoder, pInfo->numOfTables));
|
||||||
|
|
||||||
|
TAOS_CHECK_EXIT(tEncodeU64(&encoder, pInfo->totalSize));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeU64(&encoder, pInfo->totalRows));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pInfo->maxRows));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pInfo->minRows));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pInfo->defMaxRows));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pInfo->defMinRows));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeU32(&encoder, pInfo->numOfInmemRows));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeU32(&encoder, pInfo->numOfSttRows));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeU32(&encoder, pInfo->numOfVgroups));
|
||||||
|
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tlen = code;
|
||||||
|
} else {
|
||||||
|
tlen = encoder.pos;
|
||||||
|
}
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
int32_t tDeserializeBlockDbUsage(void* buf, int32_t bufLen, SDBBlockUsageInfo* pInfo) {
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
|
TAOS_CHECK_EXIT(tStartDecode(&decoder));
|
||||||
|
TAOS_CHECK_EXIT(tDecodeU32(&decoder, &pInfo->rowSize));
|
||||||
|
|
||||||
|
TAOS_CHECK_EXIT(tDecodeU16(&decoder, &pInfo->numOfFiles));
|
||||||
|
TAOS_CHECK_EXIT(tDecodeU32(&decoder, &pInfo->numOfBlocks));
|
||||||
|
TAOS_CHECK_EXIT(tDecodeU32(&decoder, &pInfo->numOfTables));
|
||||||
|
|
||||||
|
TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pInfo->totalSize));
|
||||||
|
TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pInfo->totalRows));
|
||||||
|
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pInfo->maxRows));
|
||||||
|
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pInfo->minRows));
|
||||||
|
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pInfo->defMaxRows));
|
||||||
|
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pInfo->defMinRows));
|
||||||
|
TAOS_CHECK_EXIT(tDecodeU32(&decoder, &pInfo->numOfInmemRows));
|
||||||
|
TAOS_CHECK_EXIT(tDecodeU32(&decoder, &pInfo->numOfSttRows));
|
||||||
|
TAOS_CHECK_EXIT(tDecodeU32(&decoder, &pInfo->numOfVgroups));
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
int32_t blockDBUsageFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SDBBlockUsageInfo* pData = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
|
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
|
||||||
|
if (NULL == pColInfo) {
|
||||||
|
return TSDB_CODE_OUT_OF_RANGE;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pData->totalRows == 0) {
|
||||||
|
pData->minRows = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t row = 0;
|
||||||
|
char st[256] = {0};
|
||||||
|
double averageSize = 0;
|
||||||
|
if (pData->numOfBlocks != 0) {
|
||||||
|
averageSize = ((double)pData->totalSize) / pData->numOfBlocks;
|
||||||
|
}
|
||||||
|
uint64_t totalRawSize = pData->totalRows * pData->rowSize;
|
||||||
|
double compRatio = 0;
|
||||||
|
if (totalRawSize != 0) {
|
||||||
|
compRatio = pData->totalSize * 100 / (double)totalRawSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t len = tsnprintf(varDataVal(st), sizeof(st) - VARSTR_HEADER_SIZE,
|
||||||
|
"Total_Blocks=[%d] Total_Size=[%.2f KiB] Average_size=[%.2f KiB] Compression_Ratio=[%.2f %c]",
|
||||||
|
pData->numOfBlocks, pData->totalSize / 1024.0, averageSize / 1024.0, compRatio, '%');
|
||||||
|
|
||||||
|
varDataSetLen(st, len);
|
||||||
|
int32_t code = colDataSetVal(pColInfo, row++, st, false);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t avgRows = 0;
|
||||||
|
if (pData->numOfBlocks > 0) {
|
||||||
|
avgRows = pData->totalRows / pData->numOfBlocks;
|
||||||
|
}
|
||||||
|
|
||||||
|
len = tsnprintf(varDataVal(st), sizeof(st) - VARSTR_HEADER_SIZE,
|
||||||
|
"Block_Rows=[%" PRId64 "] MinRows=[%d] MaxRows=[%d] AvgRows=[%" PRId64 "]", pData->totalRows,
|
||||||
|
pData->minRows, pData->maxRows, avgRows);
|
||||||
|
varDataSetLen(st, len);
|
||||||
|
code = colDataSetVal(pColInfo, row++, st, false);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(SDerivInfo);
|
pEnv->calcMemSize = sizeof(SDerivInfo);
|
||||||
|
@ -6731,7 +6901,6 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) {
|
||||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if ((INT64_MIN == pRateInfo->firstKey) || row.ts > pRateInfo->firstKey) {
|
if ((INT64_MIN == pRateInfo->firstKey) || row.ts > pRateInfo->firstKey) {
|
||||||
doSaveRateInfo(pRateInfo, true, row.ts, row.pPk, v);
|
doSaveRateInfo(pRateInfo, true, row.ts, row.pPk, v);
|
||||||
} else if (row.ts == pRateInfo->firstKey) {
|
} else if (row.ts == pRateInfo->firstKey) {
|
||||||
|
@ -6792,7 +6961,8 @@ static void irateCopyInfo(SRateInfo* pInput, SRateInfo* pOutput) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t irateTransferInfo(SRateInfo* pInput, SRateInfo* pOutput) {
|
static int32_t irateTransferInfo(SRateInfo* pInput, SRateInfo* pOutput) {
|
||||||
if ((pInput->firstKey != INT64_MIN && (pInput->firstKey == pOutput->firstKey || pInput->firstKey == pOutput->lastKey)) ||
|
if ((pInput->firstKey != INT64_MIN &&
|
||||||
|
(pInput->firstKey == pOutput->firstKey || pInput->firstKey == pOutput->lastKey)) ||
|
||||||
(pInput->lastKey != INT64_MIN && (pInput->lastKey == pOutput->firstKey || pInput->lastKey == pOutput->lastKey))) {
|
(pInput->lastKey != INT64_MIN && (pInput->lastKey == pOutput->firstKey || pInput->lastKey == pOutput->lastKey))) {
|
||||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||||
}
|
}
|
||||||
|
@ -6922,9 +7092,7 @@ _group_value_over:
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t groupKeyFunction(SqlFunctionCtx* pCtx) {
|
int32_t groupKeyFunction(SqlFunctionCtx* pCtx) { return groupConstValueFunction(pCtx); }
|
||||||
return groupConstValueFunction(pCtx);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t groupConstValueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t groupConstValueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
|
@ -6953,9 +7121,7 @@ int32_t groupConstValueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock){
|
int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return groupConstValueFinalize(pCtx, pBlock); }
|
||||||
return groupConstValueFinalize(pCtx, pBlock);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t groupKeyCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
int32_t groupKeyCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||||
|
@ -6977,7 +7143,8 @@ int32_t groupKeyCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pSourceCtx->resDataInfo.type)) {
|
if (IS_VAR_DATA_TYPE(pSourceCtx->resDataInfo.type)) {
|
||||||
(void)memcpy(pDBuf->data, pSBuf->data,
|
(void)memcpy(pDBuf->data, pSBuf->data,
|
||||||
(pSourceCtx->resDataInfo.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(pSBuf->data) : varDataTLen(pSBuf->data));
|
(pSourceCtx->resDataInfo.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(pSBuf->data)
|
||||||
|
: varDataTLen(pSBuf->data));
|
||||||
} else {
|
} else {
|
||||||
(void)memcpy(pDBuf->data, pSBuf->data, pSourceCtx->resDataInfo.bytes);
|
(void)memcpy(pDBuf->data, pSBuf->data, pSourceCtx->resDataInfo.bytes);
|
||||||
}
|
}
|
||||||
|
|
|
@ -122,7 +122,8 @@ EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, SDataBlockIn
|
||||||
|
|
||||||
const char* name = funcMgtBuiltins[funcId].name;
|
const char* name = funcMgtBuiltins[funcId].name;
|
||||||
if ((strcmp(name, "_group_key") == 0) || (strcmp(name, "_select_value") == 0)) {
|
if ((strcmp(name, "_group_key") == 0) || (strcmp(name, "_select_value") == 0)) {
|
||||||
return FUNC_DATA_REQUIRED_NOT_LOAD;;
|
return FUNC_DATA_REQUIRED_NOT_LOAD;
|
||||||
|
;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (funcMgtBuiltins[funcId].dynDataRequiredFunc == NULL) {
|
if (funcMgtBuiltins[funcId].dynDataRequiredFunc == NULL) {
|
||||||
|
@ -188,7 +189,6 @@ bool fmIsWindowClauseFunc(int32_t funcId) { return fmIsAggFunc(funcId) || fmIsWi
|
||||||
|
|
||||||
bool fmIsIndefiniteRowsFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_INDEFINITE_ROWS_FUNC); }
|
bool fmIsIndefiniteRowsFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_INDEFINITE_ROWS_FUNC); }
|
||||||
|
|
||||||
|
|
||||||
bool fmIsSpecialDataRequiredFunc(int32_t funcId) {
|
bool fmIsSpecialDataRequiredFunc(int32_t funcId) {
|
||||||
return isSpecificClassifyFunc(funcId, FUNC_MGT_SPECIAL_DATA_REQUIRED);
|
return isSpecificClassifyFunc(funcId, FUNC_MGT_SPECIAL_DATA_REQUIRED);
|
||||||
}
|
}
|
||||||
|
@ -299,6 +299,13 @@ bool fmIsBlockDistFunc(int32_t funcId) {
|
||||||
return FUNCTION_TYPE_BLOCK_DIST == funcMgtBuiltins[funcId].type;
|
return FUNCTION_TYPE_BLOCK_DIST == funcMgtBuiltins[funcId].type;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool fmIsDBUsageFunc(int32_t funcId) {
|
||||||
|
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return FUNCTION_TYPE_DB_USAGE == funcMgtBuiltins[funcId].type;
|
||||||
|
}
|
||||||
|
|
||||||
bool fmIsProcessByRowFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_PROCESS_BY_ROW); }
|
bool fmIsProcessByRowFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_PROCESS_BY_ROW); }
|
||||||
|
|
||||||
bool fmIsIgnoreNullFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_IGNORE_NULL_FUNC); }
|
bool fmIsIgnoreNullFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_IGNORE_NULL_FUNC); }
|
||||||
|
@ -377,13 +384,9 @@ bool fmIsConstantResFunc(SFunctionNode* pFunc) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool fmIsSkipScanCheckFunc(int32_t funcId) {
|
bool fmIsSkipScanCheckFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SKIP_SCAN_CHECK_FUNC); }
|
||||||
return isSpecificClassifyFunc(funcId, FUNC_MGT_SKIP_SCAN_CHECK_FUNC);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool fmIsPrimaryKeyFunc(int32_t funcId) {
|
bool fmIsPrimaryKeyFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_PRIMARY_KEY_FUNC); }
|
||||||
return isSpecificClassifyFunc(funcId, FUNC_MGT_PRIMARY_KEY_FUNC);
|
|
||||||
}
|
|
||||||
void getLastCacheDataType(SDataType* pType, int32_t pkBytes) {
|
void getLastCacheDataType(SDataType* pType, int32_t pkBytes) {
|
||||||
// TODO: do it later.
|
// TODO: do it later.
|
||||||
pType->bytes = getFirstLastInfoSize(pType->bytes, pkBytes) + VARSTR_HEADER_SIZE;
|
pType->bytes = getFirstLastInfoSize(pType->bytes, pkBytes) + VARSTR_HEADER_SIZE;
|
||||||
|
@ -527,7 +530,8 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMidFunc, SFunctionNode** pMergeFunc) {
|
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMidFunc,
|
||||||
|
SFunctionNode** pMergeFunc) {
|
||||||
if (!fmIsDistExecFunc(pFunc->funcId)) {
|
if (!fmIsDistExecFunc(pFunc->funcId)) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
@ -676,6 +680,4 @@ bool fmIsMyStateFunc(int32_t funcId, int32_t stateFuncId) {
|
||||||
return strcmp(pStateFunc->name, pStateMergeFunc->pMergeFunc) == 0;
|
return strcmp(pStateFunc->name, pStateMergeFunc->pMergeFunc) == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool fmIsCountLikeFunc(int32_t funcId) {
|
bool fmIsCountLikeFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_COUNT_LIKE_FUNC); }
|
||||||
return isSpecificClassifyFunc(funcId, FUNC_MGT_COUNT_LIKE_FUNC);
|
|
||||||
}
|
|
||||||
|
|
|
@ -2785,6 +2785,26 @@ static int32_t translateBlockDistFunc(STranslateContext* pCtx, SFunctionNode* pF
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
static int32_t translateDBUsageFunc(STranslateContext* pCtx, SFunctionNode* pFunc) {
|
||||||
|
if (!fmIsDBUsageFunc(pFunc->funcId)) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
if (!isSelectStmt(pCtx->pCurrStmt)) {
|
||||||
|
return generateSyntaxErrMsgExt(&pCtx->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE,
|
||||||
|
"%s is only supported in single table query", pFunc->functionName);
|
||||||
|
}
|
||||||
|
SSelectStmt* pSelect = (SSelectStmt*)pCtx->pCurrStmt;
|
||||||
|
SNode* pTable = pSelect->pFromTable;
|
||||||
|
// if (NULL != pTable && (QUERY_NODE_REAL_TABLE != nodeType(pTable) ||
|
||||||
|
// (TSDB_SUPER_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
|
||||||
|
// TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
|
||||||
|
// TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType))) {
|
||||||
|
// return generateSyntaxErrMsgExt(&pCtx->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
|
||||||
|
// "%s is only supported on super table, child table or normal table",
|
||||||
|
// pFunc->functionName);
|
||||||
|
// }
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static bool isStarParam(SNode* pNode) { return nodesIsStar(pNode) || nodesIsTableStar(pNode); }
|
static bool isStarParam(SNode* pNode) { return nodesIsStar(pNode) || nodesIsTableStar(pNode); }
|
||||||
|
|
||||||
|
@ -3139,6 +3159,10 @@ static int32_t translateNormalFunction(STranslateContext* pCxt, SNode** ppNode)
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = translateBlockDistFunc(pCxt, pFunc);
|
code = translateBlockDistFunc(pCxt, pFunc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = translateDBUsageFunc(pCxt, pFunc);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
setFuncClassification(pCxt, pFunc);
|
setFuncClassification(pCxt, pFunc);
|
||||||
}
|
}
|
||||||
|
@ -13156,6 +13180,12 @@ static int32_t createSelectStmtForShow(ENodeType showType, SSelectStmt** pStmt)
|
||||||
static int32_t createSelectStmtForShowTableDist(SShowTableDistributedStmt* pStmt, SSelectStmt** pOutput) {
|
static int32_t createSelectStmtForShowTableDist(SShowTableDistributedStmt* pStmt, SSelectStmt** pOutput) {
|
||||||
return createSimpleSelectStmtFromCols(pStmt->dbName, pStmt->tableName, 0, NULL, pOutput);
|
return createSimpleSelectStmtFromCols(pStmt->dbName, pStmt->tableName, 0, NULL, pOutput);
|
||||||
}
|
}
|
||||||
|
static int32_t createSelectStmtForShowDBUsage(SShowStmt* pStmt, SSelectStmt** pOutput) {
|
||||||
|
int32_t type = nodeType(pStmt);
|
||||||
|
const SSysTableShowAdapter* pShow = &sysTableShowAdapter[type - SYSTABLE_SHOW_TYPE_OFFSET];
|
||||||
|
return createSimpleSelectStmtFromCols(pShow->pDbName, pShow->pTableName, pShow->numOfShowCols, pShow->pShowCols,
|
||||||
|
pOutput);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t createOperatorNode(EOperatorType opType, const char* pColName, const SNode* pRight, SNode** pOp) {
|
static int32_t createOperatorNode(EOperatorType opType, const char* pColName, const SNode* pRight, SNode** pOp) {
|
||||||
if (NULL == pRight) {
|
if (NULL == pRight) {
|
||||||
|
@ -13687,6 +13717,50 @@ static int32_t rewriteShowTableDist(STranslateContext* pCxt, SQuery* pQuery) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t createDBUsageFunc(SFunctionNode** ppNode) {
|
||||||
|
SFunctionNode* pFunc = NULL;
|
||||||
|
int32_t code = nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)&pFunc);
|
||||||
|
if (NULL == pFunc) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
strcpy(pFunc->functionName, "_db_usage");
|
||||||
|
strcpy(pFunc->node.aliasName, "_db_usage");
|
||||||
|
SFunctionNode* pFuncNew = NULL;
|
||||||
|
code = createBlockDistInfoFunc(&pFuncNew);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodesListMakeStrictAppend(&pFunc->pParameterList, (SNode*)pFuncNew);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
nodesDestroyNode((SNode*)pFunc);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
*ppNode = pFunc;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
static int32_t rewriteShowDBUsage(STranslateContext* pCtx, SQuery* pQuery) {
|
||||||
|
SSelectStmt* pStmt = NULL;
|
||||||
|
int32_t code = createSelectStmtForShowDBUsage((SShowStmt*)pQuery->pRoot, &pStmt);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
NODES_DESTORY_LIST(pStmt->pProjectionList);
|
||||||
|
SFunctionNode* pFuncNew = NULL;
|
||||||
|
code = createDBUsageFunc(&pFuncNew);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) code = nodesListMakeStrictAppend(&pStmt->pProjectionList, (SNode*)pFuncNew);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = createShowCondition((SShowStmt*)pQuery->pRoot, pStmt);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
pCtx->showRewrite = true;
|
||||||
|
pQuery->showRewrite = true;
|
||||||
|
nodesDestroyNode(pQuery->pRoot);
|
||||||
|
pQuery->pRoot = (SNode*)pStmt;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
typedef struct SVgroupCreateTableBatch {
|
typedef struct SVgroupCreateTableBatch {
|
||||||
SVCreateTbBatchReq req;
|
SVCreateTbBatchReq req;
|
||||||
SVgroupInfo info;
|
SVgroupInfo info;
|
||||||
|
@ -16061,9 +16135,11 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
||||||
case QUERY_NODE_SHOW_ARBGROUPS_STMT:
|
case QUERY_NODE_SHOW_ARBGROUPS_STMT:
|
||||||
case QUERY_NODE_SHOW_ENCRYPTIONS_STMT:
|
case QUERY_NODE_SHOW_ENCRYPTIONS_STMT:
|
||||||
case QUERY_NODE_SHOW_TSMAS_STMT:
|
case QUERY_NODE_SHOW_TSMAS_STMT:
|
||||||
case QUERY_NODE_SHOW_USAGE_STMT:
|
|
||||||
code = rewriteShow(pCxt, pQuery);
|
code = rewriteShow(pCxt, pQuery);
|
||||||
break;
|
break;
|
||||||
|
case QUERY_NODE_SHOW_USAGE_STMT:
|
||||||
|
code = rewriteShowDBUsage(pCxt, pQuery);
|
||||||
|
break;
|
||||||
case QUERY_NODE_SHOW_TAGS_STMT:
|
case QUERY_NODE_SHOW_TAGS_STMT:
|
||||||
code = rewriteShowTags(pCxt, pQuery);
|
code = rewriteShowTags(pCxt, pQuery);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -30,7 +30,6 @@ typedef struct SSlotIndex {
|
||||||
SArray* pSlotIdsInfo; // duplicate name slot
|
SArray* pSlotIdsInfo; // duplicate name slot
|
||||||
} SSlotIndex;
|
} SSlotIndex;
|
||||||
|
|
||||||
|
|
||||||
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char** ppKey, int32_t* pLen, uint16_t extraBufLen) {
|
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char** ppKey, int32_t* pLen, uint16_t extraBufLen) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
|
@ -125,7 +124,6 @@ static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char** ppKey, int
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const char* pName, const SNode* pNode, int16_t slotId,
|
static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const char* pName, const SNode* pNode, int16_t slotId,
|
||||||
bool output, bool reserve) {
|
bool output, bool reserve) {
|
||||||
SSlotDescNode* pSlot = NULL;
|
SSlotDescNode* pSlot = NULL;
|
||||||
|
@ -178,7 +176,8 @@ static int32_t putSlotToHashImpl(int16_t dataBlockId, int16_t slotId, const char
|
||||||
return taosHashPut(pHash, pName, len, &index, sizeof(SSlotIndex));
|
return taosHashPut(pHash, pName, len, &index, sizeof(SSlotIndex));
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t putSlotToHash(const char* pName, int32_t len, int16_t dataBlockId, int16_t slotId, SNode* pNode, SHashObj* pHash) {
|
static int32_t putSlotToHash(const char* pName, int32_t len, int16_t dataBlockId, int16_t slotId, SNode* pNode,
|
||||||
|
SHashObj* pHash) {
|
||||||
return putSlotToHashImpl(dataBlockId, slotId, pName, len, pHash);
|
return putSlotToHashImpl(dataBlockId, slotId, pName, len, pHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -299,8 +298,8 @@ static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList,
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
SSlotIndex* pIndex = taosHashGet(pHash, name, len);
|
SSlotIndex* pIndex = taosHashGet(pHash, name, len);
|
||||||
if (NULL == pIndex) {
|
if (NULL == pIndex) {
|
||||||
code =
|
code = nodesListStrictAppend(pDataBlockDesc->pSlots,
|
||||||
nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, name, pExpr, nextSlotId, output, reserve));
|
createSlotDesc(pCxt, name, pExpr, nextSlotId, output, reserve));
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = putSlotToHashImpl(pDataBlockDesc->dataBlockId, nextSlotId, name, len, pHash);
|
code = putSlotToHashImpl(pDataBlockDesc->dataBlockId, nextSlotId, name, len, pHash);
|
||||||
}
|
}
|
||||||
|
@ -633,6 +632,7 @@ static ENodeType getScanOperatorType(EScanType scanType) {
|
||||||
return QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
|
return QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
|
||||||
case SCAN_TYPE_TABLE_COUNT:
|
case SCAN_TYPE_TABLE_COUNT:
|
||||||
return QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN;
|
return QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN;
|
||||||
|
case SCAN_TYPE_DB_DISK_USAGE:
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -878,18 +878,21 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
|
||||||
static int32_t getJoinDataBlockDescNode(SNodeList* pChildren, int32_t idx, SDataBlockDescNode** ppDesc) {
|
static int32_t getJoinDataBlockDescNode(SNodeList* pChildren, int32_t idx, SDataBlockDescNode** ppDesc) {
|
||||||
if (2 == pChildren->length) {
|
if (2 == pChildren->length) {
|
||||||
*ppDesc = ((SPhysiNode*)nodesListGetNode(pChildren, idx))->pOutputDataBlockDesc;
|
*ppDesc = ((SPhysiNode*)nodesListGetNode(pChildren, idx))->pOutputDataBlockDesc;
|
||||||
} else if (1 == pChildren->length && nodeType(nodesListGetNode(pChildren, 0)) == QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE) {
|
} else if (1 == pChildren->length &&
|
||||||
|
nodeType(nodesListGetNode(pChildren, 0)) == QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE) {
|
||||||
SGroupCachePhysiNode* pGrpCache = (SGroupCachePhysiNode*)nodesListGetNode(pChildren, 0);
|
SGroupCachePhysiNode* pGrpCache = (SGroupCachePhysiNode*)nodesListGetNode(pChildren, 0);
|
||||||
*ppDesc = ((SPhysiNode*)nodesListGetNode(pGrpCache->node.pChildren, idx))->pOutputDataBlockDesc;
|
*ppDesc = ((SPhysiNode*)nodesListGetNode(pGrpCache->node.pChildren, idx))->pOutputDataBlockDesc;
|
||||||
} else {
|
} else {
|
||||||
planError("Invalid join children num:%d or child type:%d", pChildren->length, nodeType(nodesListGetNode(pChildren, 0)));
|
planError("Invalid join children num:%d or child type:%d", pChildren->length,
|
||||||
|
nodeType(nodesListGetNode(pChildren, 0)));
|
||||||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setColEqList(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkId, SNodeList** ppLeft, SNodeList** ppRight) {
|
static int32_t setColEqList(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkId, SNodeList** ppLeft,
|
||||||
|
SNodeList** ppRight) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (QUERY_NODE_OPERATOR == nodeType(pEqCond) && ((SOperatorNode*)pEqCond)->opType == OP_TYPE_EQUAL) {
|
if (QUERY_NODE_OPERATOR == nodeType(pEqCond) && ((SOperatorNode*)pEqCond)->opType == OP_TYPE_EQUAL) {
|
||||||
SOperatorNode* pOp = (SOperatorNode*)pEqCond;
|
SOperatorNode* pOp = (SOperatorNode*)pEqCond;
|
||||||
|
@ -925,7 +928,8 @@ static int32_t setColEqList(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkI
|
||||||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (QUERY_NODE_LOGIC_CONDITION == nodeType(pEqCond) && ((SLogicConditionNode*)pEqCond)->condType == LOGIC_COND_TYPE_AND) {
|
} else if (QUERY_NODE_LOGIC_CONDITION == nodeType(pEqCond) &&
|
||||||
|
((SLogicConditionNode*)pEqCond)->condType == LOGIC_COND_TYPE_AND) {
|
||||||
SLogicConditionNode* pLogic = (SLogicConditionNode*)pEqCond;
|
SLogicConditionNode* pLogic = (SLogicConditionNode*)pEqCond;
|
||||||
SNode* pNode = NULL;
|
SNode* pNode = NULL;
|
||||||
FOREACH(pNode, pLogic->pParameterList) {
|
FOREACH(pNode, pLogic->pParameterList) {
|
||||||
|
@ -942,7 +946,8 @@ static int32_t setColEqList(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkI
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setMergeJoinPrimColEqCond(SNode* pEqCond, int32_t subType, int16_t leftBlkId, int16_t rightBlkId, SSortMergeJoinPhysiNode* pJoin) {
|
static int32_t setMergeJoinPrimColEqCond(SNode* pEqCond, int32_t subType, int16_t leftBlkId, int16_t rightBlkId,
|
||||||
|
SSortMergeJoinPhysiNode* pJoin) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (QUERY_NODE_OPERATOR == nodeType(pEqCond)) {
|
if (QUERY_NODE_OPERATOR == nodeType(pEqCond)) {
|
||||||
SOperatorNode* pOp = (SOperatorNode*)pEqCond;
|
SOperatorNode* pOp = (SOperatorNode*)pEqCond;
|
||||||
|
@ -1085,7 +1090,8 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
|
||||||
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond,
|
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond,
|
||||||
&pJoin->pPrimKeyCond);
|
&pJoin->pPrimKeyCond);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = setMergeJoinPrimColEqCond(pJoin->pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
|
code = setMergeJoinPrimColEqCond(pJoin->pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId,
|
||||||
|
pRightDesc->dataBlockId, pJoin);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) {
|
||||||
code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc);
|
code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc);
|
||||||
|
@ -1100,7 +1106,8 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
|
||||||
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->addPrimEqCond,
|
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->addPrimEqCond,
|
||||||
&pPrimKeyCond);
|
&pPrimKeyCond);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = setMergeJoinPrimColEqCond(pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
|
code = setMergeJoinPrimColEqCond(pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId,
|
||||||
|
pJoin);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) {
|
||||||
code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc);
|
code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc);
|
||||||
|
@ -1117,8 +1124,8 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pFullOnCond) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pFullOnCond) {
|
||||||
code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1,
|
code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pFullOnCond,
|
||||||
pJoinLogicNode->pFullOnCond, &pJoin->pFullOnCond);
|
&pJoin->pFullOnCond);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColEqCond) || (NULL != pJoinLogicNode->pTagEqCond))) {
|
if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColEqCond) || (NULL != pJoinLogicNode->pTagEqCond))) {
|
||||||
|
@ -1127,14 +1134,16 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
|
||||||
// TODO set from input blocks for group algo
|
// TODO set from input blocks for group algo
|
||||||
/*
|
/*
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqCond) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqCond) {
|
||||||
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pJoin->pColEqCond);
|
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond,
|
||||||
|
&pJoin->pColEqCond);
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqCond) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqCond) {
|
||||||
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId,
|
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond,
|
||||||
pJoinLogicNode->pColEqCond, &pJoin->pColEqCond);
|
&pJoin->pColEqCond);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = setColEqList(pJoin->pColEqCond, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, &pJoin->pEqLeft, &pJoin->pEqRight);
|
code = setColEqList(pJoin->pColEqCond, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, &pJoin->pEqLeft,
|
||||||
|
&pJoin->pEqRight);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1144,15 +1153,15 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
|
||||||
// TODO set from input blocks for group algo
|
// TODO set from input blocks for group algo
|
||||||
/*
|
/*
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColOnCond) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColOnCond) {
|
||||||
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColOnCond, &pJoin->pColOnCond);
|
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColOnCond,
|
||||||
|
&pJoin->pColOnCond);
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColOnCond) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColOnCond) {
|
||||||
code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1,
|
code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pColOnCond,
|
||||||
pJoinLogicNode->pColOnCond, &pJoin->pColOnCond);
|
&pJoin->pColOnCond);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
|
code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
|
||||||
}
|
}
|
||||||
|
@ -1201,7 +1210,8 @@ static int32_t extractHashJoinOpCols(int16_t lBlkId, int16_t rBlkId, SNode* pEq,
|
||||||
code = nodesListStrictAppend(pJoin->pOnRight, pL);
|
code = nodesListStrictAppend(pJoin->pOnRight, pL);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
planError("Invalid join equal cond, lbid:%d, rbid:%d, oplid:%d, oprid:%d", lBlkId, rBlkId, pLeft->dataBlockId, pRight->dataBlockId);
|
planError("Invalid join equal cond, lbid:%d, rbid:%d, oplid:%d, oprid:%d", lBlkId, rBlkId, pLeft->dataBlockId,
|
||||||
|
pRight->dataBlockId);
|
||||||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1237,7 +1247,8 @@ static int32_t extractHashJoinOnCols(int16_t lBlkId, int16_t rBlkId, SNode* pEq,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t createHashJoinColList(int16_t lBlkId, int16_t rBlkId, SNode* pEq1, SNode* pEq2, SNode* pEq3, SHashJoinPhysiNode* pJoin) {
|
static int32_t createHashJoinColList(int16_t lBlkId, int16_t rBlkId, SNode* pEq1, SNode* pEq2, SNode* pEq3,
|
||||||
|
SHashJoinPhysiNode* pJoin) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
pJoin->pOnLeft = NULL;
|
pJoin->pOnLeft = NULL;
|
||||||
code = nodesMakeList(&pJoin->pOnLeft);
|
code = nodesMakeList(&pJoin->pOnLeft);
|
||||||
|
@ -1357,8 +1368,8 @@ static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhys
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t setHashJoinPrimColEqCond(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkId,
|
||||||
static int32_t setHashJoinPrimColEqCond(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkId, SHashJoinPhysiNode* pJoin) {
|
SHashJoinPhysiNode* pJoin) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (QUERY_NODE_OPERATOR == nodeType(pEqCond)) {
|
if (QUERY_NODE_OPERATOR == nodeType(pEqCond)) {
|
||||||
SOperatorNode* pOp = (SOperatorNode*)pEqCond;
|
SOperatorNode* pOp = (SOperatorNode*)pEqCond;
|
||||||
|
@ -1464,7 +1475,6 @@ static int32_t setHashJoinPrimColEqCond(SNode* pEqCond, int16_t leftBlkId, int16
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
|
static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
|
||||||
SPhysiNode** pPhyNode) {
|
SPhysiNode** pPhyNode) {
|
||||||
SHashJoinPhysiNode* pJoin =
|
SHashJoinPhysiNode* pJoin =
|
||||||
|
@ -1510,10 +1520,12 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pJoin->pColEqCond);
|
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond,
|
||||||
|
&pJoin->pColEqCond);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqCond, &pJoin->pTagEqCond);
|
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqCond,
|
||||||
|
&pJoin->pTagEqCond);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, -1, pJoinLogicNode->pLeftOnCond, &pJoin->pLeftOnCond);
|
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, -1, pJoinLogicNode->pLeftOnCond, &pJoin->pLeftOnCond);
|
||||||
|
@ -1526,16 +1538,19 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
|
||||||
}
|
}
|
||||||
SNode* pOnCond = (NULL != pJoinLogicNode->pColOnCond) ? pJoinLogicNode->pColOnCond : pJoinLogicNode->pTagOnCond;
|
SNode* pOnCond = (NULL != pJoinLogicNode->pColOnCond) ? pJoinLogicNode->pColOnCond : pJoinLogicNode->pTagOnCond;
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) {
|
||||||
code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pOnCond, &pJoin->pFullOnCond);
|
code =
|
||||||
|
setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pOnCond, &pJoin->pFullOnCond);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets, &pJoin->pTargets);
|
code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets,
|
||||||
|
&pJoin->pTargets);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
|
code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = createHashJoinColList(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin->pPrimKeyCond, pJoin->pColEqCond, pJoin->pTagEqCond, pJoin);
|
code = createHashJoinColList(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin->pPrimKeyCond,
|
||||||
|
pJoin->pColEqCond, pJoin->pTagEqCond, pJoin);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = sortHashJoinTargets(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
|
code = sortHashJoinTargets(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
|
||||||
|
@ -1568,8 +1583,8 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SGroupCacheLogicNode* pLogicNode,
|
static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
|
||||||
SPhysiNode** pPhyNode) {
|
SGroupCacheLogicNode* pLogicNode, SPhysiNode** pPhyNode) {
|
||||||
SGroupCachePhysiNode* pGrpCache =
|
SGroupCachePhysiNode* pGrpCache =
|
||||||
(SGroupCachePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pLogicNode, QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE);
|
(SGroupCachePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pLogicNode, QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE);
|
||||||
if (NULL == pGrpCache) {
|
if (NULL == pGrpCache) {
|
||||||
|
@ -1593,8 +1608,8 @@ static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t updateDynQueryCtrlStbJoinInfo(SPhysiPlanContext* pCxt, SNodeList* pChildren, SDynQueryCtrlLogicNode* pLogicNode,
|
static int32_t updateDynQueryCtrlStbJoinInfo(SPhysiPlanContext* pCxt, SNodeList* pChildren,
|
||||||
SDynQueryCtrlPhysiNode* pDynCtrl) {
|
SDynQueryCtrlLogicNode* pLogicNode, SDynQueryCtrlPhysiNode* pDynCtrl) {
|
||||||
SDataBlockDescNode* pPrevDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
|
SDataBlockDescNode* pPrevDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
|
||||||
SNodeList* pVgList = NULL;
|
SNodeList* pVgList = NULL;
|
||||||
SNodeList* pUidList = NULL;
|
SNodeList* pUidList = NULL;
|
||||||
|
@ -1624,8 +1639,8 @@ static int32_t updateDynQueryCtrlStbJoinInfo(SPhysiPlanContext* pCxt, SNodeList*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t createDynQueryCtrlPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SDynQueryCtrlLogicNode* pLogicNode,
|
static int32_t createDynQueryCtrlPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
|
||||||
SPhysiNode** pPhyNode) {
|
SDynQueryCtrlLogicNode* pLogicNode, SPhysiNode** pPhyNode) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SDynQueryCtrlPhysiNode* pDynCtrl =
|
SDynQueryCtrlPhysiNode* pDynCtrl =
|
||||||
(SDynQueryCtrlPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pLogicNode, QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL);
|
(SDynQueryCtrlPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pLogicNode, QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL);
|
||||||
|
@ -2649,7 +2664,8 @@ static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge, int32_t i
|
||||||
return nodesListMakeStrictAppend(&pMerge->node.pChildren, (SNode*)pExchange);
|
return nodesListMakeStrictAppend(&pMerge->node.pChildren, (SNode*)pExchange);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SMergeLogicNode* pMergeLogicNode, SPhysiNode** pPhyNode) {
|
static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SMergeLogicNode* pMergeLogicNode,
|
||||||
|
SPhysiNode** pPhyNode) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SMergePhysiNode* pMerge =
|
SMergePhysiNode* pMerge =
|
||||||
(SMergePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pMergeLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE);
|
(SMergePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pMergeLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE);
|
||||||
|
@ -2703,7 +2719,8 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildre
|
||||||
SDataBlockDescNode* pLeftDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
|
SDataBlockDescNode* pLeftDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
|
||||||
SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc;
|
SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc;
|
||||||
|
|
||||||
code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pMergeLogicNode->node.pTargets, &pMerge->pTargets);
|
code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pMergeLogicNode->node.pTargets,
|
||||||
|
&pMerge->pTargets);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = addDataBlockSlots(pCxt, pMerge->pTargets, pMerge->node.pOutputDataBlockDesc);
|
code = addDataBlockSlots(pCxt, pMerge->pTargets, pMerge->node.pOutputDataBlockDesc);
|
||||||
}
|
}
|
||||||
|
@ -3139,8 +3156,7 @@ static int32_t setExecNodeList(SPhysiPlanContext* pCxt, SArray* pExecNodeList) {
|
||||||
}
|
}
|
||||||
if (pCxt->hasSysScan || !pCxt->hasScan) {
|
if (pCxt->hasSysScan || !pCxt->hasScan) {
|
||||||
SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
|
SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
|
||||||
if (NULL == taosArrayPush(pExecNodeList, &node))
|
if (NULL == taosArrayPush(pExecNodeList, &node)) code = terrno;
|
||||||
code = terrno;
|
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue