Merge branch '3.0' into feature/TD-11274-3.0

This commit is contained in:
Cary Xu 2022-06-30 15:28:15 +08:00
commit e577e78e15
19 changed files with 399 additions and 212 deletions

View File

@ -125,6 +125,7 @@ typedef enum EFunctionType {
FUNCTION_TYPE_BLOCK_DIST_INFO, // block distribution pseudo column function FUNCTION_TYPE_BLOCK_DIST_INFO, // block distribution pseudo column function
FUNCTION_TYPE_TO_COLUMN, FUNCTION_TYPE_TO_COLUMN,
FUNCTION_TYPE_GROUP_KEY, FUNCTION_TYPE_GROUP_KEY,
FUNCTION_TYPE_CACHE_LAST_ROW,
// distributed splitting functions // distributed splitting functions
FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000, FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000,

View File

@ -91,6 +91,7 @@ typedef struct SAggLogicNode {
SLogicNode node; SLogicNode node;
SNodeList* pGroupKeys; SNodeList* pGroupKeys;
SNodeList* pAggFuncs; SNodeList* pAggFuncs;
bool hasLastRow;
} SAggLogicNode; } SAggLogicNode;
typedef struct SProjectLogicNode { typedef struct SProjectLogicNode {

View File

@ -111,9 +111,9 @@ else
fi fi
csudo="" csudo=""
if command -v sudo > /dev/null; then #if command -v sudo > /dev/null; then
csudo="sudo " # csudo="sudo "
fi #fi
function is_valid_version() { function is_valid_version() {
[ -z $1 ] && return 1 || : [ -z $1 ] && return 1 || :
@ -181,7 +181,9 @@ cd "${curr_dir}"
# 2. cmake executable file # 2. cmake executable file
compile_dir="${top_dir}/debug" compile_dir="${top_dir}/debug"
${csudo}rm -rf ${compile_dir} if [ -d ${compile_dir} ]; then
rm -rf ${compile_dir}
fi
mkdir -p ${compile_dir} mkdir -p ${compile_dir}
cd ${compile_dir} cd ${compile_dir}
@ -258,9 +260,9 @@ if [ "$osType" != "Darwin" ]; then
if [[ "$pagMode" == "full" ]]; then if [[ "$pagMode" == "full" ]]; then
if [ -d ${top_dir}/tools/taos-tools/packaging/deb ]; then if [ -d ${top_dir}/tools/taos-tools/packaging/deb ]; then
cd ${top_dir}/tools/taos-tools/packaging/deb cd ${top_dir}/tools/taos-tools/packaging/deb
taos_tools_ver=$(git describe --tags | sed -e 's/ver-//g' | awk -F '-' '{print $1}')
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0" [ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
taos_tools_ver=$(git describe --tags | sed -e 's/ver-//g' | awk -F '-' '{print $1}')
${csudo}./make-taos-tools-deb.sh ${top_dir} \ ${csudo}./make-taos-tools-deb.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType} ${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
fi fi
@ -283,9 +285,9 @@ if [ "$osType" != "Darwin" ]; then
if [[ "$pagMode" == "full" ]]; then if [[ "$pagMode" == "full" ]]; then
if [ -d ${top_dir}/tools/taos-tools/packaging/rpm ]; then if [ -d ${top_dir}/tools/taos-tools/packaging/rpm ]; then
cd ${top_dir}/tools/taos-tools/packaging/rpm cd ${top_dir}/tools/taos-tools/packaging/rpm
taos_tools_ver=$(git describe --tags | sed -e 's/ver-//g' | awk -F '-' '{print $1}' | sed -e 's/-/_/g')
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0" [ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
taos_tools_ver=$(git describe --tags | sed -e 's/ver-//g' | awk -F '-' '{print $1}' | sed -e 's/-/_/g')
${csudo}./make-taos-tools-rpm.sh ${top_dir} \ ${csudo}./make-taos-tools-rpm.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType} ${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
fi fi
@ -300,7 +302,7 @@ if [ "$osType" != "Darwin" ]; then
${csudo}./makepkg.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${verNumberComp} ${dbName} ${csudo}./makepkg.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${verNumberComp} ${dbName}
${csudo}./makeclient.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${dbName} ${csudo}./makeclient.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${dbName}
# ${csudo}./makearbi.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${csudo}./makearbi.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode}
else else
# only make client for Darwin # only make client for Darwin

View File

@ -39,6 +39,20 @@ static int32_t invaildFuncParaValueErrMsg(char* pErrBuf, int32_t len, const char
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_VALUE, "Invalid parameter value : %s", pFuncName); return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_VALUE, "Invalid parameter value : %s", pFuncName);
} }
void static addDbPrecisonParam(SNodeList** pList, uint8_t precision) {
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
pVal->literal = NULL;
pVal->isDuration = false;
pVal->translate = true;
pVal->node.resType.type = TSDB_DATA_TYPE_TINYINT;
pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TINYINT].bytes;
pVal->node.resType.precision = precision;
pVal->datum.i = (int64_t)precision;
pVal->typeData = (int64_t)precision;
nodesListMakeAppend(pList, (SNode*)pVal);
}
// There is only one parameter of numeric type, and the return type is parameter type // There is only one parameter of numeric type, and the return type is parameter type
static int32_t translateInOutNum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateInOutNum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) { if (1 != LIST_LENGTH(pFunc->pParameterList)) {
@ -220,8 +234,20 @@ static int32_t translateWduration(SFunctionNode* pFunc, char* pErrBuf, int32_t l
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t translateNowToday(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// pseudo column do not need to check parameters
//add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision;
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP};
return TSDB_CODE_SUCCESS;
}
static int32_t translateTimePseudoColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateTimePseudoColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// pseudo column do not need to check parameters // pseudo column do not need to check parameters
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP}; pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1379,20 +1405,6 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
return true; return true;
} }
void static addDbPrecisonParam(SNodeList* pList, uint8_t precision) {
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
pVal->literal = NULL;
pVal->isDuration = false;
pVal->translate = true;
pVal->node.resType.type = TSDB_DATA_TYPE_TINYINT;
pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TINYINT].bytes;
pVal->node.resType.precision = precision;
pVal->datum.i = (int64_t)precision;
pVal->typeData = (int64_t)precision;
nodesListAppend(pList, (SNode*)pVal);
}
void static addTimezoneParam(SNodeList* pList) { void static addTimezoneParam(SNodeList* pList) {
char buf[6] = {0}; char buf[6] = {0};
time_t t = taosTime(NULL); time_t t = taosTime(NULL);
@ -1462,7 +1474,7 @@ static int32_t translateToUnixtimestamp(SFunctionNode* pFunc, char* pErrBuf, int
//add database precision as param //add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision; uint8_t dbPrec = pFunc->node.resType.precision;
addDbPrecisonParam(pFunc->pParameterList, dbPrec); addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1482,7 +1494,7 @@ static int32_t translateTimeTruncate(SFunctionNode* pFunc, char* pErrBuf, int32_
//add database precision as param //add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision; uint8_t dbPrec = pFunc->node.resType.precision;
addDbPrecisonParam(pFunc->pParameterList, dbPrec); addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
pFunc->node.resType = pFunc->node.resType =
(SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP}; (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP};
@ -1510,7 +1522,7 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le
//add database precision as param //add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision; uint8_t dbPrec = pFunc->node.resType.precision;
addDbPrecisonParam(pFunc->pParameterList, dbPrec); addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1938,6 +1950,19 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "last_row", .name = "last_row",
.type = FUNCTION_TYPE_LAST_ROW, .type = FUNCTION_TYPE_LAST_ROW,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateFirstLast,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = lastFunction,
.finalizeFunc = firstLastFinalize,
.pPartialFunc = "_last_partial",
.pMergeFunc = "_last_merge",
.combineFunc = lastCombine,
},
{
.name = "_cache_last_row",
.type = FUNCTION_TYPE_CACHE_LAST_ROW,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateLastRow, .translateFunc = translateLastRow,
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getMinmaxFuncEnv,
.initFunc = minmaxFunctionSetup, .initFunc = minmaxFunctionSetup,
@ -2466,7 +2491,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "now", .name = "now",
.type = FUNCTION_TYPE_NOW, .type = FUNCTION_TYPE_NOW,
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC, .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC,
.translateFunc = translateTimePseudoColumn, .translateFunc = translateNowToday,
.getEnvFunc = NULL, .getEnvFunc = NULL,
.initFunc = NULL, .initFunc = NULL,
.sprocessFunc = nowFunction, .sprocessFunc = nowFunction,
@ -2476,7 +2501,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "today", .name = "today",
.type = FUNCTION_TYPE_TODAY, .type = FUNCTION_TYPE_TODAY,
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC, .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC,
.translateFunc = translateTimePseudoColumn, .translateFunc = translateNowToday,
.getEnvFunc = NULL, .getEnvFunc = NULL,
.initFunc = NULL, .initFunc = NULL,
.sprocessFunc = todayFunction, .sprocessFunc = todayFunction,

View File

@ -4283,7 +4283,7 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) {
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
// TODO: process timeUnit for different db precisions // TODO: process timeUnit for different db precisions
int32_t timeUnit = 1000; int32_t timeUnit = 1;
if (pCtx->numOfParams == 5) { // TODO: param number incorrect if (pCtx->numOfParams == 5) { // TODO: param number incorrect
timeUnit = pCtx->param[3].param.i; timeUnit = pCtx->param[3].param.i;
} }

View File

@ -154,16 +154,12 @@ static int32_t createSelectRootLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
return createRootLogicNode(pCxt, pSelect, pSelect->precision, (FCreateLogicNode)func, pRoot); return createRootLogicNode(pCxt, pSelect, pSelect->precision, (FCreateLogicNode)func, pRoot);
} }
static EScanType getScanType(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNodeList* pScanPseudoCols, static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols, SNodeList* pScanCols,
SNodeList* pScanCols, int8_t tableType) { int8_t tableType) {
if (pCxt->pPlanCxt->topicQuery || pCxt->pPlanCxt->streamQuery) { if (pCxt->pPlanCxt->topicQuery || pCxt->pPlanCxt->streamQuery) {
return SCAN_TYPE_STREAM; return SCAN_TYPE_STREAM;
} }
if (pSelect->hasLastRowFunc) {
return SCAN_TYPE_LAST_ROW;
}
if (NULL == pScanCols) { if (NULL == pScanCols) {
// select count(*) from t // select count(*) from t
return NULL == pScanPseudoCols return NULL == pScanPseudoCols
@ -279,7 +275,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
code = rewriteExprsForSelect(pScan->pScanPseudoCols, pSelect, SQL_CLAUSE_FROM); code = rewriteExprsForSelect(pScan->pScanPseudoCols, pSelect, SQL_CLAUSE_FROM);
} }
pScan->scanType = getScanType(pCxt, pSelect, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType); pScan->scanType = getScanType(pCxt, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = addPrimaryKeyCol(pScan->tableId, &pScan->pScanCols); code = addPrimaryKeyCol(pScan->tableId, &pScan->pScanCols);
@ -474,6 +470,8 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pAgg->hasLastRow = pSelect->hasLastRowFunc;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
// set grouyp keys, agg funcs and having conditions // set grouyp keys, agg funcs and having conditions

View File

@ -1622,6 +1622,46 @@ static int32_t rewriteUniqueOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
return rewriteUniqueOptimizeImpl(pCxt, pLogicSubplan, pIndef); return rewriteUniqueOptimizeImpl(pCxt, pLogicSubplan, pIndef);
} }
static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || !(((SAggLogicNode*)pNode)->hasLastRow) ||
NULL != ((SAggLogicNode*)pNode)->pGroupKeys || 1 != LIST_LENGTH(pNode->pChildren) ||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0)) ||
NULL != ((SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0))->node.pConditions) {
return false;
}
SNode* pFunc = NULL;
FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) {
if (FUNCTION_TYPE_LAST_ROW != ((SFunctionNode*)pFunc)->funcType) {
return false;
}
}
return true;
}
static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SAggLogicNode* pAgg = (SAggLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, lastRowScanOptMayBeOptimized);
if (NULL == pAgg) {
return TSDB_CODE_SUCCESS;
}
SNode* pNode = NULL;
FOREACH(pNode, pAgg->pAggFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)pNode;
int32_t len = snprintf(pFunc->functionName, sizeof(pFunc->functionName), "_cache_last_row");
pFunc->functionName[len] = '\0';
fmGetFuncInfo(pFunc, NULL, 0);
}
pAgg->hasLastRow = false;
((SScanLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0))->scanType = SCAN_TYPE_LAST_ROW;
pCxt->optimized = true;
return TSDB_CODE_SUCCESS;
}
// merge projects // merge projects
static bool mergeProjectsMayBeOptimized(SLogicNode* pNode) { static bool mergeProjectsMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) { if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) {
@ -1710,7 +1750,8 @@ static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize}, {.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize}, {.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
{.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize}, {.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize},
{.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize} {.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize},
{.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize}
}; };
// clang-format on // clang-format on

