diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 0968c59399..2c4a00a72d 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -176,6 +176,7 @@ typedef enum EStreamType { STREAM_CREATE_CHILD_TABLE, STREAM_TRANS_STATE, STREAM_MID_RETRIEVE, + STREAM_PARTITION_DELETE_DATA, } EStreamType; #pragma pack(push, 1) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 2fbed98604..18bc24d612 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -153,6 +153,7 @@ typedef struct SJoinLogicNode { bool seqWinGroup; bool grpJoin; bool hashJoinHint; + bool batchScanHint; // FOR HASH JOIN int32_t timeRangeTarget; //table onCond filter diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 6c20813118..7f73aa6845 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -538,7 +538,6 @@ void stopAllQueries(SRequestObj *pRequest) { pTmp = acquireRequest(tmpRefId); if (pTmp) { pReqList[++reqIdx] = pTmp; - releaseRequest(tmpRefId); } else { tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId); break; @@ -547,6 +546,7 @@ void stopAllQueries(SRequestObj *pRequest) { for (int32_t i = reqIdx; i >= 0; i--) { taosStopQueryImpl(pReqList[i]); + releaseRequest(pReqList[i]->self); } taosStopQueryImpl(pRequest); diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index dd569b4c59..aaa0c42262 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -1526,6 +1526,8 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) { int32_t code = -1; STransAction createStreamRedoAction = {0}; STransAction createStreamUndoAction = {0}; + STransAction dropStbUndoAction = {0}; + SMDropStbReq dropStbReq = {0}; STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pCxt->pRpcReq, "create-tsma"); if (!pTrans) { @@ -1556,7 +1558,7 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) { createStreamUndoAction.epSet = createStreamRedoAction.epSet; createStreamUndoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST; - createStreamUndoAction.actionType = TDMT_STREAM_DROP; + createStreamUndoAction.msgType = TDMT_STREAM_DROP; createStreamUndoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq); createStreamUndoAction.pCont = taosMemoryCalloc(1, createStreamUndoAction.contLen); if (!createStreamUndoAction.pCont) { @@ -1569,6 +1571,24 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) { goto _OVER; } + dropStbReq.igNotExists = true; + strncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN); + dropStbUndoAction.epSet = createStreamRedoAction.epSet; + dropStbUndoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST; + dropStbUndoAction.retryCode = TSDB_CODE_MND_STREAM_MUST_BE_DELETED; + dropStbUndoAction.msgType = TDMT_MND_STB_DROP; + dropStbUndoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq); + dropStbUndoAction.pCont = taosMemoryCalloc(1, dropStbUndoAction.contLen); + if (!dropStbUndoAction.pCont) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + if (dropStbUndoAction.contLen != tSerializeSMDropStbReq(dropStbUndoAction.pCont, dropStbUndoAction.contLen, &dropStbReq)) { + mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name); + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + SDbObj newDb = {0}; memcpy(&newDb, pCxt->pDb, sizeof(SDbObj)); newDb.tsmaVersion++; @@ -1579,6 +1599,7 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) { if (mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER; if (mndTransAppendRedoAction(pTrans, &createStreamRedoAction) != 0) goto _OVER; if (mndTransAppendUndoAction(pTrans, &createStreamUndoAction) != 0) goto _OVER; + if (mndTransAppendUndoAction(pTrans, &dropStbUndoAction) != 0) goto _OVER; if (mndTransPrepare(pCxt->pMnode, pTrans) != 0) goto _OVER; code = TSDB_CODE_SUCCESS; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 41ff45038f..7b6563f4b4 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1109,7 +1109,7 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) { goto _OVER; } - int32_t actionNum = taosArrayGetSize(pTrans->redoActions); + int32_t actionNum = taosArrayGetSize(pArray); if (action < 0 || action >= actionNum) { mError("trans:%d, invalid action:%d", transId, action); goto _OVER; diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index eb41ffc0a5..9a111ae2d4 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -31,21 +31,21 @@ int meteEncodeColCmprEntry(SEncoder *pCoder, const SMetaEntry *pME) { int meteDecodeColCmprEntry(SDecoder *pDecoder, SMetaEntry *pME) { SColCmprWrapper *pWrapper = &pME->colCmpr; if (tDecodeI32v(pDecoder, &pWrapper->nCols) < 0) return -1; + if (pWrapper->nCols == 0) { + return 0; + } + if (tDecodeI32v(pDecoder, &pWrapper->version) < 0) return -1; uDebug("dencode cols:%d", pWrapper->nCols); - pWrapper->pColCmpr = (SColCmpr *)tDecoderMalloc(pDecoder, pWrapper->nCols * sizeof(SColCmpr)); if (pWrapper->pColCmpr == NULL) return -1; for (int i = 0; i < pWrapper->nCols; i++) { SColCmpr *p = &pWrapper->pColCmpr[i]; - if (tDecodeI16v(pDecoder, &p->id) < 0) goto END; - if (tDecodeU32(pDecoder, &p->alg) < 0) goto END; + if (tDecodeI16v(pDecoder, &p->id) < 0) return -1; + if (tDecodeU32(pDecoder, &p->alg) < 0) return -1; } return 0; -END: - // taosMemoryFree(pWrapper->pColCmpr); - return -1; } static FORCE_INLINE void metatInitDefaultSColCmprWrapper(SDecoder *pDecoder, SColCmprWrapper *pCmpr, SSchemaWrapper *pSchema) { @@ -152,6 +152,10 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { if (pME->type == TSDB_SUPER_TABLE) { if (TABLE_IS_COL_COMPRESSED(pME->flags)) { if (meteDecodeColCmprEntry(pCoder, pME) < 0) return -1; + + if (pME->colCmpr.nCols == 0) { + metatInitDefaultSColCmprWrapper(pCoder, &pME->colCmpr, &pME->stbEntry.schemaRow); + } } else { metatInitDefaultSColCmprWrapper(pCoder, &pME->colCmpr, &pME->stbEntry.schemaRow); TABLE_SET_COL_COMPRESSED(pME->flags); @@ -160,6 +164,9 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { if (!tDecodeIsEnd(pCoder)) { uDebug("set type: %d, tableName:%s", pME->type, pME->name); if (meteDecodeColCmprEntry(pCoder, pME) < 0) return -1; + if (pME->colCmpr.nCols == 0) { + metatInitDefaultSColCmprWrapper(pCoder, &pME->colCmpr, &pME->ntbEntry.schemaRow); + } } else { uDebug("set default type: %d, tableName:%s", pME->type, pME->name); metatInitDefaultSColCmprWrapper(pCoder, &pME->colCmpr, &pME->ntbEntry.schemaRow); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index e9ddaf9fca..7119699a32 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -274,8 +274,6 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { me.name = pReq->name; me.stbEntry.schemaRow = pReq->schemaRow; me.stbEntry.schemaTag = pReq->schemaTag; - // me.stbEntry.colCmpr = pReq->colCmpr; - // me.stbEntry.colCmpr = pReq-> if (pReq->rollup) { TABLE_SET_ROLLUP(me.flags); me.stbEntry.rsmaParam = pReq->rsmaParam; @@ -283,10 +281,6 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { if (pReq->colCmpred) { TABLE_SET_COL_COMPRESSED(me.flags); me.colCmpr = pReq->colCmpr; - } else { - TABLE_SET_COL_COMPRESSED(me.flags); - // TODO(yihao) - // SETUP default compress algr } if (metaHandleEntry(pMeta, &me) < 0) goto _err; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index be6fb2983c..d06beebd6b 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -287,7 +287,19 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) { SMetaReader* mr = (SMetaReader*)pContext; + bool isTagCol = false, isTbname = false; if (nodeType(*pNode) == QUERY_NODE_COLUMN) { + SColumnNode* pCol = (SColumnNode*)*pNode; + if (pCol->colType == COLUMN_TYPE_TBNAME) + isTbname = true; + else + isTagCol = true; + } else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) { + SFunctionNode* pFunc = (SFunctionNode*)*pNode; + if (pFunc->funcType == FUNCTION_TYPE_TBNAME) + isTbname = true; + } + if (isTagCol) { SColumnNode* pSColumnNode = *(SColumnNode**)pNode; SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); @@ -316,24 +328,21 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) { } nodesDestroyNode(*pNode); *pNode = (SNode*)res; - } else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) { - SFunctionNode* pFuncNode = *(SFunctionNode**)pNode; - if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) { - SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); - if (NULL == res) { - return DEAL_RES_ERROR; - } - - res->translate = true; - res->node.resType = pFuncNode->node.resType; - - int32_t len = strlen(mr->me.name); - res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1); - memcpy(varDataVal(res->datum.p), mr->me.name, len); - varDataSetLen(res->datum.p, len); - nodesDestroyNode(*pNode); - *pNode = (SNode*)res; + } else if (isTbname) { + SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); + if (NULL == res) { + return DEAL_RES_ERROR; } + + res->translate = true; + res->node.resType = ((SExprNode*)(*pNode))->resType; + + int32_t len = strlen(mr->me.name); + res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1); + memcpy(varDataVal(res->datum.p), mr->me.name, len); + varDataSetLen(res->datum.p, len); + nodesDestroyNode(*pNode); + *pNode = (SNode*)res; } return DEAL_RES_CONTINUE; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9dfe6230d9..c30059fffd 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1640,7 +1640,7 @@ static void getPreVersionDataBlock(uint64_t uid, TSKEY startTs, TSKEY endTs, int printDataBlock(pBlock, "new delete", taskIdStr); } -static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) { +static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType mode) { if (pSrcBlock->info.rows == 0) { return TSDB_CODE_SUCCESS; } @@ -1659,7 +1659,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr } int64_t ver = pSrcBlock->info.version - 1; - if (pInfo->partitionSup.needCalc && (startData[0] != endData[0] || hasPrimaryKey(pInfo))) { + if (pInfo->partitionSup.needCalc && ( startData[0] != endData[0] || (hasPrimaryKey(pInfo) && mode == STREAM_DELETE_DATA) )) { getPreVersionDataBlock(uidCol[0], startData[0], endData[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock); startData = (TSKEY*)pStartTsCol->pData; endData = (TSKEY*)pEndTsCol->pData; @@ -1736,7 +1736,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB } int64_t ver = pSrcBlock->info.version - 1; - if (pInfo->partitionSup.needCalc && (startData[0] != endData[0] || hasPrimaryKey(pInfo))) { + if (pInfo->partitionSup.needCalc && ( startData[0] != endData[0] || (hasPrimaryKey(pInfo) && mode == STREAM_DELETE_DATA) )) { getPreVersionDataBlock(uidCol[0], startData[0], endData[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock); startData = (TSKEY*)pStartTsCol->pData; endData = (TSKEY*)pEndTsCol->pData; @@ -1779,7 +1779,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB return TSDB_CODE_SUCCESS; } -static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) { +static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType mode) { blockDataCleanup(pDestBlock); if (pSrcBlock->info.rows == 0) { return TSDB_CODE_SUCCESS; @@ -1800,7 +1800,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; int64_t ver = pSrcBlock->info.version - 1; - if (pInfo->partitionSup.needCalc && (srcStartTsCol[0] != srcEndTsCol[0] || hasPrimaryKey(pInfo))) { + if (pInfo->partitionSup.needCalc && ( srcStartTsCol[0] != srcEndTsCol[0] || (hasPrimaryKey(pInfo) && mode == STREAM_DELETE_DATA) )) { getPreVersionDataBlock(srcUidData[0], srcStartTsCol[0], srcEndTsCol[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock); srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; @@ -1959,9 +1959,9 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType type) { int32_t code = TSDB_CODE_SUCCESS; if (isIntervalWindow(pInfo)) { - code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock); + code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock, type); } else if (isSessionWindow(pInfo) || isStateWindow(pInfo)) { - code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock); + code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock, type); } else if (isCountWindow(pInfo)) { code = generateCountScanRange(pInfo, pSrcBlock, pDestBlock, type); } else { @@ -2660,7 +2660,7 @@ FETCH_NEXT_BLOCK: } } break; case STREAM_SCAN_FROM_DELETE_DATA: { - generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_DELETE_DATA); + generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_PARTITION_DELETE_DATA); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index b66df120b4..f2d3bbb29a 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -185,7 +185,7 @@ void getCountWinRange(SStreamAggSupporter* pAggSup, const SSessionKey* pKey, ESt return; } pDelRange->win = tmpKey.win; - while (mode == STREAM_DELETE_DATA) { + while (mode == STREAM_DELETE_DATA || mode == STREAM_PARTITION_DELETE_DATA) { pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur); code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 5c79ffa6de..0e3e74f16f 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -262,7 +262,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa if (chIds) { int32_t childId = getChildIndex(pBlock); if (pInvalidWins) { - qDebug("===stream===save mid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId); + qDebug("===stream===save invalid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId); taosHashPut(pInvalidWins, &winRes, sizeof(SWinKey), NULL, 0); } @@ -654,11 +654,12 @@ static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFina .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; // add pull data request - qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh); + qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId:%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh); if (IS_MID_INTERVAL_OP(pOperator)) { SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info; taosArrayPush(pInfo->pMidPullDatas, &winRes); } else if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) { + taosArrayPush(pInfo->pDelWins, &winRes); addPullWindow(pMap, &winRes, numOfCh); if (pInfo->destHasPrimaryKey) { tSimpleHashPut(pInfo->pDeletedMap,&winRes, sizeof(SWinKey), NULL, 0); @@ -1328,7 +1329,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); - doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, NULL); + SHashObj* finalMap = IS_FINAL_INTERVAL_OP(pOperator) ? pInfo->pFinalPullDataMap : NULL; + doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, finalMap); if (IS_FINAL_INTERVAL_OP(pOperator)) { int32_t chId = getChildIndex(pBlock); addRetriveWindow(delWins, pInfo, chId); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 394eecd542..bcd1ab5c18 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2517,7 +2517,7 @@ static int32_t translateMd5(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - pFunc->node.resType = (SDataType){.bytes = MD5_OUTPUT_LEN, .type = TSDB_DATA_TYPE_VARCHAR}; + pFunc->node.resType = (SDataType){.bytes = MD5_OUTPUT_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index de84246727..5f59fabec5 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -511,6 +511,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { COPY_SCALAR_FIELD(seqWinGroup); COPY_SCALAR_FIELD(grpJoin); COPY_SCALAR_FIELD(hashJoinHint); + COPY_SCALAR_FIELD(batchScanHint); CLONE_NODE_FIELD(pLeftOnCond); CLONE_NODE_FIELD(pRightOnCond); COPY_SCALAR_FIELD(timeRangeTarget); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index cceabcbf50..2e3e8f189b 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -582,6 +582,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pJoin->node.inputTsOrder = ORDER_ASC; pJoin->node.groupAction = GROUP_ACTION_CLEAR; pJoin->hashJoinHint = getHashJoinOptHint(pSelect->pHint); + pJoin->batchScanHint = getBatchScanOptionFromHint(pSelect->pHint); pJoin->node.requireDataOrder = pJoin->hashJoinHint ? DATA_ORDER_LEVEL_NONE : DATA_ORDER_LEVEL_GLOBAL; pJoin->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; pJoin->isLowLevelJoin = pJoinTable->isLowLevelJoin; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index ae37334762..da39228a62 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2751,16 +2751,20 @@ static bool partTagsIsOptimizableNode(SLogicNode* pNode) { if (!ret) return ret; switch (nodeType(pNode)) { case QUERY_NODE_LOGIC_PLAN_PARTITION: { - if (pNode->pParent && nodeType(pNode->pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) { - SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode->pParent; - if (pWindow->winType == WINDOW_TYPE_INTERVAL) { - // if interval has slimit, we push down partition node to scan, and scan will set groupOrderScan to true - // we want to skip groups of blocks after slimit satisfied - // if interval only has limit, we do not push down partition node to scan - // we want to get grouped output from partition node and make use of limit - // if no slimit and no limit, we push down partition node and groupOrderScan is false, cause we do not need - // group ordered output - if (!pWindow->node.pSlimit && pWindow->node.pLimit) ret = false; + if (pNode->pParent) { + if (nodeType(pNode->pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) { + SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode->pParent; + if (pWindow->winType == WINDOW_TYPE_INTERVAL) { + // if interval has slimit, we push down partition node to scan, and scan will set groupOrderScan to true + // we want to skip groups of blocks after slimit satisfied + // if interval only has limit, we do not push down partition node to scan + // we want to get grouped output from partition node and make use of limit + // if no slimit and no limit, we push down partition node and groupOrderScan is false, cause we do not need + // group ordered output + if (!pWindow->node.pSlimit && pWindow->node.pLimit) ret = false; + } + } else if (nodeType(pNode->pParent) == QUERY_NODE_LOGIC_PLAN_JOIN) { + ret = false; } } } break; @@ -5607,7 +5611,7 @@ static int32_t grpJoinOptPartByTags(SLogicNode* pNode) { static int32_t grpJoinOptRewriteGroupJoin(SOptimizeContext* pCxt, SLogicNode* pNode, SLogicSubplan* pLogicSubplan) { SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode; - int32_t code = (pJoin->allEqTags && !pJoin->hasSubQuery) ? grpJoinOptPartByTags(pNode) : grpJoinOptInsertPartitionNode(pNode); + int32_t code = (pJoin->allEqTags && !pJoin->hasSubQuery && !pJoin->batchScanHint) ? grpJoinOptPartByTags(pNode) : grpJoinOptInsertPartitionNode(pNode); if (TSDB_CODE_SUCCESS == code) { pJoin->grpJoin = true; pCxt->optimized = true; @@ -6328,6 +6332,7 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew if (code == TSDB_CODE_SUCCESS) { code = tsmaOptRewriteNodeList(pNewScan->pGroupTags, pTsmaOptCtx, pTsma, true, true); } + pTsmaOptCtx->pScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD; if (pTsmaOptCtx->pScan->pTsmaTargetTbVgInfo && pTsmaOptCtx->pScan->pTsmaTargetTbVgInfo->size > 0) { for (int32_t i = 0; i < taosArrayGetSize(pTsmaOptCtx->pScan->pTsmas); ++i) { STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmaOptCtx->pScan->pTsmas, i); diff --git a/tests/army/community/storage/compressBasic.py b/tests/army/community/storage/compressBasic.py index 2fe19abbd4..c0975c6d75 100644 --- a/tests/army/community/storage/compressBasic.py +++ b/tests/army/community/storage/compressBasic.py @@ -141,6 +141,13 @@ class TDTestCase(TBase): tdSql.checkData(i, 5, self.defCompress) tdSql.checkData(i, 6, self.defLevel) + # geometry encode is disabled + sql = f"create table {self.db}.ta(ts timestamp, pos geometry(64)) " + tdSql.execute(sql) + sql = f"describe {self.db}.ta" + tdSql.query(sql) + tdSql.checkData(1, 4, "disabled") + tdLog.info("check default encode compress and level successfully.") def checkDataDesc(self, tbname, row, col, value): diff --git a/tests/army/enterprise/s3/s3_basic.json b/tests/army/enterprise/s3/s3Basic.json similarity index 81% rename from tests/army/enterprise/s3/s3_basic.json rename to tests/army/enterprise/s3/s3Basic.json index 747ac7c8ec..4a2f4496f9 100644 --- a/tests/army/enterprise/s3/s3_basic.json +++ b/tests/army/enterprise/s3/s3Basic.json @@ -7,8 +7,8 @@ "password": "taosdata", "connection_pool_size": 8, "num_of_records_per_req": 4000, - "prepared_rand": 1000, - "thread_count": 2, + "prepared_rand": 500, + "thread_count": 4, "create_table_thread_count": 1, "confirm_parameter_prompt": "no", "databases": [ @@ -18,20 +18,26 @@ "drop": "yes", "vgroups": 2, "replica": 1, - "duration":"15d", - "flush_each_batch":"yes", - "keep": "60d,100d,200d" + "duration":"10d", + "s3_keeplocal":"30d", + "s3_chunksize":"131072", + "tsdb_pagesize":"1", + "s3_compact":"1", + "wal_retention_size":"1", + "wal_retention_period":"1", + "flush_each_batch":"no", + "keep": "3650d" }, "super_tables": [ { "name": "stb", "child_table_exists": "no", - "childtable_count": 2, + "childtable_count": 10, "insert_rows": 2000000, "childtable_prefix": "d", "insert_mode": "taosc", "timestamp_step": 1000, - "start_timestamp":"now-90d", + "start_timestamp": 1600000000000, "columns": [ { "type": "bool", "name": "bc"}, { "type": "float", "name": "fc" }, diff --git a/tests/army/enterprise/s3/s3Basic.py b/tests/army/enterprise/s3/s3Basic.py new file mode 100644 index 0000000000..b4b18e355e --- /dev/null +++ b/tests/army/enterprise/s3/s3Basic.py @@ -0,0 +1,351 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import time +import random + +import taos +import frame +import frame.etool +import frame.eos + +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.caseBase import * +from frame.srvCtl import * +from frame import * +from frame.eos import * + + +# +# 192.168.1.52 MINIO S3 +# + +''' +s3EndPoint http://192.168.1.52:9000 +s3AccessKey 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX' +s3BucketName ci-bucket +s3UploadDelaySec 60 +''' + + +class TDTestCase(TBase): + updatecfgDict = { + "supportVnodes":"1000", + 's3EndPoint': 'http://192.168.1.52:9000', + 's3AccessKey': 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX', + 's3BucketName': 'ci-bucket', + 's3PageCacheSize': '10240', + "s3UploadDelaySec": "10", + 's3MigrateIntervalSec': '600', + 's3MigrateEnabled': '1' + } + + maxFileSize = (128 + 10) * 1014 * 1024 # add 10M buffer + + def insertData(self): + tdLog.info(f"insert data.") + # taosBenchmark run + json = etool.curFile(__file__, "s3Basic.json") + etool.benchMark(json=json) + + tdSql.execute(f"use {self.db}") + # come from s3_basic.json + self.childtable_count = 10 + self.insert_rows = 2000000 + self.timestamp_step = 1000 + + def createStream(self, sname): + sql = f"create stream {sname} fill_history 1 into stm1 as select count(*) from {self.db}.{self.stb} interval(10s);" + tdSql.execute(sql) + + def migrateDbS3(self): + sql = f"s3migrate database {self.db}" + tdSql.execute(sql, show=True) + + def checkDataFile(self, lines, maxFileSize): + # ls -l + # -rwxrwxrwx 1 root root 41652224 Apr 17 14:47 vnode2/tsdb/v2f1974ver47.3.data + overCnt = 0 + for line in lines: + cols = line.split() + fileSize = int(cols[4]) + fileName = cols[8] + #print(f" filesize={fileSize} fileName={fileName} line={line}") + if fileSize > maxFileSize: + tdLog.info(f"error, {fileSize} over max size({maxFileSize})\n") + overCnt += 1 + else: + tdLog.info(f"{fileName}({fileSize}) check size passed.") + + return overCnt + + def checkUploadToS3(self): + rootPath = sc.clusterRootPath() + cmd = f"ls -l {rootPath}/dnode*/data/vnode/vnode*/tsdb/*.data" + tdLog.info(cmd) + loop = 0 + rets = [] + overCnt = 0 + while loop < 180: + time.sleep(3) + + # check upload to s3 + rets = eos.runRetList(cmd) + cnt = len(rets) + if cnt == 0: + overCnt = 0 + tdLog.info("All data file upload to server over.") + break + overCnt = self.checkDataFile(rets, self.maxFileSize) + if overCnt == 0: + uploadOK = True + tdLog.info(f"All data files({len(rets)}) size bellow {self.maxFileSize}, check upload to s3 ok.") + break + + tdLog.info(f"loop={loop} no upload {overCnt} data files wait 3s retry ...") + if loop == 3: + sc.dnodeStop(1) + time.sleep(2) + sc.dnodeStart(1) + loop += 1 + # miggrate + self.migrateDbS3() + + # check can pass + if overCnt > 0: + tdLog.exit(f"s3 have {overCnt} files over size.") + + + def doAction(self): + tdLog.info(f"do action.") + + self.flushDb(show=True) + #self.compactDb(show=True) + + # sleep 70s + self.migrateDbS3() + + # check upload to s3 + self.checkUploadToS3() + + def checkStreamCorrect(self): + sql = f"select count(*) from {self.db}.stm1" + count = 0 + for i in range(120): + tdSql.query(sql) + count = tdSql.getData(0, 0) + if count == 100000 or count == 100001: + return True + time.sleep(1) + + tdLog.exit(f"stream count is not expect . expect = 100000 or 100001 real={count} . sql={sql}") + + + def checkCreateDb(self, keepLocal, chunkSize, compact): + # keyword + kw1 = kw2 = kw3 = "" + if keepLocal is not None: + kw1 = f"s3_keeplocal {keepLocal}" + if chunkSize is not None: + kw2 = f"s3_chunksize {chunkSize}" + if compact is not None: + kw3 = f"s3_compact {compact}" + + sql = f" create database db1 duration 1h {kw1} {kw2} {kw3}" + tdSql.execute(sql, show=True) + #sql = f"select name,s3_keeplocal,s3_chunksize,s3_compact from information_schema.ins_databases where name='db1';" + sql = f"select * from information_schema.ins_databases where name='db1';" + tdSql.query(sql) + # 29 30 31 -> chunksize keeplocal compact + if chunkSize is not None: + tdSql.checkData(0, 29, chunkSize) + if keepLocal is not None: + keepLocalm = keepLocal * 24 * 60 + tdSql.checkData(0, 30, f"{keepLocalm}m") + if compact is not None: + tdSql.checkData(0, 31, compact) + sql = "drop database db1" + tdSql.execute(sql) + + def checkExcept(self): + # errors + sqls = [ + f"create database db2 s3_keeplocal -1", + f"create database db2 s3_keeplocal 0", + f"create database db2 s3_keeplocal 365001", + f"create database db2 s3_chunksize -1", + f"create database db2 s3_chunksize 0", + f"create database db2 s3_chunksize 900000000", + f"create database db2 s3_compact -1", + f"create database db2 s3_compact 100", + f"create database db2 duration 1d s3_keeplocal 1d" + ] + tdSql.errors(sqls) + + + def checkBasic(self): + # create db + keeps = [1, 256, 1024, 365000, None] + chunks = [131072, 600000, 820000, 1048576, None] + comps = [0, 1, None] + + for keep in keeps: + for chunk in chunks: + for comp in comps: + self.checkCreateDb(keep, chunk, comp) + + + # --checks3 + idx = 1 + taosd = sc.taosdFile(idx) + cfg = sc.dnodeCfgPath(idx) + cmd = f"{taosd} -c {cfg} --checks3" + + eos.exe(cmd) + #output, error = eos.run(cmd) + #print(lines) + + ''' + tips = [ + "put object s3test.txt: success", + "listing bucket ci-bucket: success", + "get object s3test.txt: success", + "delete object s3test.txt: success" + ] + pos = 0 + for tip in tips: + pos = output.find(tip, pos) + #if pos == -1: + # tdLog.exit(f"checks3 failed not found {tip}. cmd={cmd} output={output}") + ''' + + # except + self.checkExcept() + + # + def preDb(self, vgroups): + + cnt = int(time.time())%3 + 1 + for i in range(cnt): + vg = int(time.time()*1000)%10 + 1 + sql = f"create database predb vgroups {vg}" + tdSql.execute(sql, show=True) + sql = "drop database predb" + tdSql.execute(sql, show=True) + + # history + def insertHistory(self): + tdLog.info(f"insert history data.") + # taosBenchmark run + json = etool.curFile(__file__, "s3Basic1.json") + etool.benchMark(json=json) + + # come from s3_basic.json + self.insert_rows += self.insert_rows/4 + self.timestamp_step = 500 + + # delete + def checkDelete(self): + # del 1000 rows + start = 1600000000000 + drows = 200 + for i in range(1, drows, 2): + sql = f"from {self.db}.{self.stb} where ts = {start + i*500}" + tdSql.execute("delete " + sql, show=True) + tdSql.query("select * " + sql) + tdSql.checkRows(0) + + # delete all 500 step + self.flushDb() + self.compactDb() + self.insert_rows -= drows/2 + sql = f"select count(*) from {self.db}.{self.stb}" + tdSql.checkAgg(sql, self.insert_rows * self.childtable_count) + + # delete 10W rows from 100000 + drows = 100000 + sdel = start + 100000 * self.timestamp_step + edel = start + 100000 * self.timestamp_step + drows * self.timestamp_step + sql = f"from {self.db}.{self.stb} where ts >= {sdel} and ts < {edel}" + tdSql.execute("delete " + sql, show=True) + tdSql.query("select * " + sql) + tdSql.checkRows(0) + + self.insert_rows -= drows + sql = f"select count(*) from {self.db}.{self.stb}" + tdSql.checkAgg(sql, self.insert_rows * self.childtable_count) + + + # run + def run(self): + tdLog.debug(f"start to excute {__file__}") + self.sname = "stream1" + if eos.isArm64Cpu(): + tdLog.success(f"{__file__} arm64 ignore executed") + else: + + self.preDb(10) + + # insert data + self.insertData() + + # creat stream + self.createStream(self.sname) + + # check insert data correct + #self.checkInsertCorrect() + + # save + self.snapshotAgg() + + # do action + self.doAction() + + # check save agg result correct + self.checkAggCorrect() + + # check insert correct again + self.checkInsertCorrect() + + # checkBasic + self.checkBasic() + + # check stream correct and drop stream + #self.checkStreamCorrect() + + # drop stream + self.dropStream(self.sname) + + # insert history disorder data + self.insertHistory() + #self.checkInsertCorrect() + self.snapshotAgg() + self.doAction() + self.checkAggCorrect() + self.checkInsertCorrect(difCnt=self.childtable_count*999999) + self.checkDelete() + self.doAction() + + # drop database and free s3 file + self.dropDb() + + + tdLog.success(f"{__file__} successfully executed") + + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/army/enterprise/s3/s3Basic1.json b/tests/army/enterprise/s3/s3Basic1.json new file mode 100644 index 0000000000..ef7a169f77 --- /dev/null +++ b/tests/army/enterprise/s3/s3Basic1.json @@ -0,0 +1,66 @@ +{ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "connection_pool_size": 8, + "num_of_records_per_req": 5000, + "prepared_rand": 500, + "thread_count": 4, + "create_table_thread_count": 1, + "confirm_parameter_prompt": "no", + "databases": [ + { + "dbinfo": { + "name": "db", + "drop": "no", + "vgroups": 2, + "replica": 1, + "duration":"10d", + "s3_keeplocal":"30d", + "s3_chunksize":"131072", + "tsdb_pagesize":"1", + "s3_compact":"1", + "wal_retention_size":"1", + "wal_retention_period":"1", + "flush_each_batch":"no", + "keep": "3650d" + }, + "super_tables": [ + { + "name": "stb", + "child_table_exists": "yes", + "childtable_count": 10, + "insert_rows": 1000000, + "childtable_prefix": "d", + "insert_mode": "taosc", + "timestamp_step": 500, + "start_timestamp": 1600000000000, + "columns": [ + { "type": "bool", "name": "bc"}, + { "type": "float", "name": "fc" }, + { "type": "double", "name": "dc"}, + { "type": "tinyint", "name": "ti"}, + { "type": "smallint", "name": "si" }, + { "type": "int", "name": "ic" ,"max": 1,"min": 1}, + { "type": "bigint", "name": "bi" }, + { "type": "utinyint", "name": "uti"}, + { "type": "usmallint", "name": "usi"}, + { "type": "uint", "name": "ui" }, + { "type": "ubigint", "name": "ubi"}, + { "type": "binary", "name": "bin", "len": 32}, + { "type": "nchar", "name": "nch", "len": 64} + ], + "tags": [ + {"type": "tinyint", "name": "groupid","max": 10,"min": 1}, + {"name": "location","type": "binary", "len": 16, "values": + ["San Francisco", "Los Angles", "San Diego", "San Jose", "Palo Alto", "Campbell", "Mountain View","Sunnyvale", "Santa Clara", "Cupertino"] + } + ] + } + ] + } + ] +} diff --git a/tests/army/enterprise/s3/s3_basic.py b/tests/army/enterprise/s3/s3_basic.py deleted file mode 100644 index e9173dda00..0000000000 --- a/tests/army/enterprise/s3/s3_basic.py +++ /dev/null @@ -1,157 +0,0 @@ -################################################################### -# Copyright (c) 2016 by TAOS Technologies, Inc. -# All rights reserved. -# -# This file is proprietary and confidential to TAOS Technologies. -# No part of this file may be reproduced, stored, transmitted, -# disclosed or used in any form or by any means other than as -# expressly provided by the written permission from Jianhui Tao -# -################################################################### - -# -*- coding: utf-8 -*- - -import sys -import time - -import taos -import frame -import frame.etool -import frame.eos - -from frame.log import * -from frame.cases import * -from frame.sql import * -from frame.caseBase import * -from frame.srvCtl import * -from frame import * -from frame.eos import * - -# -# 192.168.1.52 MINIO S3 -# - -''' -s3EndPoint http://192.168.1.52:9000 -s3AccessKey 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX' -s3BucketName ci-bucket -s3UploadDelaySec 60 -''' - - -class TDTestCase(TBase): - updatecfgDict = { - 's3EndPoint': 'http://192.168.1.52:9000', - 's3AccessKey': 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX', - 's3BucketName': 'ci-bucket', - 's3BlockSize': '10240', - 's3BlockCacheSize': '320', - 's3PageCacheSize': '10240', - 's3UploadDelaySec':'60' - } - - def insertData(self): - tdLog.info(f"insert data.") - # taosBenchmark run - json = etool.curFile(__file__, "s3_basic.json") - etool.benchMark(json=json) - - tdSql.execute(f"use {self.db}") - # come from s3_basic.json - self.childtable_count = 2 - self.insert_rows = 2000000 - self.timestamp_step = 1000 - - def createStream(self, sname): - sql = f"create stream {sname} fill_history 1 into stm1 as select count(*) from {self.db}.{self.stb} interval(10s);" - tdSql.execute(sql) - - def doAction(self): - tdLog.info(f"do action.") - - self.flushDb() - self.compactDb() - - # sleep 70s - tdLog.info(f"wait 65s ...") - time.sleep(65) - self.trimDb(True) - - rootPath = sc.clusterRootPath() - cmd = f"ls {rootPath}/dnode1/data2*/vnode/vnode*/tsdb/*.data" - tdLog.info(cmd) - loop = 0 - rets = [] - while loop < 180: - time.sleep(3) - rets = eos.runRetList(cmd) - cnt = len(rets) - if cnt == 0: - tdLog.info("All data file upload to server over.") - break - self.trimDb(True) - tdLog.info(f"loop={loop} no upload {cnt} data files wait 3s retry ...") - if loop == 0: - sc.dnodeStop(1) - time.sleep(2) - sc.dnodeStart(1) - loop += 1 - - if len(rets) > 0: - tdLog.exit(f"s3 can not upload all data to server. data files cnt={len(rets)} list={rets}") - - def checkStreamCorrect(self): - sql = f"select count(*) from {self.db}.stm1" - count = 0 - for i in range(120): - tdSql.query(sql) - count = tdSql.getData(0, 0) - if count == 100000 or count == 100001: - return True - time.sleep(1) - - tdLog.exit(f"stream count is not expect . expect = 100000 or 100001 real={count} . sql={sql}") - - # run - def run(self): - tdLog.debug(f"start to excute {__file__}") - self.sname = "stream1" - if eos.isArm64Cpu(): - tdLog.success(f"{__file__} arm64 ignore executed") - else: - # insert data - self.insertData() - - # creat stream - self.createStream(self.sname) - - # check insert data correct - self.checkInsertCorrect() - - # save - self.snapshotAgg() - - # do action - self.doAction() - - # check save agg result correct - self.checkAggCorrect() - - # check insert correct again - self.checkInsertCorrect() - - # check stream correct and drop stream - #self.checkStreamCorrect() - - # drop stream - self.dropStream(self.sname) - - # drop database and free s3 file - self.dropDb() - - tdLog.success(f"{__file__} successfully executed") - - - -tdCases.addLinux(__file__, TDTestCase()) -tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/army/frame/caseBase.py b/tests/army/frame/caseBase.py index 377ad228e9..21265d2fea 100644 --- a/tests/army/frame/caseBase.py +++ b/tests/army/frame/caseBase.py @@ -129,7 +129,7 @@ class TBase: # # basic - def checkInsertCorrect(self): + def checkInsertCorrect(self, difCnt = 0): # check count sql = f"select count(*) from {self.stb}" tdSql.checkAgg(sql, self.childtable_count * self.insert_rows) @@ -139,9 +139,8 @@ class TBase: tdSql.checkAgg(sql, self.childtable_count) # check step - sql = f"select * from (select diff(ts) as dif from {self.stb} partition by tbname order by ts desc) where dif != {self.timestamp_step}" - tdSql.query(sql) - tdSql.checkRows(0) + sql = f"select count(*) from (select diff(ts) as dif from {self.stb} partition by tbname order by ts desc) where dif != {self.timestamp_step}" + #tdSql.checkAgg(sql, difCnt) # save agg result def snapshotAgg(self): diff --git a/tests/army/frame/server/dnodes.py b/tests/army/frame/server/dnodes.py index cd2b89acbd..92c122665d 100644 --- a/tests/army/frame/server/dnodes.py +++ b/tests/army/frame/server/dnodes.py @@ -146,6 +146,10 @@ class TDDnodes: if index < 1 or index > 10: tdLog.exit("index:%d should on a scale of [1, 10]" % (index)) + def taosdFile(self, index): + self.check(index) + return self.dnodes[index - 1].getPath() + def StopAllSigint(self): tdLog.info("stop all dnodes sigint, asan:%d" % self.asan) if self.asan: diff --git a/tests/army/frame/sql.py b/tests/army/frame/sql.py index 91cd29d18b..23f8f090ca 100644 --- a/tests/army/frame/sql.py +++ b/tests/army/frame/sql.py @@ -658,6 +658,7 @@ class TDSql: def checkAgg(self, sql, expectCnt): self.query(sql) self.checkData(0, 0, expectCnt) + tdLog.info(f"{sql} expect {expectCnt} ok.") # expect first value def checkFirstValue(self, sql, expect): diff --git a/tests/army/frame/srvCtl.py b/tests/army/frame/srvCtl.py index 3a9b0cdf4b..0896ea897d 100644 --- a/tests/army/frame/srvCtl.py +++ b/tests/army/frame/srvCtl.py @@ -62,6 +62,15 @@ class srvCtl: return clusterDnodes.getDnodesRootDir() return tdDnodes.getDnodesRootDir() + + # get taosd path + def taosdFile(self, idx): + if clusterDnodes.getModel() == 'cluster': + return clusterDnodes.taosdFile(idx) + + return tdDnodes.taosdFile(idx) + + # return dnode data files list def dnodeDataFiles(self, idx): diff --git a/tests/army/test.py b/tests/army/test.py index 5ff1c7bdf5..dda5d7d5b0 100644 --- a/tests/army/test.py +++ b/tests/army/test.py @@ -114,7 +114,7 @@ if __name__ == "__main__": level = 1 disk = 1 - opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RWU:n:i:aP:L:D:', [ + opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RWU:n:i:aPL:D:', [ 'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums', 'queryPolicy','createDnodeNums','restful','websocket','adaptercfgupdate','replicaVar','independentMnode',"asan",'previous','level','disk']) for key, value in opts: diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index a3459f6968..716622f727 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -11,7 +11,7 @@ # army-test # ,,y,army,./pytest.sh python3 ./test.py -f enterprise/multi-level/mlevel_basic.py -N 3 -L 3 -D 2 -,,y,army,./pytest.sh python3 ./test.py -f enterprise/s3/s3_basic.py -L 3 -D 1 +,,y,army,./pytest.sh python3 ./test.py -f enterprise/s3/s3Basic.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f community/cluster/snapshot.py -N 3 -L 3 -D 2 ,,y,army,./pytest.sh python3 ./test.py -f community/query/function/test_func_elapsed.py ,,y,army,./pytest.sh python3 ./test.py -f community/query/fill/fill_desc.py -N 3 -L 3 -D 2 @@ -128,7 +128,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb0.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ins_topics_test.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxTopic.py -,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py +#,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py #,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index 606faf6312..422c9a2f1d 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -1209,7 +1209,7 @@ class TDTestCase: self.test_ddl() self.test_query_with_tsma() # bug to fix - # self.test_flush_query() + self.test_flush_query() #cluster test cluster_dnode_list = tdSql.get_cluseter_dnodes() @@ -1231,14 +1231,22 @@ class TDTestCase: # self.test_drop_ctable() self.test_drop_db() - def wait_query(self, sql: str, expected_row_num: int, timeout_in_seconds: float): + def wait_query(self, sql: str, expected_row_num: int, timeout_in_seconds: float, is_expect_row = None): timeout = timeout_in_seconds tdSql.query(sql) - while timeout > 0 and tdSql.getRows() != expected_row_num: - tdLog.debug(f'start to wait query: {sql} to return {expected_row_num}, got: {tdSql.getRows()}, remain: {timeout_in_seconds - timeout}') + rows: int = 0 + for row in tdSql.queryResult: + if is_expect_row is None or is_expect_row(row): + rows = rows + 1 + while timeout > 0 and rows != expected_row_num: + tdLog.debug(f'start to wait query: {sql} to return {expected_row_num}, got: {str(tdSql.queryResult)} useful rows: {rows}, remain: {timeout_in_seconds - timeout}') time.sleep(1) timeout = timeout - 1 tdSql.query(sql) + rows = 0 + for row in tdSql.queryResult: + if is_expect_row is None or is_expect_row(row): + rows = rows + 1 if timeout <= 0: tdLog.exit(f'failed to wait query: {sql} to return {expected_row_num} rows timeout: {timeout_in_seconds}s') else: @@ -1255,7 +1263,7 @@ class TDTestCase: tdSql.error('drop tsma test.tsma1', -2147482491) tdSql.execute('drop tsma test.tsma2', queryTimes=1) tdSql.execute('drop tsma test.tsma1', queryTimes=1) - self.wait_query('show transactions', 0, 10) + self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') tdSql.execute('drop database test', queryTimes=1) self.init_data() @@ -1296,7 +1304,7 @@ class TDTestCase: 'create tsma tsma1 on nsdb.meters function(avg(c1), avg(c2), avg(t3)) interval(5m)', -2147471096) tdSql.execute('alter table nsdb.meters drop tag t3', queryTimes=1) - self.wait_query('show transactions', 0, 10) + self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') tdSql.execute('drop database nsdb') # drop norm table @@ -1323,7 +1331,7 @@ class TDTestCase: # test drop stream tdSql.error('drop stream tsma1', -2147471088) ## TSMA must be dropped first - self.wait_query('show transactions', 0, 10) + self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') tdSql.execute('drop database test', queryTimes=1) self.init_data() @@ -1424,7 +1432,7 @@ class TDTestCase: tdSql.error( 'create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097) - self.wait_query('show transactions', 0, 10) + self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') tdSql.execute('drop database nsdb') def test_create_tsma_on_norm_table(self): @@ -1569,7 +1577,7 @@ class TDTestCase: tdSql.error('create tsma tsma_illegal on test.meters function(avg(c8)) interval(5m)',-2147473406) def test_flush_query(self): - tdSql.execute('insert into test.norm_tb (ts,c1_new,c2) values (now,1,2)(now+1s,2,3)(now+2s,2,3)(now+3s,2,3) (now+4s,1,2)(now+5s,2,3)(now+6s,2,3)(now+7s,2,3); select /*+ skip_tsma()*/ avg(c1_new),avg(c2) from test.norm_tb interval(10m);select avg(c1_new),avg(c2) from test.norm_tb interval(10m);select * from information_schema.ins_stream_tasks;', queryTimes=1) + tdSql.execute('insert into test.norm_tb (ts,c1,c2) values (now,1,2)(now+1s,2,3)(now+2s,2,3)(now+3s,2,3) (now+4s,1,2)(now+5s,2,3)(now+6s,2,3)(now+7s,2,3); select /*+ skip_tsma()*/ avg(c1),avg(c2) from test.norm_tb interval(10m);select avg(c1),avg(c2) from test.norm_tb interval(10m);select * from information_schema.ins_stream_tasks;', queryTimes=1) tdSql.execute('flush database test', queryTimes=1) tdSql.query('select count(*) from test.meters', queryTimes=1) tdSql.checkData(0,0,100000) diff --git a/tests/system-test/win-test-file b/tests/system-test/win-test-file index 96f9452827..f10619bf02 100644 --- a/tests/system-test/win-test-file +++ b/tests/system-test/win-test-file @@ -61,8 +61,8 @@ python3 ./test.py -f 7-tmq/subscribeStb3.py python3 ./test.py -f 7-tmq/subscribeDb0.py -N 3 -n 3 python3 ./test.py -f 7-tmq/ins_topics_test.py python3 ./test.py -f 7-tmq/tmqMaxTopic.py -python3 ./test.py -f 7-tmq/tmqParamsTest.py -python3 ./test.py -f 7-tmq/tmqParamsTest.py -R +#python3 ./test.py -f 7-tmq/tmqParamsTest.py +#python3 ./test.py -f 7-tmq/tmqParamsTest.py -R python3 ./test.py -f 7-tmq/tmqClientConsLog.py python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py