diff --git a/include/common/ttime.h b/include/common/ttime.h index cd704bb1f7..de55b016cd 100644 --- a/include/common/ttime.h +++ b/include/common/ttime.h @@ -72,6 +72,8 @@ static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) { } int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision); +int64_t taosTimeSub(int64_t t, int64_t duration, char unit, int32_t precision); + int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision); int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision); diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index c81c474366..517c5ff0e6 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -32,6 +32,7 @@ pNode will be freed in API; *pRes need to freed in caller */ int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes); +int32_t scalarCalculateConstantsFromDual(SNode *pNode, SNode **pRes); /* pDst need to freed in caller diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ce434612c3..c057d48875 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -584,7 +584,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_INVALID_INTERP_CLAUSE TAOS_DEF_ERROR_CODE(0, 0x265D) #define TSDB_CODE_PAR_NO_VALID_FUNC_IN_WIN TAOS_DEF_ERROR_CODE(0, 0x265E) #define TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE TAOS_DEF_ERROR_CODE(0, 0x265F) -#define TSDB_CODE_PAR_INVALID_SMA_INDEX TAOS_DEF_ERROR_CODE(0, 0x265C) +#define TSDB_CODE_PAR_INVALID_SMA_INDEX TAOS_DEF_ERROR_CODE(0, 0x2660) +#define TSDB_CODE_PAR_INVALID_SELECTED_EXPR TAOS_DEF_ERROR_CODE(0, 0x2661) //planner #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index befb0abac8..d728bbe49e 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -710,6 +710,32 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) { return (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision)); } +int64_t taosTimeSub(int64_t t, int64_t duration, char unit, int32_t precision) { + if (duration == 0) { + return t; + } + + if (unit != 'n' && unit != 'y') { + return t - duration; + } + + // The following code handles the y/n time duration + int64_t numOfMonth = duration; + if (unit == 'y') { + numOfMonth *= 12; + } + + struct tm tm; + time_t tt = (time_t)(t / TSDB_TICK_PER_SECOND(precision)); + taosLocalTime(&tt, &tm); + int32_t mon = tm.tm_year * 12 + tm.tm_mon - (int32_t)numOfMonth; + tm.tm_year = mon / 12; + tm.tm_mon = mon % 12; + + return (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision)); +} + + int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision) { if (ekey < skey) { int64_t tmp = ekey; diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index a2816209a9..034778e5bf 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -570,10 +570,14 @@ int32_t buildSelectResultDataBlock(SNodeList* pProjects, SSDataBlock* pBlock) { int32_t index = 0; SNode* pProj = NULL; FOREACH(pProj, pProjects) { - if (((SValueNode*)pProj)->isNull) { - colDataAppend(taosArrayGet(pBlock->pDataBlock, index++), 0, NULL, true); + if (QUERY_NODE_VALUE != nodeType(pProj)) { + return TSDB_CODE_PAR_INVALID_SELECTED_EXPR; } else { - colDataAppend(taosArrayGet(pBlock->pDataBlock, index++), 0, nodesGetValueFromNode((SValueNode*)pProj), 
false); + if (((SValueNode*)pProj)->isNull) { + colDataAppend(taosArrayGet(pBlock->pDataBlock, index++), 0, NULL, true); + } else { + colDataAppend(taosArrayGet(pBlock->pDataBlock, index++), 0, nodesGetValueFromNode((SValueNode*)pProj), false); + } } } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 8d9cac3614..9795907404 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -621,7 +621,9 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData break; } } - + if (pInfo->groupSort) { + pInfo->hasGroupId = false; + } if (p->info.rows > 0) { // todo extract method blockDataEnsureCapacity(pDataBlock, p->info.rows); int32_t numOfCols = taosArrayGetSize(pColMatchInfo); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 3ae9ef38c9..e31ae3c4c8 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4507,13 +4507,14 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pSup->pCtx, pBlock, iaInfo->order, scanFlag, true); doMergeAlignedIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); doFilter(miaInfo->pCondition, pRes); - if (pRes->info.rows > 0) { + if (pRes->info.rows >= pOperator->resultInfo.capacity) { break; } } pRes->info.groupId = miaInfo->groupId; } + miaInfo->hasGroupId = false; if (miaInfo->inputBlocksFinished) { doSetOperatorCompleted(pOperator); diff --git a/source/libs/parser/src/parCalcConst.c b/source/libs/parser/src/parCalcConst.c index b799ae5fb1..68a60e0b35 100644 --- a/source/libs/parser/src/parCalcConst.c +++ b/source/libs/parser/src/parCalcConst.c @@ -166,7 +166,7 @@ static int32_t calcConstStmtCondition(SCalcConstContext* pCxt, SNode** pCond, bo return code; } -static int32_t calcConstProject(SNode* pProject, SNode** pNew) { +static int32_t calcConstProject(SNode* pProject, bool dual, SNode** pNew) { SArray* pAssociation = NULL; if (NULL != ((SExprNode*)pProject)->pAssociation) { pAssociation = taosArrayDup(((SExprNode*)pProject)->pAssociation); @@ -177,7 +177,12 @@ static int32_t calcConstProject(SNode* pProject, SNode** pNew) { char aliasName[TSDB_COL_NAME_LEN] = {0}; strcpy(aliasName, ((SExprNode*)pProject)->aliasName); - int32_t code = scalarCalculateConstants(pProject, pNew); + int32_t code = TSDB_CODE_SUCCESS; + if (dual) { + code = scalarCalculateConstantsFromDual(pProject, pNew); + } else { + code = scalarCalculateConstants(pProject, pNew); + } if (TSDB_CODE_SUCCESS == code && QUERY_NODE_VALUE == nodeType(*pNew) && NULL != pAssociation) { strcpy(((SExprNode*)*pNew)->aliasName, aliasName); int32_t size = taosArrayGetSize(pAssociation); @@ -223,7 +228,7 @@ static int32_t calcConstProjections(SCalcConstContext* pCxt, SSelectStmt* pSelec continue; } SNode* pNew = NULL; - int32_t code = calcConstProject(pProj, &pNew); + int32_t code = calcConstProject(pProj, (NULL == pSelect->pFromTable), &pNew); if (TSDB_CODE_SUCCESS == code) { REPLACE_NODE(pNew); } else { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ca000fcf2d..7798d9578c 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -916,8 +916,6 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD } if (TSDB_DATA_TYPE_NULL == pVal->node.resType.type) { - // TODO - // 
pVal->node.resType = targetDt; pVal->translate = true; pVal->isNull = true; return DEAL_RES_CONTINUE; @@ -932,6 +930,7 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD res = translateNormalValue(pCxt, pVal, targetDt, strict); } pVal->node.resType = targetDt; + pVal->node.resType.scale = pVal->unit; pVal->translate = true; return res; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 855427f26e..ad75720892 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -124,12 +124,15 @@ static bool scanPathOptMayBeOptimized(SLogicNode* pNode) { QUERY_NODE_LOGIC_PLAN_PARTITION != nodeType(pNode->pParent))) { return false; } - if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent) || + if ((QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent) && WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType) || (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode->pParent) && pNode->pParent->pParent && - QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent->pParent))) { + QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent->pParent) && WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType)) { return true; } - return !scanPathOptHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys); + if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode->pParent)) { + return !scanPathOptHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys); + } + return false; } static SNodeList* scanPathOptGetAllFuncs(SLogicNode* pNode) { diff --git a/source/libs/scalar/inc/sclInt.h b/source/libs/scalar/inc/sclInt.h index 4422da1b81..d423b92da7 100644 --- a/source/libs/scalar/inc/sclInt.h +++ b/source/libs/scalar/inc/sclInt.h @@ -30,6 +30,7 @@ typedef struct SOperatorValueType { typedef struct SScalarCtx { int32_t code; + bool dual; SArray *pBlockList; /* element is SSDataBlock* */ SHashObj *pRes; /* element is SScalarParam */ void *param; // additional parameter (meta actually) for acquire value such as tbname/tags values diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index e610fcb62e..bdfc411fa6 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -1010,13 +1010,14 @@ int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockL return TSDB_CODE_SUCCESS; } -int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) { +int32_t sclCalcConstants(SNode *pNode, bool dual, SNode **pRes) { if (NULL == pNode) { SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } int32_t code = 0; SScalarCtx ctx = {0}; + ctx.dual = dual; ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); if (NULL == ctx.pRes) { sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM); @@ -1028,10 +1029,88 @@ int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) { *pRes = pNode; _return: + sclFreeRes(ctx.pRes); return code; } +static int32_t sclGetMinusOperatorResType(SOperatorNode* pOp) { + if (!IS_MATHABLE_TYPE(((SExprNode*)(pOp->pLeft))->resType.type)) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } + pOp->node.resType.type = TSDB_DATA_TYPE_DOUBLE; + pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes; + return TSDB_CODE_SUCCESS; +} + +static int32_t sclGetMathOperatorResType(SOperatorNode* pOp) { + SDataType ldt = ((SExprNode*)(pOp->pLeft))->resType; + SDataType rdt = ((SExprNode*)(pOp->pRight))->resType; + if 
((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_TIMESTAMP == rdt.type) || + (TSDB_DATA_TYPE_TIMESTAMP == ldt.type && (IS_VAR_DATA_TYPE(rdt.type) || IS_FLOAT_TYPE(rdt.type))) || + (TSDB_DATA_TYPE_TIMESTAMP == rdt.type && (IS_VAR_DATA_TYPE(ldt.type) || IS_FLOAT_TYPE(ldt.type)))) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } + + if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && IS_INTEGER_TYPE(rdt.type)) || + (TSDB_DATA_TYPE_TIMESTAMP == rdt.type && IS_INTEGER_TYPE(ldt.type)) || + (TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_BOOL == rdt.type) || + (TSDB_DATA_TYPE_TIMESTAMP == rdt.type && TSDB_DATA_TYPE_BOOL == ldt.type)) { + pOp->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP; + pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes; + } else { + pOp->node.resType.type = TSDB_DATA_TYPE_DOUBLE; + pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t sclGetCompOperatorResType(SOperatorNode* pOp) { + SDataType ldt = ((SExprNode*)(pOp->pLeft))->resType; + if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) { + ((SExprNode*)(pOp->pRight))->resType = ldt; + } else if (nodesIsRegularOp(pOp)) { + SDataType rdt = ((SExprNode*)(pOp->pRight))->resType; + if (!IS_VAR_DATA_TYPE(ldt.type) || QUERY_NODE_VALUE != nodeType(pOp->pRight) || + (!IS_STR_DATA_TYPE(rdt.type) && (rdt.type != TSDB_DATA_TYPE_NULL))) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } + } + pOp->node.resType.type = TSDB_DATA_TYPE_BOOL; + pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; + return TSDB_CODE_SUCCESS; +} + +static int32_t sclGetJsonOperatorResType(SOperatorNode* pOp) { + SDataType ldt = ((SExprNode*)(pOp->pLeft))->resType; + SDataType rdt = ((SExprNode*)(pOp->pRight))->resType; + if (TSDB_DATA_TYPE_JSON != ldt.type || !IS_STR_DATA_TYPE(rdt.type)) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } + if (pOp->opType == OP_TYPE_JSON_GET_VALUE) { + pOp->node.resType.type = TSDB_DATA_TYPE_JSON; + } else if (pOp->opType == OP_TYPE_JSON_CONTAINS) { + pOp->node.resType.type = TSDB_DATA_TYPE_BOOL; + } + pOp->node.resType.bytes = tDataTypes[pOp->node.resType.type].bytes; + return TSDB_CODE_SUCCESS; +} + +static int32_t sclGetBitwiseOperatorResType(SOperatorNode* pOp) { + pOp->node.resType.type = TSDB_DATA_TYPE_BIGINT; + pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; + return TSDB_CODE_SUCCESS; +} + + +int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) { + return sclCalcConstants(pNode, false, pRes); +} + +int32_t scalarCalculateConstantsFromDual(SNode *pNode, SNode **pRes) { + return sclCalcConstants(pNode, true, pRes); +} + int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { if (NULL == pNode || NULL == pBlockList) { SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -1075,74 +1154,6 @@ _return: return code; } -static int32_t getMinusOperatorResultType(SOperatorNode* pOp) { - if (!IS_MATHABLE_TYPE(((SExprNode*)(pOp->pLeft))->resType.type)) { - return TSDB_CODE_TSC_INVALID_OPERATION; - } - pOp->node.resType.type = TSDB_DATA_TYPE_DOUBLE; - pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes; - return TSDB_CODE_SUCCESS; -} - -static int32_t getArithmeticOperatorResultType(SOperatorNode* pOp) { - SDataType ldt = ((SExprNode*)(pOp->pLeft))->resType; - SDataType rdt = ((SExprNode*)(pOp->pRight))->resType; - if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_TIMESTAMP == rdt.type) || - (TSDB_DATA_TYPE_TIMESTAMP == ldt.type && 
(IS_VAR_DATA_TYPE(rdt.type) || IS_FLOAT_TYPE(rdt.type))) || - (TSDB_DATA_TYPE_TIMESTAMP == rdt.type && (IS_VAR_DATA_TYPE(ldt.type) || IS_FLOAT_TYPE(ldt.type)))) { - return TSDB_CODE_TSC_INVALID_OPERATION; - } - - if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && IS_INTEGER_TYPE(rdt.type)) || - (TSDB_DATA_TYPE_TIMESTAMP == rdt.type && IS_INTEGER_TYPE(ldt.type)) || - (TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_BOOL == rdt.type) || - (TSDB_DATA_TYPE_TIMESTAMP == rdt.type && TSDB_DATA_TYPE_BOOL == ldt.type)) { - pOp->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP; - pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes; - } else { - pOp->node.resType.type = TSDB_DATA_TYPE_DOUBLE; - pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes; - } - return TSDB_CODE_SUCCESS; -} - -static int32_t getComparisonOperatorResultType(SOperatorNode* pOp) { - SDataType ldt = ((SExprNode*)(pOp->pLeft))->resType; - if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) { - ((SExprNode*)(pOp->pRight))->resType = ldt; - } else if (nodesIsRegularOp(pOp)) { - SDataType rdt = ((SExprNode*)(pOp->pRight))->resType; - if (!IS_VAR_DATA_TYPE(ldt.type) || QUERY_NODE_VALUE != nodeType(pOp->pRight) || - (!IS_STR_DATA_TYPE(rdt.type) && (rdt.type != TSDB_DATA_TYPE_NULL))) { - return TSDB_CODE_TSC_INVALID_OPERATION; - } - } - pOp->node.resType.type = TSDB_DATA_TYPE_BOOL; - pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; - return TSDB_CODE_SUCCESS; -} - -static int32_t getJsonOperatorResultType(SOperatorNode* pOp) { - SDataType ldt = ((SExprNode*)(pOp->pLeft))->resType; - SDataType rdt = ((SExprNode*)(pOp->pRight))->resType; - if (TSDB_DATA_TYPE_JSON != ldt.type || !IS_STR_DATA_TYPE(rdt.type)) { - return TSDB_CODE_TSC_INVALID_OPERATION; - } - if (pOp->opType == OP_TYPE_JSON_GET_VALUE) { - pOp->node.resType.type = TSDB_DATA_TYPE_JSON; - } else if (pOp->opType == OP_TYPE_JSON_CONTAINS) { - pOp->node.resType.type = TSDB_DATA_TYPE_BOOL; - } - pOp->node.resType.bytes = tDataTypes[pOp->node.resType.type].bytes; - return TSDB_CODE_SUCCESS; -} - -static int32_t getBitwiseOperatorResultType(SOperatorNode* pOp) { - pOp->node.resType.type = TSDB_DATA_TYPE_BIGINT; - pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; - return TSDB_CODE_SUCCESS; -} - int32_t scalarGetOperatorResultType(SOperatorNode* pOp) { if (TSDB_DATA_TYPE_BLOB == ((SExprNode*)(pOp->pLeft))->resType.type || (NULL != pOp->pRight && TSDB_DATA_TYPE_BLOB == ((SExprNode*)(pOp->pRight))->resType.type)) { @@ -1155,15 +1166,15 @@ int32_t scalarGetOperatorResultType(SOperatorNode* pOp) { case OP_TYPE_MULTI: case OP_TYPE_DIV: case OP_TYPE_REM: - return getArithmeticOperatorResultType(pOp); + return sclGetMathOperatorResType(pOp); case OP_TYPE_MINUS: - return getMinusOperatorResultType(pOp); + return sclGetMinusOperatorResType(pOp); case OP_TYPE_ASSIGN: pOp->node.resType = ((SExprNode*)(pOp->pLeft))->resType; break; case OP_TYPE_BIT_AND: case OP_TYPE_BIT_OR: - return getBitwiseOperatorResultType(pOp); + return sclGetBitwiseOperatorResType(pOp); case OP_TYPE_GREATER_THAN: case OP_TYPE_GREATER_EQUAL: case OP_TYPE_LOWER_THAN: @@ -1184,10 +1195,10 @@ int32_t scalarGetOperatorResultType(SOperatorNode* pOp) { case OP_TYPE_NMATCH: case OP_TYPE_IN: case OP_TYPE_NOT_IN: - return getComparisonOperatorResultType(pOp); + return sclGetCompOperatorResType(pOp); case OP_TYPE_JSON_GET_VALUE: case OP_TYPE_JSON_CONTAINS: - return getJsonOperatorResultType(pOp); + return sclGetJsonOperatorResType(pOp); default: break; } diff 
--git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index bf457d07eb..39b2f04f3e 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -1055,7 +1055,7 @@ static void vectorMathAddHelper(SColumnInfoData* pLeftCol, SColumnInfoData* pRig } } -static void vectorMathBigintAddHelper(SColumnInfoData* pLeftCol, SColumnInfoData* pRightCol, SColumnInfoData* pOutputCol, int32_t numOfRows, int32_t step, int32_t i) { +static void vectorMathTsAddHelper(SColumnInfoData* pLeftCol, SColumnInfoData* pRightCol, SColumnInfoData* pOutputCol, int32_t numOfRows, int32_t step, int32_t i) { _getBigintValue_fn_t getVectorBigintValueFnLeft = getVectorBigintValueFn(pLeftCol->info.type); _getBigintValue_fn_t getVectorBigintValueFnRight = getVectorBigintValueFn(pRightCol->info.type); @@ -1069,7 +1069,8 @@ static void vectorMathBigintAddHelper(SColumnInfoData* pLeftCol, SColumnInfoData colDataAppendNULL(pOutputCol, i); continue; // TODO set null or ignore } - *output = getVectorBigintValueFnLeft(pLeftCol->pData, i) + getVectorBigintValueFnRight(pRightCol->pData, 0); + *output = taosTimeAdd(getVectorBigintValueFnLeft(pLeftCol->pData, i), getVectorBigintValueFnRight(pRightCol->pData, 0), + pRightCol->info.scale, pRightCol->info.precision); } } } @@ -1116,7 +1117,17 @@ void vectorMathAdd(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut _getBigintValue_fn_t getVectorBigintValueFnLeft = getVectorBigintValueFn(pLeftCol->info.type); _getBigintValue_fn_t getVectorBigintValueFnRight = getVectorBigintValueFn(pRightCol->info.type); - if (pLeft->numOfRows == pRight->numOfRows) { + if (pLeft->numOfRows == 1 && pRight->numOfRows == 1) { + if (GET_PARAM_TYPE(pLeft) == TSDB_DATA_TYPE_TIMESTAMP) { + vectorMathTsAddHelper(pLeftCol, pRightCol, pOutputCol, pRight->numOfRows, step, i); + } else { + vectorMathTsAddHelper(pRightCol, pLeftCol, pOutputCol, pRight->numOfRows, step, i); + } + } else if (pLeft->numOfRows == 1) { + vectorMathTsAddHelper(pRightCol, pLeftCol, pOutputCol, pRight->numOfRows, step, i); + } else if (pRight->numOfRows == 1) { + vectorMathTsAddHelper(pLeftCol, pRightCol, pOutputCol, pLeft->numOfRows, step, i); + } else if (pLeft->numOfRows == pRight->numOfRows) { for (; i < pRight->numOfRows && i >= 0; i += step, output += 1) { if (IS_NULL) { colDataAppendNULL(pOutputCol, i); @@ -1124,11 +1135,7 @@ void vectorMathAdd(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut } *output = getVectorBigintValueFnLeft(pLeftCol->pData, i) + getVectorBigintValueFnRight(pRightCol->pData, i); } - } else if (pLeft->numOfRows == 1) { - vectorMathBigintAddHelper(pRightCol, pLeftCol, pOutputCol, pRight->numOfRows, step, i); - } else if (pRight->numOfRows == 1) { - vectorMathBigintAddHelper(pLeftCol, pRightCol, pOutputCol, pLeft->numOfRows, step, i); - } + } } else { double *output = (double *)pOutputCol->pData; _getDoubleValue_fn_t getVectorDoubleValueFnLeft = getVectorDoubleValueFn(pLeftCol->info.type); @@ -1174,7 +1181,7 @@ static void vectorMathSubHelper(SColumnInfoData* pLeftCol, SColumnInfoData* pRig } } -static void vectorMathBigintSubHelper(SColumnInfoData* pLeftCol, SColumnInfoData* pRightCol, SColumnInfoData* pOutputCol, int32_t numOfRows, int32_t step, int32_t factor, int32_t i) { +static void vectorMathTsSubHelper(SColumnInfoData* pLeftCol, SColumnInfoData* pRightCol, SColumnInfoData* pOutputCol, int32_t numOfRows, int32_t step, int32_t factor, int32_t i) { _getBigintValue_fn_t getVectorBigintValueFnLeft = 
getVectorBigintValueFn(pLeftCol->info.type); _getBigintValue_fn_t getVectorBigintValueFnRight = getVectorBigintValueFn(pRightCol->info.type); @@ -1188,7 +1195,9 @@ static void vectorMathBigintSubHelper(SColumnInfoData* pLeftCol, SColumnInfoData colDataAppendNULL(pOutputCol, i); continue; // TODO set null or ignore } - *output = (getVectorBigintValueFnLeft(pLeftCol->pData, i) - getVectorBigintValueFnRight(pRightCol->pData, 0)) * factor; + *output = taosTimeSub(getVectorBigintValueFnLeft(pLeftCol->pData, i), getVectorBigintValueFnRight(pRightCol->pData, 0), + pRightCol->info.scale, pRightCol->info.precision); + } } } @@ -1211,7 +1220,13 @@ void vectorMathSub(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut _getBigintValue_fn_t getVectorBigintValueFnLeft = getVectorBigintValueFn(pLeftCol->info.type); _getBigintValue_fn_t getVectorBigintValueFnRight = getVectorBigintValueFn(pRightCol->info.type); - if (pLeft->numOfRows == pRight->numOfRows) { + if (pLeft->numOfRows == 1 && pRight->numOfRows == 1) { + vectorMathTsSubHelper(pLeftCol, pRightCol, pOutputCol, pLeft->numOfRows, step, 1, i); + } else if (pLeft->numOfRows == 1) { + vectorMathTsSubHelper(pRightCol, pLeftCol, pOutputCol, pRight->numOfRows, step, -1, i); + } else if (pRight->numOfRows == 1) { + vectorMathTsSubHelper(pLeftCol, pRightCol, pOutputCol, pLeft->numOfRows, step, 1, i); + } else if (pLeft->numOfRows == pRight->numOfRows) { for (; i < pRight->numOfRows && i >= 0; i += step, output += 1) { if (IS_NULL) { colDataAppendNULL(pOutputCol, i); @@ -1219,10 +1234,6 @@ void vectorMathSub(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut } *output = getVectorBigintValueFnLeft(pLeftCol->pData, i) - getVectorBigintValueFnRight(pRightCol->pData, i); } - } else if (pLeft->numOfRows == 1) { - vectorMathBigintSubHelper(pRightCol, pLeftCol, pOutputCol, pRight->numOfRows, step, -1, i); - } else if (pRight->numOfRows == 1) { - vectorMathBigintSubHelper(pLeftCol, pRightCol, pOutputCol, pLeft->numOfRows, step, 1, i); } } else { double *output = (double *)pOutputCol->pData; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index ef6697b3b5..2364c53a9a 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -561,6 +561,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_COL_JSON, "Only tag can be jso TAOS_DEFINE_ERROR(TSDB_CODE_PAR_VALUE_TOO_LONG, "Value too long for column/tag") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_DELETE_WHERE, "The DELETE statement must have a definite time window range") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_REDISTRIBUTE_VG, "The REDISTRIBUTE VGROUP statement only support 1 to 3 dnodes") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_SELECTED_EXPR, "Invalid SELECTed expression") //planner TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "Planner internal error") diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index c88b5e2b78..dad4dbbe29 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -203,6 +203,7 @@ # --- scalar ./test.sh -f tsim/scalar/in.sim +./test.sh -f tsim/scalar/scalar.sim # ---- alter ./test.sh -f tsim/alter/cached_schema_after_alter.sim diff --git a/tests/script/tsim/scalar/scalar.sim b/tests/script/tsim/scalar/scalar.sim new file mode 100644 index 0000000000..32224e33ba --- /dev/null +++ b/tests/script/tsim/scalar/scalar.sim @@ -0,0 +1,67 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +print ======== step1 +sql drop database if exists db1; 
+sql create database db1 vgroups 3; +sql use db1; +sql create stable st1 (fts timestamp, fbool bool, ftiny tinyint, fsmall smallint, fint int, fbig bigint, futiny tinyint unsigned, fusmall smallint unsigned, fuint int unsigned, fubig bigint unsigned, ffloat float, fdouble double, fbin binary(10), fnchar nchar(10)) tags(tts timestamp, tbool bool, ttiny tinyint, tsmall smallint, tint int, tbig bigint, tutiny tinyint unsigned, tusmall smallint unsigned, tuint int unsigned, tubig bigint unsigned, tfloat float, tdouble double, tbin binary(10), tnchar nchar(10)); +sql create table tb1 using st1 tags('2022-07-10 16:31:00', true, 1, 1, 1, 1, 1, 1, 1, 1, 1.0, 1.0, 'a', 'a'); +sql create table tb2 using st1 tags('2022-07-10 16:32:00', false, 2, 2, 2, 2, 2, 2, 2, 2, 2.0, 2.0, 'b', 'b'); +sql create table tb3 using st1 tags('2022-07-10 16:33:00', true, 3, 3, 3, 3, 3, 3, 3, 3, 3.0, 3.0, 'c', 'c'); + +sql insert into tb1 values ('2022-07-10 16:31:01', false, 1, 1, 1, 1, 1, 1, 1, 1, 1.0, 1.0, 'a', 'a'); +sql insert into tb1 values ('2022-07-10 16:31:02', true, 2, 2, 2, 2, 2, 2, 2, 2, 2.0, 2.0, 'b', 'b'); +sql insert into tb1 values ('2022-07-10 16:31:03', false, 3, 3, 3, 3, 3, 3, 3, 3, 3.0, 3.0, 'c', 'c'); +sql insert into tb1 values ('2022-07-10 16:31:04', true, 4, 4, 4, 4, 4, 4, 4, 4, 4.0, 4.0, 'd', 'd'); +sql insert into tb1 values ('2022-07-10 16:31:05', false, 5, 5, 5, 5, 5, 5, 5, 5, 5.0, 5.0, 'e', 'e'); + +sql insert into tb2 values ('2022-07-10 16:32:01', false, 1, 1, 1, 1, 1, 1, 1, 1, 1.0, 1.0, 'a', 'a'); +sql insert into tb2 values ('2022-07-10 16:32:02', true, 2, 2, 2, 2, 2, 2, 2, 2, 2.0, 2.0, 'b', 'b'); +sql insert into tb2 values ('2022-07-10 16:32:03', false, 3, 3, 3, 3, 3, 3, 3, 3, 3.0, 3.0, 'c', 'c'); +sql insert into tb2 values ('2022-07-10 16:32:04', true, 4, 4, 4, 4, 4, 4, 4, 4, 4.0, 4.0, 'd', 'd'); +sql insert into tb2 values ('2022-07-10 16:32:05', false, 5, 5, 5, 5, 5, 5, 5, 5, 5.0, 5.0, 'e', 'e'); + +sql insert into tb3 values ('2022-07-10 16:33:01', false, 1, 1, 1, 1, 1, 1, 1, 1, 1.0, 1.0, 'a', 'a'); +sql insert into tb3 values ('2022-07-10 16:33:02', true, 2, 2, 2, 2, 2, 2, 2, 2, 2.0, 2.0, 'b', 'b'); +sql insert into tb3 values ('2022-07-10 16:33:03', false, 3, 3, 3, 3, 3, 3, 3, 3, 3.0, 3.0, 'c', 'c'); +sql insert into tb3 values ('2022-07-10 16:33:04', true, 4, 4, 4, 4, 4, 4, 4, 4, 4.0, 4.0, 'd', 'd'); +sql insert into tb3 values ('2022-07-10 16:33:05', false, 5, 5, 5, 5, 5, 5, 5, 5, 5.0, 5.0, 'e', 'e'); + +sql select 1+1n; +if $rows != 1 then + return -1 +endi +if $data00 != 2.000000000 then + return -1 +endi + + +sql select cast(1 as timestamp)+1n; +if $rows != 1 then + return -1 +endi +if $data00 != @70-02-01 08:00:00.000@ then + return -1 +endi + +sql select 1-1n; +if $rows != 1 then + return -1 +endi + +sql select cast(1 as timestamp)-1y; +if $rows != 1 then + return -1 +endi +if $data00 != @69-01-01 08:00:00.000@ then + return -1 +endi + +sql select 1n-now(); +sql select 1n+now(); +#sql select avg(4n); + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/sliding.sim b/tests/script/tsim/stream/sliding.sim new file mode 100644 index 0000000000..750be7cb49 --- /dev/null +++ b/tests/script/tsim/stream/sliding.sim @@ -0,0 +1,243 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print =============== create database +sql create database test vgroups 1 +sql show databases +if $rows != 3 then + return -1 +endi + +print $data00 $data01 $data02 + +sql use test +sql create 
stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams1 trigger at_once into streamt as select _wstartts, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s) sliding (5s); +sql create stream streams2 trigger at_once watermark 1d into streamt2 as select _wstartts, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s) sliding (5s); +sql create stream stream_t1 trigger at_once into streamtST as select _wstartts, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s) sliding (5s); +sql create stream stream_t2 trigger at_once watermark 1d into streamtST2 as select _wstartts, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s) sliding (5s); + +sql insert into t1 values(1648791210000,1,2,3,1.0); +sql insert into t1 values(1648791216000,2,2,3,1.1); +sql insert into t1 values(1648791220000,3,2,3,2.1); + +sql insert into t1 values(1648791210000,1,2,3,1.0); +sql insert into t1 values(1648791216000,2,2,3,1.1); +sql insert into t1 values(1648791220000,3,2,3,2.1); + +sql insert into t2 values(1648791210000,1,2,3,1.0); +sql insert into t2 values(1648791216000,2,2,3,1.1); +sql insert into t2 values(1648791220000,3,2,3,2.1); + +sql insert into t2 values(1648791210000,1,2,3,1.0); +sql insert into t2 values(1648791216000,2,2,3,1.1); +sql insert into t2 values(1648791220000,3,2,3,2.1); + +$loop_count = 0 + +loop0: +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + + +sql select * from streamt + +# row 0 +if $data01 != 1 then + print =====data01=$data01 + goto loop0 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop0 +endi + +# row 1 +if $data11 != 2 then + print =====data11=$data11 + goto loop0 +endi + +if $data12 != 3 then + print =====data12=$data12 + goto loop0 +endi + +# row 2 +if $data21 != 2 then + print =====data21=$data21 + goto loop0 +endi + +if $data22 != 5 then + print =====data22=$data22 + goto loop0 +endi + +# row 3 +if $data31 != 1 then + print =====data31=$data31 + goto loop0 +endi + +if $data32 != 3 then + print =====data32=$data32 + goto loop0 +endi + +print step 1 + +sql select * from streamt2 + +# row 0 +if $data01 != 1 then + print =====data01=$data01 + goto loop0 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop0 +endi + +# row 1 +if $data11 != 2 then + print =====data11=$data11 + goto loop0 +endi + +if $data12 != 3 then + print =====data12=$data12 + goto loop0 +endi + +# row 2 +if $data21 != 2 then + print =====data21=$data21 + goto loop0 +endi + +if $data22 != 5 then + print =====data22=$data22 + goto loop0 +endi + +# row 3 +if $data31 != 1 then + print =====data31=$data31 + goto loop0 +endi + +if $data32 != 3 then + print =====data32=$data32 + goto loop0 +endi + +print step 2 + +sql select * from streamtST + +# row 0 +if $data01 != 2 then + print =====data01=$data01 + goto loop0 +endi + +if $data02 != 2 then + print =====data02=$data02 + goto loop0 +endi + +# row 1 +if $data11 != 4 then + print =====data11=$data11 + goto loop0 +endi + +if $data12 != 6 then + print =====data12=$data12 + goto loop0 +endi + +# row 2 +if $data21 != 4 then + print =====data21=$data21 + goto loop0 +endi + +if $data22 != 10 then + print =====data22=$data22 + goto loop0 +endi + +# row 3 +if $data31 != 2 then + print =====data31=$data31 + goto loop0 +endi + +if $data32 != 6 then + print =====data32=$data32 + goto loop0 +endi + +print 
step 3 + +sql select * from streamtST2 + +# row 0 +if $data01 != 2 then + print =====data01=$data01 + goto loop0 +endi + +if $data02 != 2 then + print =====data02=$data02 + goto loop0 +endi + +# row 1 +if $data11 != 4 then + print =====data11=$data11 + goto loop0 +endi + +if $data12 != 6 then + print =====data12=$data12 + goto loop0 +endi + +# row 2 +if $data21 != 4 then + print =====data21=$data21 + goto loop0 +endi + +if $data22 != 10 then + print =====data22=$data22 + goto loop0 +endi + +# row 3 +if $data31 != 2 then + print =====data31=$data31 + goto loop0 +endi + +if $data32 != 6 then + print =====data32=$data32 + goto loop0 +endi + + +system sh/stop_dnodes.sh \ No newline at end of file diff --git a/tests/system-test/7-tmq/tmqUpdate-1ctb.py b/tests/system-test/7-tmq/tmqUpdate-1ctb.py index 3cb364f91d..8513092be9 100644 --- a/tests/system-test/7-tmq/tmqUpdate-1ctb.py +++ b/tests/system-test/7-tmq/tmqUpdate-1ctb.py @@ -19,7 +19,7 @@ class TDTestCase: self.snapshot = 0 self.vgroups = 4 self.ctbNum = 1 - self.rowsPerTbl = 100000 + self.rowsPerTbl = 10000 def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") diff --git a/tests/system-test/7-tmq/tmqUpdate1.py b/tests/system-test/7-tmq/tmqUpdate1.py deleted file mode 100644 index 5f11090385..0000000000 --- a/tests/system-test/7-tmq/tmqUpdate1.py +++ /dev/null @@ -1,175 +0,0 @@ - -import taos -import sys -import time -import socket -import os -import threading -from enum import Enum - -from util.log import * -from util.sql import * -from util.cases import * -from util.dnodes import * -sys.path.append("./7-tmq") -from tmqCommon import * - -class TDTestCase: - def __init__(self): - self.vgroups = 4 - self.ctbNum = 1000 - self.rowsPerTbl = 1000 - - def init(self, conn, logSql): - tdLog.debug(f"start to excute {__file__}") - tdSql.init(conn.cursor(), False) - - def prepareTestEnv(self): - tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") - paraDict = {'dbName': 'dbt', - 'dropFlag': 1, - 'event': '', - 'vgroups': 4, - 'stbName': 'stb', - 'colPrefix': 'c', - 'tagPrefix': 't', - 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], - 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], - 'ctbPrefix': 'ctb', - 'ctbStartIdx': 0, - 'ctbNum': 1000, - 'rowsPerTbl': 1000, - 'batchNum': 400, - 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 3, - 'showMsg': 1, - 'showRow': 1, - 'snapshot': 0} - - paraDict['vgroups'] = self.vgroups - paraDict['ctbNum'] = self.ctbNum - paraDict['rowsPerTbl'] = self.rowsPerTbl - - tmqCom.initConsumerTable() - tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) - tdLog.info("create stb") - tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) - tdLog.info("create ctb") - tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], - ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) - tdLog.info("insert data") - paraDict['ctbNum'] = int(self.ctbNum / 2) - tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], - 
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", - ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) - - # tdLog.info("restart taosd to ensure that the data falls into the disk") - # tdSql.query("flush database %s"%(paraDict['dbName'])) - return - - def tmqCase1(self): - tdLog.printNoPrefix("======== test case 1: ") - paraDict = {'dbName': 'dbt', - 'dropFlag': 1, - 'event': '', - 'vgroups': 4, - 'stbName': 'stb', - 'colPrefix': 'c', - 'tagPrefix': 't', - 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], - 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], - 'ctbPrefix': 'ctb', - 'ctbStartIdx': 0, - 'ctbNum': 1000, - 'rowsPerTbl': 1000, - 'batchNum': 1000, - 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 5, - 'showMsg': 1, - 'showRow': 1, - 'snapshot': 1} - - paraDict['vgroups'] = self.vgroups - paraDict['ctbNum'] = self.ctbNum - paraDict['rowsPerTbl'] = self.rowsPerTbl - - tdLog.info("restart taosd to ensure that the data falls into the disk") - tdSql.query("flush database %s"%(paraDict['dbName'])) - - # update to half tables - paraDict['ctbNum'] = int(self.ctbNum / 4) - tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", - ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+paraDict['ctbNum']) - tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], - ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+paraDict['ctbNum']) - - tmqCom.initConsumerTable() - tdLog.info("create topics from stb1") - topicFromStb1 = 'topic_stb1' - queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) - sqlString = "create topic %s as %s" %(topicFromStb1, queryString) - tdLog.info("create topic sql: %s"%sqlString) - tdSql.execute(sqlString) - - paraDict['ctbNum'] = self.ctbNum - consumerId = 0 - expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1) - topicList = topicFromStb1 - ifcheckdata = 0 - ifManualCommit = 0 - keyList = 'group.id:cgrp1,\ - enable.auto.commit:true,\ - auto.commit.interval.ms:1000,\ - auto.offset.reset:earliest' - tmqCom.insertConsumerInfo(consumerId, expectrowcnt + paraDict["rowsPerTbl"] * paraDict["ctbNum"],topicList,keyList,ifcheckdata,ifManualCommit) - - tdLog.info("start consume processor") - tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - - paraDict['ctbNum'] = int(self.ctbNum / 2) - paraDict['ctbStartIdx'] += paraDict['ctbNum'] - _ = tmqCom.asyncInsertDataByInterlace(paraDict) - time.sleep(3) - pthread = 
tmqCom.asyncInsertDataByInterlace(paraDict) - pthread.join() - - tdLog.info("insert process end, and start to check consume result") - expectRows = 1 - resultList = tmqCom.selectConsumeResult(expectRows) - totalConsumeRows = 0 - for i in range(expectRows): - totalConsumeRows += resultList[i] - - tdSql.query(queryString) - totalRowsInserted = tdSql.getRows() - - tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) - - if totalConsumeRows <= totalRowsInserted or totalConsumeRows != expectrowcnt: - tdLog.exit("tmq consume rows error!") - - tdSql.query("drop topic %s"%topicFromStb1) - - tdLog.printNoPrefix("======== test case 1 end ...... ") - - - def run(self): - tdSql.prepare() - self.prepareTestEnv() - self.tmqCase1() - # self.tmqCase2() - - - def stop(self): - tdSql.close() - tdLog.success(f"{__file__} successfully executed") - -event = threading.Event() - -tdCases.addLinux(__file__, TDTestCase()) -tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index becb5db501..2c116113bc 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -148,7 +148,7 @@ python3 ./test.py -f 7-tmq/subscribeDb2.py python3 ./test.py -f 7-tmq/subscribeDb3.py #python3 ./test.py -f 7-tmq/subscribeDb4.py python3 ./test.py -f 7-tmq/subscribeStb.py -#python3 ./test.py -f 7-tmq/subscribeStb0.py +python3 ./test.py -f 7-tmq/subscribeStb0.py python3 ./test.py -f 7-tmq/subscribeStb1.py python3 ./test.py -f 7-tmq/subscribeStb2.py python3 ./test.py -f 7-tmq/subscribeStb3.py
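
A note on the calendar-unit arithmetic introduced above: taosTimeSub() cannot treat 'n' (month) and 'y' (year) durations as fixed tick counts, because month lengths vary, so for those units it converts the timestamp to broken-down time, shifts the absolute month count, and converts back, mirroring what taosTimeAdd() already does for addition. The standalone sketch below reproduces that month arithmetic with standard C time functions so it can be compiled outside the tree; sub_months(), the use of localtime_r()/mktime() in place of taosLocalTime()/taosMktime(), and the assumption of second-precision timestamps are illustrative stand-ins rather than part of the patch.

#include <stdint.h>
#include <stdio.h>
#include <time.h>

/* Calendar-aware month subtraction, mirroring the 'n'/'y' branch of the
 * taosTimeSub() added in ttime.c. Standard localtime_r()/mktime() stand in
 * for the project's taosLocalTime()/taosMktime(), and the timestamp is
 * assumed to be in seconds (precision scaling is omitted). */
static int64_t sub_months(int64_t ts_sec, int64_t months) {
  struct tm tm;
  time_t tt = (time_t)ts_sec;
  localtime_r(&tt, &tm);

  /* Convert to an absolute month count, subtract, then split back into
   * year/month exactly as the patch does. */
  int32_t mon = tm.tm_year * 12 + tm.tm_mon - (int32_t)months;
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;

  return (int64_t)mktime(&tm);
}

int main(void) {
  /* 2022-03-31 00:00:00 UTC as a Unix timestamp; the exact local-time
   * result depends on the machine's timezone, as in the sim test. */
  int64_t ts = 1648684800;
  printf("1 month earlier: %lld\n", (long long)sub_months(ts, 1));
  printf("1 year  earlier: %lld\n", (long long)sub_months(ts, 12));
  return 0;
}

On the expression side, translateValueImpl() now records a duration value's unit in pVal->node.resType.scale, which is what allows vectorMathTsAddHelper()/vectorMathTsSubHelper() to forward the unit and precision into taosTimeAdd()/taosTimeSub(); that plumbing is what the new tsim/scalar/scalar.sim cases such as "select cast(1 as timestamp) - 1y" exercise.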