View File

@ -1344,7 +1344,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM
} }
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code && NULL != pMergeLogicNode->pMergeKeys) {
code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys, code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys,
&pMerge->pMergeKeys); &pMerge->pMergeKeys);
} }

View File

@ -197,6 +197,8 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode); return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
case QUERY_NODE_LOGIC_PLAN_JOIN: case QUERY_NODE_LOGIC_PLAN_JOIN:
return !(((SJoinLogicNode*)pNode)->isSingleTableJoin); return !(((SJoinLogicNode*)pNode)->isSingleTableJoin);
case QUERY_NODE_LOGIC_PLAN_PARTITION:
return stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_AGG:
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW: case QUERY_NODE_LOGIC_PLAN_WINDOW:
@ -431,7 +433,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
SNodeList* pMergeKeys = NULL; SNodeList* pMergeKeys = NULL;
code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys); code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, false); code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true);
} }
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pMergeKeys); nodesDestroyList(pMergeKeys);
@ -887,6 +889,16 @@ static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
return code; return code;
} }
static int32_t stbSplSplitPartitionNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, NULL, pInfo->pSplitNode, true);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
}
++(pCxt->groupId);
return code;
}
static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
if (pCxt->pPlanCxt->rSmaQuery) { if (pCxt->pPlanCxt->rSmaQuery) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -905,6 +917,9 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
case QUERY_NODE_LOGIC_PLAN_JOIN: case QUERY_NODE_LOGIC_PLAN_JOIN:
code = stbSplSplitJoinNode(pCxt, &info); code = stbSplSplitJoinNode(pCxt, &info);
break; break;
case QUERY_NODE_LOGIC_PLAN_PARTITION:
code = stbSplSplitPartitionNode(pCxt, &info);
break;
case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_AGG:
code = stbSplSplitAggNode(pCxt, &info); code = stbSplSplitAggNode(pCxt, &info);
break; break;

View File

@ -99,6 +99,8 @@ TEST_F(PlanBasicTest, lastRowFunc) {
run("SELECT LAST_ROW(c1, c2) FROM t1"); run("SELECT LAST_ROW(c1, c2) FROM t1");
run("SELECT LAST_ROW(c1) FROM st1"); run("SELECT LAST_ROW(c1) FROM st1");
run("SELECT LAST_ROW(c1), SUM(c3) FROM t1");
} }
TEST_F(PlanBasicTest, sampleFunc) { TEST_F(PlanBasicTest, sampleFunc) {

View File

@ -1515,7 +1515,10 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
} }
int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
int64_t ts = taosGetTimestamp(TSDB_TIME_PRECISION_MILLI); int64_t timePrec;
GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[0]), pInput[0].columnData->pData);
int64_t ts = taosGetTimestamp(timePrec);
for (int32_t i = 0; i < pInput->numOfRows; ++i) { for (int32_t i = 0; i < pInput->numOfRows; ++i) {
colDataAppendInt64(pOutput->columnData, i, &ts); colDataAppendInt64(pOutput->columnData, i, &ts);
} }
@ -1524,7 +1527,10 @@ int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
} }
int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
int64_t ts = taosGetTimestampToday(TSDB_TIME_PRECISION_MILLI); int64_t timePrec;
GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[0]), pInput[0].columnData->pData);
int64_t ts = taosGetTimestampToday(timePrec);
for (int32_t i = 0; i < pInput->numOfRows; ++i) { for (int32_t i = 0; i < pInput->numOfRows; ++i) {
colDataAppendInt64(pOutput->columnData, i, &ts); colDataAppendInt64(pOutput->columnData, i, &ts);
} }

View File

@ -38,15 +38,15 @@ class TDTestCase:
tdSql.query(f"select stateduration(col{i},'{j}',5) from test") tdSql.query(f"select stateduration(col{i},'{j}',5) from test")
tdSql.checkRows(10) tdSql.checkRows(10)
if j in ['LT' ,'lt','Lt','lT']: if j in ['LT' ,'lt','Lt','lT']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GT','gt', 'Gt','gT']: elif j in ['GT','gt', 'Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (0,), (0,), (0,), (0,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['LE','le','Le','lE']: elif j in ['LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in [ 'GE','ge','Ge','gE']: elif j in [ 'GE','ge','Ge','gE']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (0,), (0,), (0,), (0,), (0,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)])
elif j in ['NE','ne','Ne','nE']: elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (-1,), (0,), (0,), (0,), (0,), (0,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['EQ','eq','Eq','eQ']: elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
for i in float_list: for i in float_list:
@ -54,11 +54,11 @@ class TDTestCase:
tdSql.query(f"select stateduration(col{i},'{j}',5) from test") tdSql.query(f"select stateduration(col{i},'{j}',5) from test")
tdSql.checkRows(10) tdSql.checkRows(10)
if j in ['LT','lt','Lt','lT','LE','le','Le','lE']: if j in ['LT','lt','Lt','lT','LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']: elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (0,), (0,), (0,), (0,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['NE','ne','Ne','nE']: elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (0,), (0,), (0,), (0,), (0,), (0,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)])
elif j in ['EQ','eq','Eq','eQ']: elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
@ -84,15 +84,15 @@ class TDTestCase:
tdSql.checkRows(10) tdSql.checkRows(10)
# print(tdSql.queryResult) # print(tdSql.queryResult)
if j in ['LT' ,'lt','Lt','lT']: if j in ['LT' ,'lt','Lt','lT']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GT','gt', 'Gt','gT']: elif j in ['GT','gt', 'Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1000,), (2000,), (3000,), (4000,)])
elif j in ['LE','le','Le','lE']: elif j in ['LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (4000,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in [ 'GE','ge','Ge','gE']: elif j in [ 'GE','ge','Ge','gE']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1000,), (2000,), (3000,), (4000,), (5000,)])
elif j in ['NE','ne','Ne','nE']: elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (-1,), (0,), (1000,), (2000,), (3000,), (4000,)])
elif j in ['EQ','eq','Eq','eQ']: elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
for i in float_list: for i in float_list:
@ -101,11 +101,11 @@ class TDTestCase:
tdSql.checkRows(10) tdSql.checkRows(10)
print(tdSql.queryResult) print(tdSql.queryResult)
if j in ['LT','lt','Lt','lT','LE','le','Le','lE']: if j in ['LT','lt','Lt','lT','LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (4000,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']: elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1000,), (2000,), (3000,), (4000,)])
elif j in ['NE','ne','Ne','nE']: elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (4000,), (5000,), (6000,), (7000,), (8000,), (9000,)])
elif j in ['EQ','eq','Eq','eQ']: elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])

View File

@ -9,27 +9,21 @@ from util.sql import *
from util.cases import * from util.cases import *
from util.dnodes import TDDnodes from util.dnodes import TDDnodes
from util.dnodes import TDDnode from util.dnodes import TDDnode
from util.cluster import *
import time import time
import socket import socket
import subprocess import subprocess
sys.path.append("./6-cluster")
class MyDnodes(TDDnodes): from clusterCommonCreate import *
def __init__(self ,dnodes_lists): from clusterCommonCheck import *
super(MyDnodes,self).__init__()
self.dnodes = dnodes_lists # dnode must be TDDnode instance
self.simDeployed = False
class TDTestCase: class TDTestCase:
noConn = True
def init(self,conn ,logSql): def init(self,conn ,logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
self.TDDnodes = None tdSql.init(conn.cursor())
self.depoly_cluster(5) self.host = socket.gethostname()
self.master_dnode = self.TDDnodes.dnodes[0]
self.host=self.master_dnode.cfgDict["fqdn"]
conn1 = taos.connect(self.master_dnode.cfgDict["fqdn"] , config=self.master_dnode.cfgDir)
tdSql.init(conn1.cursor())
def getBuildPath(self): def getBuildPath(self):
@ -41,91 +35,13 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files or "taosd.exe" in files): if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
break break
return buildPath return buildPath
def depoly_cluster(self ,dnodes_nums):
testCluster = False
valgrind = 0
hostname = socket.gethostname()
dnodes = []
start_port = 6030
start_port_sec = 6130
for num in range(1, dnodes_nums+1):
dnode = TDDnode(num)
dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}")
dnode.addExtraCfg("fqdn", f"{hostname}")
dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}")
dnode.addExtraCfg("monitorFqdn", hostname)
dnode.addExtraCfg("monitorPort", 7043)
dnode.addExtraCfg("secondEp", f"{hostname}:{start_port_sec}")
dnodes.append(dnode)
self.TDDnodes = MyDnodes(dnodes)
self.TDDnodes.init("")
self.TDDnodes.setTestCluster(testCluster)
self.TDDnodes.setValgrind(valgrind)
self.TDDnodes.stopAll()
for dnode in self.TDDnodes.dnodes:
self.TDDnodes.deploy(dnode.index,{})
for dnode in self.TDDnodes.dnodes:
self.TDDnodes.starttaosd(dnode.index)
# create cluster
for dnode in self.TDDnodes.dnodes[1:]:
# print(dnode.cfgDict)
dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"]
dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0]
dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1]
cmd = f"{self.getBuildPath()}/build/bin/taos -h {dnode_first_host} -P {dnode_first_port} -s \"create dnode \\\"{dnode_id}\\\"\""
print(cmd)
os.system(cmd)
time.sleep(2)
tdLog.info(" create cluster done! ")
def five_dnode_one_mnode(self):
tdSql.query("show dnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host)
tdSql.checkData(4,1,'%s:6430'%self.host)
tdSql.checkData(0,4,'ready')
tdSql.checkData(4,4,'ready')
tdSql.query("show mnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host)
tdSql.checkData(0,2,'leader')
tdSql.checkData(0,3,'ready')
tdSql.error("create mnode on dnode 1;")
tdSql.error("drop mnode on dnode 1;")
tdSql.execute("drop database if exists db")
tdSql.execute("create database if not exists db replica 1 duration 300")
tdSql.execute("use db")
tdSql.execute(
'''create table stb1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
tags (t1 int)
'''
)
tdSql.execute(
'''
create table t1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
'''
)
for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
def five_dnode_two_mnode(self): def five_dnode_two_mnode(self):
tdSql.query("show dnodes;") tdSql.query("show dnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host) tdSql.checkData(0,1,'%s:6030'%self.host)
@ -187,6 +103,34 @@ class TDTestCase:
tdSql.error("create mnode on dnode 2") tdSql.error("create mnode on dnode 2")
tdSql.query("show dnodes;") tdSql.query("show dnodes;")
print(tdSql.queryResult) print(tdSql.queryResult)
clusterComCheck.checkDnodes(5)
# restart all taosd
tdDnodes=cluster.dnodes
# stop follower
tdLog.info("stop follower")
tdDnodes[1].stoptaosd()
if cluster.checkConnectStatus(0) :
print("cluster also work")
# start follower
tdLog.info("start follower")
tdDnodes[1].starttaosd()
if clusterComCheck.checkMnodeStatus(2) :
print("both mnodes are ready")
# stop leader
tdLog.info("stop leader")
tdDnodes[0].stoptaosd()
try:
cluster.checkConnectStatus(2)
tdLog.exit(" The election still succeeds when leader of both mnodes are killed ")
except Exception:
pass
tdLog.info("start leader")
tdDnodes[0].starttaosd()
if clusterComCheck.checkMnodeStatus(2) :
print("both mnodes are ready")
# # fisrt drop follower of mnode # # fisrt drop follower of mnode
# BUG # BUG
@ -229,8 +173,6 @@ class TDTestCase:
def run(self): def run(self):
print(self.master_dnode.cfgDict)
self.five_dnode_one_mnode()
self.five_dnode_two_mnode() self.five_dnode_two_mnode()

View File

@ -88,7 +88,7 @@ class TDTestCase:
tdLog.info("Confirm the status of the dnode again") tdLog.info("Confirm the status of the dnode again")
tdSql.error("create mnode on dnode 2") tdSql.error("create mnode on dnode 2")
tdSql.query("show dnodes;") tdSql.query("show dnodes;")
print(tdSql.queryResult) # print(tdSql.queryResult)
clusterComCheck.checkDnodes(dnodenumbers) clusterComCheck.checkDnodes(dnodenumbers)
# restart all taosd # restart all taosd
tdDnodes=cluster.dnodes tdDnodes=cluster.dnodes
@ -108,18 +108,6 @@ class TDTestCase:
tdDnodes[0].starttaosd() tdDnodes[0].starttaosd()
clusterComCheck.checkMnodeStatus(3) clusterComCheck.checkMnodeStatus(3)
tdLog.info("Take turns stopping all dnodes ")
# seperate vnode and mnode in different dnodes.
# create database and stable
stopcount =0
while stopcount <= 2:
tdLog.info("first restart loop")
for i in range(dnodenumbers):
tdDnodes[i].stoptaosd()
tdDnodes[i].starttaosd()
stopcount+=1
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkMnodeStatus(3)
def run(self): def run(self):
# print(self.master_dnode.cfgDict) # print(self.master_dnode.cfgDict)

View File

@ -0,0 +1,118 @@
from ssl import ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE
import taos
import sys
import time
import os
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.dnodes import TDDnodes
from util.dnodes import TDDnode
from util.cluster import *
from test import tdDnodes
sys.path.append("./6-cluster")
from clusterCommonCreate import *
from clusterCommonCheck import *
import time
import socket
import subprocess
from multiprocessing import Process
class TDTestCase:
def init(self,conn ,logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
self.host = socket.gethostname()
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def fiveDnodeThreeMnode(self,dnodenumbers,mnodeNums,restartNumber):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'replica': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
dnodenumbers=int(dnodenumbers)
mnodeNums=int(mnodeNums)
dbNumbers = int(dnodenumbers * restartNumber)
tdLog.info("first check dnode and mnode")
tdSql.query("show dnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host)
tdSql.checkData(4,1,'%s:6430'%self.host)
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkMnodeStatus(1)
# fisr add three mnodes;
tdLog.info("fisr add three mnodes and check mnode status")
tdSql.execute("create mnode on dnode 2")
clusterComCheck.checkMnodeStatus(2)
tdSql.execute("create mnode on dnode 3")
clusterComCheck.checkMnodeStatus(3)
# add some error operations and
tdLog.info("Confirm the status of the dnode again")
tdSql.error("create mnode on dnode 2")
tdSql.query("show dnodes;")
# print(tdSql.queryResult)
clusterComCheck.checkDnodes(dnodenumbers)
# restart all taosd
tdDnodes=cluster.dnodes
tdLog.info("Take turns stopping all dnodes ")
# seperate vnode and mnode in different dnodes.
# create database and stable
stopcount =0
while stopcount <= 2:
tdLog.info(" restart loop: %d"%stopcount )
for i in range(dnodenumbers):
tdDnodes[i].stoptaosd()
tdDnodes[i].starttaosd()
stopcount+=1
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkMnodeStatus(3)
def run(self):
# print(self.master_dnode.cfgDict)
self.fiveDnodeThreeMnode(5,3,1)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -16,17 +16,22 @@ from tmqCommon import *
class TDTestCase: class TDTestCase:
paraDict = {'dbName': 'db12', paraDict = {'dbName': 'db12',
'dropFlag': 1, 'dropFlag': 1,
'event': '',
'vgroups': 4, 'vgroups': 4,
'precision': 'ms',
'stbName': 'stb0', 'stbName': 'stb0',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':16, 'count':1}, {'type': 'timestamp','count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10, 'ctbNum': 10,
'rowsPerTbl': 10000, 'rowsPerTbl': 10000,
'batchNum': 10, 'batchNum': 10,
'startTs': 0, # 1640966400000 ----> 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'event':'', 'pollDelay': 20,
'columnDict': {'int':2}, 'showMsg': 1,
'tagDict': {'int':1} 'showRow': 1}
}
cdbName = 'cdb' cdbName = 'cdb'
# some parameter to consumer processor # some parameter to consumer processor
@ -57,17 +62,18 @@ class TDTestCase:
tmqCom.initConsumerTable(self.cdbName) tmqCom.initConsumerTable(self.cdbName)
tdCom.create_database(tdSql,self.paraDict["dbName"],self.paraDict["dropFlag"], self.paraDict['precision']) tdCom.create_database(tdSql,self.paraDict["dbName"],self.paraDict["dropFlag"])
self.paraDict["stbName"] = 'stb1' self.paraDict["stbName"] = 'stb1'
tdCom.create_stable(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["columnDict"],self.paraDict["tagDict"]) tdCom.create_stable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],column_elm_list=self.paraDict["colSchema"],tag_elm_list=self.paraDict["tagSchema"],count=1, default_stbname_prefix=self.paraDict["stbName"])
tdCom.create_ctables(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["tagDict"]) tdCom.create_ctable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],tag_elm_list=self.paraDict['tagSchema'],count=self.paraDict["ctbNum"],default_ctbname_prefix=self.paraDict["ctbPrefix"])
tdCom.insert_data(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"]) tmqCom.insert_data_2(tdSql,self.paraDict["dbName"],self.paraDict["ctbPrefix"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"],self.paraDict["startTs"],self.paraDict["ctbStartIdx"])
self.paraDict["stbName"] = 'stb2' self.paraDict["stbName"] = 'stb2'
tdCom.create_stable(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["columnDict"],self.paraDict["tagDict"]) self.paraDict["ctbPrefix"] = 'newctb'
tdCom.create_ctables(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["tagDict"]) tdCom.create_stable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],column_elm_list=self.paraDict["colSchema"],tag_elm_list=self.paraDict["tagSchema"],count=1, default_stbname_prefix=self.paraDict["stbName"])
tdCom.insert_data(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"]) tdCom.create_ctable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],tag_elm_list=self.paraDict['tagSchema'],count=self.paraDict["ctbNum"],default_ctbname_prefix=self.paraDict["ctbPrefix"])
tmqCom.insert_data_2(tdSql,self.paraDict["dbName"],self.paraDict["ctbPrefix"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"],self.paraDict["startTs"],self.paraDict["ctbStartIdx"])
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_%s'%(self.paraDict['dbName']) topicName1 = 'topic_%s'%(self.paraDict['dbName'])
@ -97,7 +103,7 @@ class TDTestCase:
tdLog.info("act consume rows: %d, expect consume rows: between %d and %d"%(totalConsumeRows, self.expectrowcnt/2, self.expectrowcnt)) tdLog.info("act consume rows: %d, expect consume rows: between %d and %d"%(totalConsumeRows, self.expectrowcnt/2, self.expectrowcnt))
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
time.sleep(15) time.sleep(10)
tdSql.query("drop topic %s"%topicName1) tdSql.query("drop topic %s"%topicName1)
tdLog.printNoPrefix("======== test case 12 end ...... ") tdLog.printNoPrefix("======== test case 12 end ...... ")

View File

@ -77,6 +77,22 @@ class TMQCom:
return resultList return resultList
def selectConsumeMsgResult(self,expectRows,cdbName='cdb'):
resultList=[]
while 1:
tdSql.query("select * from %s.consumeresult"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == expectRows:
break
else:
time.sleep(5)
for i in range(expectRows):
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
resultList.append(tdSql.getData(i , 2))
return resultList
def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0,alias=0): def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0,alias=0):
buildPath = tdCom.getBuildPath() buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath() cfgPath = tdCom.getClientCfgPath()

View File

@ -116,17 +116,18 @@ python3 ./test.py -f 2-query/function_null.py
python3 ./test.py -f 2-query/queryQnode.py python3 ./test.py -f 2-query/queryQnode.py
python3 ./test.py -f 6-cluster/5dnode1mnode.py python3 ./test.py -f 6-cluster/5dnode1mnode.py
python3 ./test.py -f 6-cluster/5dnode2mnode.py python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3 # python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3
# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3 # python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
# python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 # python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3 # python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3
python3 ./test.py -f 7-tmq/basic5.py python3 ./test.py -f 7-tmq/basic5.py
@ -135,6 +136,7 @@ python3 ./test.py -f 7-tmq/subscribeDb0.py
python3 ./test.py -f 7-tmq/subscribeDb1.py python3 ./test.py -f 7-tmq/subscribeDb1.py
python3 ./test.py -f 7-tmq/subscribeDb2.py python3 ./test.py -f 7-tmq/subscribeDb2.py
python3 ./test.py -f 7-tmq/subscribeDb3.py python3 ./test.py -f 7-tmq/subscribeDb3.py
#python3 ./test.py -f 7-tmq/subscribeDb4.py
python3 ./test.py -f 7-tmq/subscribeStb.py python3 ./test.py -f 7-tmq/subscribeStb.py
python3 ./test.py -f 7-tmq/subscribeStb0.py python3 ./test.py -f 7-tmq/subscribeStb0.py
python3 ./test.py -f 7-tmq/subscribeStb1.py python3 ./test.py -f 7-tmq/subscribeStb1.py

View File

@ -635,6 +635,9 @@ void loop_consume(SThreadInfo* pInfo) {
} }
} }
uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000); int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000);
while (running) { while (running) {
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay); TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay);
@ -647,6 +650,14 @@ void loop_consume(SThreadInfo* pInfo) {
totalMsgs++; totalMsgs++;
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 10 * 1000) {
taosFprintfFile(g_fp,
"consumer id %d has currently poll total msgs: %" PRId64 "\n",
pInfo->consumerId, totalMsgs);
lastPrintTime = currentPrintTime;
}
if (0 == once_flag) { if (0 == once_flag) {
once_flag = 1; once_flag = 1;
notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM); notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM);
@ -676,8 +687,6 @@ void loop_consume(SThreadInfo* pInfo) {
} }
void* consumeThreadFunc(void* param) { void* consumeThreadFunc(void* param) {
int32_t totalMsgs = 0;
SThreadInfo* pInfo = (SThreadInfo*)param; SThreadInfo* pInfo = (SThreadInfo*)param;
pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0); pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
@ -859,12 +868,27 @@ int main(int32_t argc, char* argv[]) {
(void*)(&(g_stConfInfo.stThreads[i]))); (void*)(&(g_stConfInfo.stThreads[i])));
} }
int64_t start = taosGetTimestampUs();
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL); taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
taosThreadClear(&g_stConfInfo.stThreads[i].thread); taosThreadClear(&g_stConfInfo.stThreads[i].thread);
} }
// printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); int64_t end = taosGetTimestampUs();
int64_t totalMsgs = 0;
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
totalMsgs += g_stConfInfo.stThreads[i].consumeMsgCnt;
}
int64_t t = end - start;
if (0 == t) t = 1;
double tInMs = (double)t / 1000000.0;
taosFprintfFile(g_fp,
"Spent %.4f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.2f msgs/second\n\n",
tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs));
taosFprintfFile(g_fp, "==== close tmqlog ====\n"); taosFprintfFile(g_fp, "==== close tmqlog ====\n");
taosCloseFile(&g_fp); taosCloseFile(&g_fp);