diff --git a/Jenkinsfile2 b/Jenkinsfile2 index d90bc3061b..a3006c4f7d 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -34,16 +34,6 @@ def abort_previous(){ } def pre_test(){ sh 'hostname' - sh ''' - date - sudo rmtaos || echo "taosd has not installed" - ''' - sh ''' - killall -9 taosd ||echo "no taosd running" - killall -9 gdb || echo "no gdb running" - killall -9 python3.8 || echo "no python program running" - cd ${WKC} - ''' script { if (env.CHANGE_TARGET == 'master') { sh ''' @@ -81,10 +71,10 @@ def pre_test(){ git pull >/dev/null git fetch origin +refs/pull/${CHANGE_ID}/merge git checkout -qf FETCH_HEAD - git log|head -n20 + git log -5 cd ${WK} git pull >/dev/null - git log|head -n20 + git log -5 ''' } else if (env.CHANGE_URL =~ /\/TDinternal\//) { sh ''' @@ -92,10 +82,10 @@ def pre_test(){ git pull >/dev/null git fetch origin +refs/pull/${CHANGE_ID}/merge git checkout -qf FETCH_HEAD - git log|head -n20 + git log -5 cd ${WKC} git pull >/dev/null - git log|head -n20 + git log -5 ''' } else { sh ''' @@ -106,21 +96,10 @@ def pre_test(){ cd ${WKC} git submodule update --init --recursive ''' - sh ''' - cd ${WK} - export TZ=Asia/Harbin - date - rm -rf debug - mkdir debug - cd debug - cmake .. > /dev/null - make -j4> /dev/null - ''' sh ''' cd ${WKPY} git reset --hard git pull - pip3 install . ''' return 1 } @@ -131,12 +110,14 @@ def pre_test_win(){ time /t taskkill /f /t /im python.exe taskkill /f /t /im bash.exe - cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine - rd /s /Q C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine\\debug + rd /s /Q C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\debug exit 0 ''' bat ''' - cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal + git reset --hard + git fetch || git fetch + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community git reset --hard git fetch || git fetch git checkout -f @@ -144,39 +125,73 @@ def pre_test_win(){ script { if (env.CHANGE_TARGET == 'master') { bat ''' - cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal + git checkout master + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community git checkout master ''' } else if(env.CHANGE_TARGET == '2.0') { bat ''' - cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal + git checkout 2.0 + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community git checkout 2.0 ''' } else if(env.CHANGE_TARGET == '3.0') { bat ''' - cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal + git checkout 3.0 + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community git checkout 3.0 ''' } else { bat ''' - cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal + git checkout develop + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community git checkout develop ''' } } + script { + if (env.CHANGE_URL =~ /\/TDengine\//) { + bat ''' + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal + git pull + git log -5 + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community + git pull + git fetch origin +refs/pull/${CHANGE_ID}/merge + git checkout -qf FETCH_HEAD + git log -5 + ''' + } else if (env.CHANGE_URL =~ /\/TDinternal\//) { + bat ''' + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal + git pull + git fetch origin +refs/pull/${CHANGE_ID}/merge + git checkout -qf FETCH_HEAD + git log -5 + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community + git pull + git log -5 + ''' + } else { + sh ''' + echo "unmatched reposiotry ${CHANGE_URL}" + ''' + } + } bat ''' - cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine - git branch - git pull || git pull - git fetch origin +refs/pull/%CHANGE_ID%/merge - git checkout -qf FETCH_HEAD + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community + git submodule update --init --recursive ''' } def pre_test_build_win() { bat ''' echo "building ..." time /t - cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal mkdir debug cd debug call "C:\\Program Files (x86)\\Microsoft Visual Studio\\2017\\Community\\VC\\Auxiliary\\Build\\vcvarsall.bat" x64 @@ -192,6 +207,7 @@ pipeline { agent none options { skipDefaultCheckout() } environment{ + WKDIR = '/var/lib/jenkins/workspace' WK = '/var/lib/jenkins/workspace/TDinternal' WKC = '/var/lib/jenkins/workspace/TDinternal/community' WKPY = '/var/lib/jenkins/workspace/taos-connector-python' @@ -206,39 +222,22 @@ pipeline { changeRequest() } steps { - timeout(time: 45, unit: 'MINUTES'){ + timeout(time: 20, unit: 'MINUTES'){ pre_test() script { - if (env.CHANGE_URL =~ /\/TDengine\//) { - sh ''' - cd ${WK}/debug - ctest -VV - ''' - sh ''' - export LD_LIBRARY_PATH=${WK}/debug/build/lib - cd ${WKC}/tests/system-test - ./fulltest.sh - ''' - } else if (env.CHANGE_URL =~ /\/TDinternal\//) { - sh ''' - cd ${WKC}/debug - ctest -VV - ''' - sh ''' - export LD_LIBRARY_PATH=${WKC}/debug/build/lib - cd ${WKC}/tests/system-test - ./fulltest.sh - ''' - } else { - sh ''' - echo "unmatched reposiotry ${CHANGE_URL}" - ''' - } + sh ''' + cd ${WKC}/tests/parallel_test + date + time ./container_build.sh -w ${WKDIR} -t 8 -e + rm -f /tmp/cases.task + ./collect_cases.sh -e + ''' + sh ''' + cd ${WKC}/tests/parallel_test + date + time ./run.sh -e -m /home/m.json -t /tmp/cases.task -b ${CHANGE_TARGET} -l ${WKDIR}/log + ''' } - sh ''' - cd ${WKC}/tests - ./test-all.sh b1fq - ''' } } } diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 0c4e534734..80125d6788 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -48,6 +48,7 @@ enum { typedef enum EStreamType { STREAM_NORMAL = 1, STREAM_INVERT, + STREAM_REPROCESS, STREAM_INVALID, } EStreamType; diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 6141829a3f..8d0b93dde2 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -173,6 +173,7 @@ typedef struct SqlFunctionCtx { SInputColumnInfoData input; SResultDataInfo resDataInfo; uint32_t order; // data block scanner order: asc|desc + uint8_t scanFlag; // record current running step, default: 0 //////////////////////////////////////////////////////////////// int32_t startRow; // start row index int32_t size; // handled processed row number @@ -183,7 +184,6 @@ typedef struct SqlFunctionCtx { bool hasNull; // null value exist in current block, TODO remove it bool requireNull; // require null in some function, TODO remove it int32_t columnIndex; // TODO remove it - uint8_t currentStage; // record current running step, default: 0 bool isAggSet; int64_t startTs; // timestamp range of current query when function is executed on a specific data block, TODO remove it bool stableQuery; diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index d0d10b2761..7ca4ca9172 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -193,7 +193,6 @@ typedef struct SScanPhysiNode { } SScanPhysiNode; typedef SScanPhysiNode STagScanPhysiNode; -typedef SScanPhysiNode SStreamScanPhysiNode; typedef struct SSystemTableScanPhysiNode { SScanPhysiNode scan; @@ -217,6 +216,7 @@ typedef struct STableScanPhysiNode { } STableScanPhysiNode; typedef STableScanPhysiNode STableSeqScanPhysiNode; +typedef STableScanPhysiNode SStreamScanPhysiNode; typedef struct SProjectPhysiNode { SPhysiNode node; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 9053101938..43dcf2dfa9 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1447,6 +1447,10 @@ void blockDebugShowData(const SArray* dataBlocks) { for (int32_t k = 0; k < colNum; k++) { SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); + if (pColInfoData->hasNull) { + printf(" %15s |", "NULL"); + continue; + } switch (pColInfoData->info.type) { case TSDB_DATA_TYPE_TIMESTAMP: formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI); @@ -1464,6 +1468,9 @@ void blockDebugShowData(const SArray* dataBlocks) { case TSDB_DATA_TYPE_UBIGINT: printf(" %15lu |", *(uint64_t*)var); break; + case TSDB_DATA_TYPE_DOUBLE: + printf(" %15f |", *(double*)var); + break; } } printf("\n"); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 049cd74784..28cdb39bd5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -950,6 +950,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { .reader = pStreamReader, .meta = pTq->pVnode->pMeta, .pMsgCb = &pTq->pVnode->msgCb, + .vnode = pTq->pVnode, }; pTask->exec.runners[i].inputHandle = pStreamReader; pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index ba18d30a52..34b7fce33c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -371,9 +371,18 @@ typedef struct STagScanInfo { STableGroupInfo *pTableGroups; } STagScanInfo; +typedef enum EStreamScanMode { + STREAM_SCAN_FROM_READERHANDLE = 1, + STREAM_SCAN_FROM_RES, + STREAM_SCAN_FROM_UPDATERES, + STREAM_SCAN_FROM_DATAREADER, +} EStreamScanMode; + typedef struct SStreamBlockScanInfo { SArray* pBlockLists; // multiple SSDatablock. SSDataBlock* pRes; // result SSDataBlock + SSDataBlock* pUpdateRes; // update SSDataBlock + int32_t updateResIndex; int32_t blockType; // current block type int32_t validBlockIndex; // Is current data has returned? SColumnInfo* pCols; // the output column info @@ -383,8 +392,12 @@ typedef struct SStreamBlockScanInfo { SArray* pColMatchInfo; // SNode* pCondition; SArray* tsArray; - SUpdateInfo* pUpdateInfo; + SUpdateInfo* pUpdateInfo; int32_t primaryTsIndex; // primary time stamp slot id + void* pDataReader; + EStreamScanMode scanMode; + SOperatorInfo* pOperatorDumy; + SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. } SStreamBlockScanInfo; typedef struct SSysTableScanInfo { @@ -645,6 +658,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWin void cleanupAggSup(SAggSupporter* pAggSup); void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); +SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode); SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo); SSDataBlock* loadNextDataBlock(void* param); @@ -690,8 +704,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, - SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pConditions); +SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SSDataBlock* pResBlock, + SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo, + SNode* pConditions, SOperatorInfo* pOperatorDumy, SInterval* pInterval); SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal, @@ -745,6 +760,15 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi int32_t length); void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result, int32_t* length); +STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, + SInterval* pInterval, int32_t precision, STimeWindow* win); +int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, + TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item, + int32_t order); +int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); + +void doClearWindow(SIntervalAggOperatorInfo* pInfo, char* pData, int16_t bytes, + uint64_t groupId, int32_t numOfOutput); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7e9ca1c52a..8db5a282d3 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -344,6 +344,28 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, return pResultRow; } +void doClearWindow(SIntervalAggOperatorInfo* pInfo, char* pData, int16_t bytes, + uint64_t groupId, int32_t numOfOutput) { + SAggSupporter* pSup = &pInfo->aggSup; + SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId); + SResultRowPosition* p1 = + (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, + GET_RES_WINDOW_KEY_LEN(bytes)); + SResultRow* pResult = getResultRowByPos(pSup->pResultBuf, p1); + SqlFunctionCtx* pCtx = pInfo->binfo.pCtx; + for (int32_t i = 0; i < numOfOutput; ++i) { + pCtx[i].resultInfo = getResultCell(pResult, i, pInfo->binfo.rowCellInfoOffset); + struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo; + if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) { + continue; + } + pResInfo->initialized = false; + if (pCtx[i].functionId != -1) { + pCtx[i].fpSet.init(&pCtx[i], pResInfo); + } + } +} + /** * the struct of key in hash table * +----------+---------------+ @@ -724,7 +746,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt pCtx[i].order = order; pCtx[i].size = pBlock->info.rows; pCtx[i].pSrcBlock = pBlock; - pCtx[i].currentStage = scanFlag; + pCtx[i].scanFlag = scanFlag; SInputColumnInfoData* pInput = &pCtx[i].input; pInput->uid = pBlock->info.uid; @@ -804,23 +826,22 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt return code; } -static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) { +static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) { for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { if (functionNeedToExecute(&pCtx[k])) { pCtx[k].startTs = startTs; - // this can be set during create the struct // todo add a dummy funtion to avoid process check if (pCtx[k].fpSet.process != NULL) { int32_t code = pCtx[k].fpSet.process(&pCtx[k]); if (code != TSDB_CODE_SUCCESS) { - qError("%s call aggregate function error happens, code : %s", - GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); - pOperator->pTaskInfo->code = code; - longjmp(pOperator->pTaskInfo->env, code); + qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); + return code; } } } } + + return TSDB_CODE_SUCCESS; } static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) { @@ -976,18 +997,22 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) { return false; } + if (pCtx->scanFlag == REPEAT_SCAN) { + return fmIsRepeatScanFunc(pCtx->functionId); + } + if (isRowEntryCompleted(pResInfo)) { return false; } - if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) { - // return QUERY_IS_ASC_QUERY(pQueryAttr); - } - - // denote the order type - if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) { - // return pCtx->param[0].i == pQueryAttr->order.order; - } +// if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) { +// // return QUERY_IS_ASC_QUERY(pQueryAttr); +// } +// +// // denote the order type +// if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) { +// // return pCtx->param[0].i == pQueryAttr->order.order; +// } // in the reverse table scan, only the following functions need to be executed // if (IS_REVERSE_SCAN(pRuntimeEnv) || @@ -1922,7 +1947,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t cleanupResultRowEntry(pEntry); pCtx[i].resultInfo = pEntry; - pCtx[i].currentStage = stage; + pCtx[i].scanFlag = stage; // set the timestamp output buffer for top/bottom/diff query // int32_t fid = pCtx[i].functionId; @@ -3702,7 +3727,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { SAggOperatorInfo* pAggInfo = pOperator->info; SOptrBasicInfo* pInfo = &pAggInfo->binfo; - SOperatorInfo* downstream = pOperator->pDownstream[0]; int32_t order = TSDB_ORDER_ASC; @@ -3716,9 +3740,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { if (pBlock == NULL) { break; } - // if (pAggInfo->current != NULL) { - // setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs); - // } int32_t code = getTableScanInfo(pOperator, &order, &scanFlag); if (code != TSDB_CODE_SUCCESS) { @@ -3728,17 +3749,19 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { // there is an scalar expression that needs to be calculated before apply the group aggregation. if (pAggInfo->pScalarExprInfo != NULL) { code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx, - pAggInfo->numOfScalarExpr, NULL); + pAggInfo->numOfScalarExpr, NULL); if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; - longjmp(pTaskInfo->env, pTaskInfo->code); + longjmp(pTaskInfo->env, code); } } // the pDataBlock are always the same one, no need to call this again setExecutionContext(pOperator->numOfExprs, pBlock->info.groupId, pTaskInfo, pAggInfo); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, true); - doAggregateImpl(pOperator, 0, pInfo->pCtx); + code = doAggregateImpl(pOperator, 0, pInfo->pCtx); + if (code != 0) { + longjmp(pTaskInfo->env, code); + } #if 0 // test for encode/decode result info if(pOperator->encodeResultRow){ @@ -4581,7 +4604,7 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i return s; } -static SColumn* createColumn(int32_t blockId, int32_t slotId, SDataType* pType) { +static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType) { SColumn* pCol = taosMemoryCalloc(1, sizeof(SColumn)); if (pCol == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -4589,9 +4612,10 @@ static SColumn* createColumn(int32_t blockId, int32_t slotId, SDataType* pType) } pCol->slotId = slotId; - pCol->bytes = pType->bytes; - pCol->type = pType->type; - pCol->scale = pType->scale; + pCol->colId = colId; + pCol->bytes = pType->bytes; + pCol->type = pType->type; + pCol->scale = pType->scale; pCol->precision = pType->precision; pCol->dataBlockId = blockId; @@ -4634,7 +4658,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* SDataType* pType = &pColNode->node.resType; pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName); - pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pType); + pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType); pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; } else if (type == QUERY_NODE_VALUE) { pExp->pExpr->nodeType = QUERY_NODE_VALUE; @@ -4686,7 +4710,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* SColumnNode* pcn = (SColumnNode*)p1; pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN; - pExp->base.pParam[j].pCol = createColumn(pcn->dataBlockId, pcn->slotId, &pcn->node.resType); + pExp->base.pParam[j].pCol = createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType); } else if (p1->type == QUERY_NODE_VALUE) { SValueNode* pvn = (SValueNode*)p1; pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE; @@ -4764,18 +4788,45 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, pExchange->pSrcEndPoints, pResBlock, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. + STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, - queryId, taskId); - SArray* tableIdList = extractTableIdList(pTableGroupInfo); + int32_t numOfCols = 0; + + tsdbReaderT pDataReader = NULL; + if (pHandle->vnode) { + pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); + } else { + doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, + queryId, taskId); + } + + if (pDataReader == NULL && terrno != 0) { + qDebug("pDataReader is NULL"); + // return NULL; + } else { + qDebug("pDataReader is not NULL"); + } SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; + + SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); + SSDataBlock* pResBlockDumy = createResDataBlock(pDescNode); + + SQueryTableDataCond cond = {0}; + int32_t code = initQueryTableDataCond(&cond, pTableScanNode); + if (code != TSDB_CODE_SUCCESS) { + return NULL; + } + + SInterval interval = extractIntervalInfo(pTableScanNode); + SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); + + SArray* tableIdList = extractTableIdList(pTableGroupInfo); SSDataBlock* pResBlock = createResDataBlock(pDescNode); - int32_t numOfCols = 0; SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); - SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pCols, tableIdList, pTaskInfo, - pScanPhyNode->node.pConditions); + SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pResBlock, pCols, tableIdList, pTaskInfo, + pScanPhyNode->node.pConditions, pOperatorDumy, &interval); taosArrayDestroy(tableIdList); return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b39714c121..e0758a7210 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#include "filter.h" #include "function.h" +#include "filter.h" #include "functionMgt.h" #include "os.h" #include "querynodes.h" @@ -260,6 +260,53 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction pTableScanInfo->cond.order = TSDB_ORDER_DESC; } +static void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) { + // currently only the tbname pseudo column + if (pTableScanInfo->numOfPseudoExpr == 0) { + return; + } + + SMetaReader mr = {0}; + metaReaderInit(&mr, pTableScanInfo->readHandle.meta, 0); + metaGetTableEntryByUid(&mr, pBlock->info.uid); + + for (int32_t j = 0; j < pTableScanInfo->numOfPseudoExpr; ++j) { + SExprInfo* pExpr = &pTableScanInfo->pPseudoExpr[j]; + + int32_t dstSlotId = pExpr->base.resSchema.slotId; + + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId); + colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows); + + int32_t functionId = pExpr->pExpr->_function.functionId; + + // this is to handle the tbname + if (fmIsScanPseudoColumnFunc(functionId)) { + struct SScalarFuncExecFuncs fpSet = {0}; + fmGetScalarFuncExecFuncs(functionId, &fpSet); + + SColumnInfoData infoData = {0}; + infoData.info.type = TSDB_DATA_TYPE_BIGINT; + infoData.info.bytes = sizeof(uint64_t); + colInfoDataEnsureCapacity(&infoData, 0, 1); + + colDataAppendInt64(&infoData, 0, &pBlock->info.uid); + SScalarParam srcParam = { + .numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData}; + + SScalarParam param = {.columnData = pColInfoData}; + fpSet.process(&srcParam, 1, ¶m); + } else { // these are tags + const char* p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId); + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + colDataAppend(pColInfoData, i, p, (p == NULL)); + } + } + } + + metaReaderClear(&mr); +} + static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { STableScanInfo* pTableScanInfo = pOperator->info; SSDataBlock* pBlock = pTableScanInfo->pResBlock; @@ -285,23 +332,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { // currently only the tbname pseudo column if (pTableScanInfo->numOfPseudoExpr > 0) { - int32_t dstSlotId = pTableScanInfo->pPseudoExpr->base.resSchema.slotId; - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId); - colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows); - - struct SScalarFuncExecFuncs fpSet; - fmGetScalarFuncExecFuncs(pTableScanInfo->pPseudoExpr->pExpr->_function.functionId, &fpSet); - - SColumnInfoData infoData = {0}; - infoData.info.type = TSDB_DATA_TYPE_BIGINT; - infoData.info.bytes = sizeof(uint64_t); - colInfoDataEnsureCapacity(&infoData, 0, 1); - - colDataAppendInt64(&infoData, 0, &pBlock->info.uid); - SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData}; - - SScalarParam param = {.columnData = pColInfoData}; - fpSet.process(&srcParam, 1, ¶m); + addTagPseudoColumnData(pTableScanInfo, pBlock); } return pBlock; @@ -568,7 +599,40 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { taosArrayClear(pInfo->pBlockLists); } -static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo) { +static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { + SSDataBlock* pSDB = pInfo->pUpdateRes; + if (pInfo->updateResIndex < pSDB->info.rows) { + SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, 0); + TSKEY *tsCols = (TSKEY*)pColDataInfo->pData; + SResultRowInfo dumyInfo; + dumyInfo.cur.pageId = -1; + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex], &pInfo->interval, + pInfo->interval.precision, NULL); + STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info; + pTableScanInfo->cond.twindow = win; + tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond); + pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex, + win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); + pTableScanInfo->scanTimes = 0; + return true; + } else { + return false; + } +} + +static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) { + SSDataBlock* pResult = NULL; + pResult = doTableScan(pInfo->pOperatorDumy); + if (pResult == NULL) { + if (prepareDataScan(pInfo)) { + // scan next window data + pResult = doTableScan(pInfo->pOperatorDumy); + } + } + return pResult; +} + +static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible) { SColumnInfoData* pColDataInfo = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsIndex); TSKEY* ts = (TSKEY*)pColDataInfo->pData; for (int32_t i = 0; i < pInfo->pRes->info.rows; i++) { @@ -576,13 +640,19 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo) { taosArrayPush(pInfo->tsArray, ts + i); } } - if (taosArrayGetSize(pInfo->tsArray) > 0) { + int32_t size = taosArrayGetSize(pInfo->tsArray); + if (size > 0 && invertible) { // TODO(liuyao) get from tsdb // SSDataBlock* p = createOneDataBlock(pInfo->pRes, true); // p->info.type = STREAM_INVERT; // taosArrayClear(pInfo->tsArray); // return p; - return NULL; + SSDataBlock* p = createOneDataBlock(pInfo->pRes, false); + taosArraySet(p->pDataBlock, 0, pInfo->tsArray); + p->info.rows = size; + p->info.type = STREAM_REPROCESS; + taosArrayClear(pInfo->tsArray); + return p; } return NULL; } @@ -609,14 +679,23 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { int32_t current = pInfo->validBlockIndex++; return taosArrayGetP(pInfo->pBlockLists, current); } else { - if (total > 0) { - ASSERT(total == 2); - SSDataBlock* pRes = taosArrayGetP(pInfo->pBlockLists, 0); - SSDataBlock* pUpRes = taosArrayGetP(pInfo->pBlockLists, 1); - blockDataDestroy(pUpRes); - taosArrayClear(pInfo->pBlockLists); - return pRes; + if (pInfo->scanMode == STREAM_SCAN_FROM_RES) { + blockDataDestroy(pInfo->pUpdateRes); + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + return pInfo->pRes; + } else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) { + blockDataCleanup(pInfo->pRes); + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; + return pInfo->pUpdateRes; + } else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) { + SSDataBlock* pSDB = doDataScan(pInfo); + if (pSDB == NULL) { + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + } else { + return pSDB; + } } + SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; blockDataCleanup(pInfo->pRes); @@ -682,12 +761,18 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { if (rows == 0) { pOperator->status = OP_EXEC_DONE; - } else { - SSDataBlock* upRes = getUpdateDataBlock(pInfo); + } else if (pInfo->interval.interval > 0) { + SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); //TODO(liuyao) get invertible from plan if (upRes) { - taosArrayPush(pInfo->pBlockLists, &(pInfo->pRes)); - taosArrayPush(pInfo->pBlockLists, &upRes); - return upRes; + pInfo->pUpdateRes = upRes; + if (upRes->info.type = STREAM_REPROCESS) { + pInfo->updateResIndex = 0; + prepareDataScan(pInfo); + pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES; + } else if (upRes->info.type = STREAM_INVERT) { + pInfo->scanMode = STREAM_SCAN_FROM_RES; + return upRes; + } } } @@ -695,8 +780,10 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } } -SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, - SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pCondition) { +SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, + SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, + SExecTaskInfo* pTaskInfo, SNode* pCondition, SOperatorInfo* pOperatorDumy, + SInterval* pInterval) { SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -736,7 +823,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* } pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan - pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); // TODO(liuyao) get it from physical plan + pInfo->pUpdateInfo = updateInfoInitP(pInterval, 10000); // TODO(liuyao) get watermark from physical plan if (pInfo->pUpdateInfo == NULL) { taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); @@ -746,6 +833,10 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pInfo->readerHandle = streamReadHandle; pInfo->pRes = pResBlock; pInfo->pCondition = pCondition; + pInfo->pDataReader = pDataReader; + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + pInfo->pOperatorDumy = pOperatorDumy; + pInfo->interval = *pInterval; pOperator->name = "StreamBlockScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; @@ -1348,35 +1439,33 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { char str[512] = {0}; int32_t count = 0; SMetaReader mr = {0}; + metaReaderInit(&mr, pInfo->readHandle.meta, 0); while (pInfo->curPos < pInfo->pTableGroups->numOfTables && count < pOperator->resultInfo.capacity) { STableKeyInfo* item = taosArrayGet(pa, pInfo->curPos); + metaGetTableEntryByUid(&mr, item->uid); for (int32_t j = 0; j < pOperator->numOfExprs; ++j) { SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); // refactor later if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) { - metaReaderInit(&mr, pInfo->readHandle.meta, 0); - metaGetTableEntryByUid(&mr, item->uid); - STR_TO_VARSTR(str, mr.me.name); - metaReaderClear(&mr); - colDataAppend(pDst, count, str, false); - // data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes); - // dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes; - // doSetTagValueToResultBuf(dst, data, type, bytes); + } else { // it is a tag value + const char* p = metaGetTableTagVal(&mr.me, pExprInfo[j].base.pParam[0].pCol->colId); + colDataAppend(pDst, count, p, (p == NULL)); } - - count += 1; } + count += 1; if (++pInfo->curPos >= pInfo->pTableGroups->numOfTables) { pOperator->status = OP_EXEC_DONE; } } + metaReaderClear(&mr); + // qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count); if (pOperator->status == OP_EXEC_DONE) { setTaskStatus(pTaskInfo, TASK_COMPLETED); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b194c5e535..332a116f76 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -82,7 +82,7 @@ static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, T } // get the correct time window according to the handled timestamp -static STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, +STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, int32_t precision, STimeWindow* win) { STimeWindow w = {0}; @@ -186,7 +186,7 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se return forwardStep; } -static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { +int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { int32_t midPos = -1; int32_t numOfRows; @@ -249,7 +249,7 @@ static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { return midPos; } -static int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, +int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) { assert(startPos >= 0 && startPos < pDataBlockInfo->rows); @@ -988,6 +988,20 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type } } } +static void doClearWindows(SIntervalAggOperatorInfo* pInfo, int32_t numOfOutput, SSDataBlock* pBlock) { + SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); + TSKEY *tsCols = (TSKEY*)pColDataInfo->pData; + int32_t step = 0; + for (int32_t i = 0; i < pBlock->info.rows; i += step) { + SResultRowInfo dumyInfo; + dumyInfo.cur.pageId = -1; + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], &pInfo->interval, + pInfo->interval.precision, NULL); + step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, + win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); + doClearWindow(pInfo, (char*)&win.skey, sizeof(TKEY), pBlock->info.groupId, numOfOutput); + } +} static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SIntervalAggOperatorInfo* pInfo = pOperator->info; @@ -1028,6 +1042,10 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { if (pInfo->invertible) { setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type); } + if (pBlock->info.type == STREAM_REPROCESS) { + doClearWindows(pInfo, pOperator->numOfExprs, pBlock); + continue; + } pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); } diff --git a/source/libs/executor/test/indexexcutorTests.cpp b/source/libs/executor/test/index_executor_tests.cpp similarity index 100% rename from source/libs/executor/test/indexexcutorTests.cpp rename to source/libs/executor/test/index_executor_tests.cpp diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 64bee0c096..2ec050d82d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1645,7 +1645,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { int32_t type = pCol->info.type; SPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); - if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) { + if (pCtx->scanFlag == REPEAT_SCAN && pInfo->stage == 0) { pInfo->stage += 1; // all data are null, set it completed diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index 4d80b88a3a..b6d5d38c9e 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -37,7 +37,7 @@ #define GET_TRUE_DATA_TYPE() \ int32_t type = 0; \ - if (pCtx->currentStage == MERGE_STAGE) { \ + if (pCtx->scanFlag == MERGE_STAGE) { \ type = pCtx->resDataInfo.type; \ assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); \ } else { \ @@ -908,7 +908,7 @@ static void avg_func_merge(SqlFunctionCtx *pCtx) { static void avg_finalizer(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - if (pCtx->currentStage == MERGE_STAGE) { + if (pCtx->scanFlag == MERGE_STAGE) { assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); if (GET_INT64_VAL(GET_ROWCELL_INTERBUF(pResInfo)) <= 0) { @@ -1152,7 +1152,7 @@ static void stddev_function(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SStddevInfo *pStd = GET_ROWCELL_INTERBUF(pResInfo); - if (pCtx->currentStage == REPEAT_SCAN && pStd->stage == 0) { + if (pCtx->scanFlag == REPEAT_SCAN && pStd->stage == 0) { pStd->stage++; avg_finalizer(pCtx); @@ -1814,7 +1814,7 @@ static STopBotInfo *getTopBotOutputInfo(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); // only the first_stage_merge is directly written data into final output buffer - if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) { + if (pCtx->stableQuery && pCtx->scanFlag != MERGE_STAGE) { return (STopBotInfo*) pCtx->pOutput; } else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer return GET_ROWCELL_INTERBUF(pResInfo); @@ -1956,7 +1956,7 @@ static void top_func_merge(SqlFunctionCtx *pCtx) { for (int32_t i = 0; i < pInput->num; ++i) { int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT)? TSDB_DATA_TYPE_DOUBLE:pCtx->resDataInfo.type; // do_top_function_add(pOutput, (int32_t)pCtx->param[0].param.i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, -// type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage); +// type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->scanFlag); } SET_VAL(pCtx, pInput->num, pOutput->num); @@ -2013,7 +2013,7 @@ static void bottom_func_merge(SqlFunctionCtx *pCtx) { for (int32_t i = 0; i < pInput->num; ++i) { int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT) ? TSDB_DATA_TYPE_DOUBLE : pCtx->resDataInfo.type; // do_bottom_function_add(pOutput, (int32_t)pCtx->param[0].param.i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, type, -// &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage); +// &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->scanFlag); } SET_VAL(pCtx, pInput->num, pOutput->num); @@ -2073,7 +2073,7 @@ static void percentile_function(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); - if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) { + if (pCtx->scanFlag == REPEAT_SCAN && pInfo->stage == 0) { pInfo->stage += 1; // all data are null, set it completed @@ -2180,7 +2180,7 @@ static SAPercentileInfo *getAPerctInfo(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SAPercentileInfo* pInfo = NULL; - if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) { + if (pCtx->stableQuery && pCtx->scanFlag != MERGE_STAGE) { pInfo = (SAPercentileInfo*) pCtx->pOutput; } else { pInfo = GET_ROWCELL_INTERBUF(pResInfo); @@ -2270,7 +2270,7 @@ static void apercentile_finalizer(SqlFunctionCtx *pCtx) { SResultRowEntryInfo * pResInfo = GET_RES_INFO(pCtx); SAPercentileInfo *pOutput = GET_ROWCELL_INTERBUF(pResInfo); - if (pCtx->currentStage == MERGE_STAGE) { + if (pCtx->scanFlag == MERGE_STAGE) { // if (pResInfo->hasResult == DATA_SET_FLAG) { // check for null // assert(pOutput->pHisto->numOfElems > 0); // @@ -2510,7 +2510,7 @@ static void copy_function(SqlFunctionCtx *pCtx); static void tag_function(SqlFunctionCtx *pCtx) { SET_VAL(pCtx, 1, 1); - if (pCtx->currentStage == MERGE_STAGE) { + if (pCtx->scanFlag == MERGE_STAGE) { copy_function(pCtx); } else { taosVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->resDataInfo.type, true); @@ -2966,7 +2966,7 @@ static bool spread_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pRe SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); // this is the server-side setup function in client-side, the secondary merge do not need this procedure - if (pCtx->currentStage == MERGE_STAGE) { + if (pCtx->scanFlag == MERGE_STAGE) { // pCtx->param[0].param.d = DBL_MAX; // pCtx->param[3].param.d = -DBL_MAX; } else { @@ -3086,7 +3086,7 @@ void spread_function_finalizer(SqlFunctionCtx *pCtx) { */ SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - if (pCtx->currentStage == MERGE_STAGE) { + if (pCtx->scanFlag == MERGE_STAGE) { assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); // if (pResInfo->hasResult != DATA_SET_FLAG) { diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index a625fc0d0c..71b0774ca6 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1142,9 +1142,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { return code; } -static int32_t physiStreamScanNodeToJson(const void* pObj, SJson* pJson) { return physiScanNodeToJson(pObj, pJson); } +static int32_t physiStreamScanNodeToJson(const void* pObj, SJson* pJson) { return physiTableScanNodeToJson(pObj, pJson); } -static int32_t jsonToPhysiStreamScanNode(const SJson* pJson, void* pObj) { return jsonToPhysiScanNode(pJson, pObj); } +static int32_t jsonToPhysiStreamScanNode(const SJson* pJson, void* pObj) { return jsonToPhysiTableScanNode(pJson, pObj); } static const char* jkSysTableScanPhysiPlanMnodeEpSet = "MnodeEpSet"; static const char* jkSysTableScanPhysiPlanShowRewrite = "ShowRewrite"; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 835d607099..edf44424e3 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -460,9 +460,13 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp memcpy(pTableScan->scanSeq, pScanLogicNode->scanSeq, sizeof(pScanLogicNode->scanSeq)); pTableScan->scanRange = pScanLogicNode->scanRange; pTableScan->ratio = pScanLogicNode->ratio; - vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); - taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode); - pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable; + if (pScanLogicNode->pVgroupList) { + vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); + pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable; + } + if (pCxt->pExecNodeList) { + taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode); + } tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName); pTableScan->dataRequired = pScanLogicNode->dataRequired; pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs); @@ -505,13 +509,12 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) { - SStreamScanPhysiNode* pScan = - (SStreamScanPhysiNode*)makePhysiNode(pCxt, pScanLogicNode->pMeta->tableInfo.precision, - (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); - if (NULL == pScan) { - return TSDB_CODE_OUT_OF_MEMORY; + int32_t res = createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode); + if (res == TSDB_CODE_SUCCESS) { + ENodeType type = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; + setNodeType(*pPhyNode, type); } - return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); + return res; } static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, @@ -786,7 +789,7 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic } static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode, SPhysiNode** pPhyNode) { - SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode( + SScanPhysiNode* pScan = (SScanPhysiNode*)makePhysiNode( pCxt, pExchangeLogicNode->precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); if (NULL == pScan) { return TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 237f673a47..a5811a5ace 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -154,7 +154,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in // sink if (pTask->sinkType == TASK_SINK__TABLE) { - /*blockDebugShowData(pRes);*/ + blockDebugShowData(pRes); pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes); } else if (pTask->sinkType == TASK_SINK__SMA) { pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 718be6aa64..5570bdcd3e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -21,15 +21,16 @@ typedef struct SCliConn { uv_connect_t connReq; uv_stream_t* stream; uv_write_t writeReq; - void* hostThrd; - SConnBuffer readBuf; - void* data; - STransQueue cliMsgs; - queue conn; - uint64_t expireTime; - int hThrdIdx; - STransCtx ctx; + void* hostThrd; + int hThrdIdx; + + SConnBuffer readBuf; + STransQueue cliMsgs; + queue conn; + uint64_t expireTime; + + STransCtx ctx; bool broken; // link broken or not ConnStatus status; // @@ -157,13 +158,11 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); transClearBuffer(&conn->readBuf); \ transFreeMsg(transContFromHead((char*)head)); \ tDebug("cli conn %p receive release request, ref: %d", conn, T_REF_VAL_GET(conn)); \ - while (T_REF_VAL_GET(conn) > 1) { \ - transUnrefCliHandle(conn); \ - } \ - if (T_REF_VAL_GET(conn) == 1) { \ + if (T_REF_VAL_GET(conn) > 1) { \ transUnrefCliHandle(conn); \ } \ destroyCmsg(pMsg); \ + addConnToPool(((SCliThrdObj*)conn->hostThrd)->pool, conn); \ return; \ } \ } while (0) diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index af66d39904..fc840691b6 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -35,7 +35,6 @@ typedef struct SSrvConn { uv_timer_t pTimer; queue queue; - int persist; // persist connection or not SConnBuffer readBuf; // read buf, int inType; void* pTransInst; // rpc init @@ -138,6 +137,7 @@ static void destroySmsg(SSrvMsg* smsg); // check whether already read complete packet static SSrvConn* createConn(void* hThrd); static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/); +static int reallocConnRefHandle(SSrvConn* conn); static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd); @@ -164,7 +164,7 @@ static void* transWorkerThread(void* arg); static void* transAcceptThread(void* arg); // add handle loop -static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName); +static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName); static bool addHandleToAcceptloop(void* arg); #define CONN_SHOULD_RELEASE(conn, head) \ @@ -180,6 +180,7 @@ static bool addHandleToAcceptloop(void* arg); srvMsg->msg = tmsg; \ srvMsg->type = Release; \ srvMsg->pConn = conn; \ + reallocConnRefHandle(conn); \ if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ return; \ } \ @@ -360,10 +361,14 @@ void uvOnSendCb(uv_write_t* req, int status) { tTrace("server conn %p data already was written on stream", conn); if (!transQueueEmpty(&conn->srvMsgs)) { SSrvMsg* msg = transQueuePop(&conn->srvMsgs); - if (msg->type == Release && conn->status != ConnNormal) { - conn->status = ConnNormal; - transUnrefSrvHandle(conn); - } + // if (msg->type == Release && conn->status != ConnNormal) { + // conn->status = ConnNormal; + // transUnrefSrvHandle(conn); + // reallocConnRefHandle(conn); + // destroySmsg(msg); + // transQueueClear(&conn->srvMsgs); + // return; + //} destroySmsg(msg); // send second data, just use for push if (!transQueueEmpty(&conn->srvMsgs)) { @@ -421,8 +426,15 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { if (pConn->status == ConnNormal) { pHead->msgType = pConn->inType + 1; } else { - pHead->msgType = smsg->type == Release ? 0 : pMsg->msgType; + if (smsg->type == Release) { + pHead->msgType = 0; + pConn->status = ConnNormal; + transUnrefSrvHandle(pConn); + } else { + pHead->msgType = pMsg->msgType; + } } + pHead->release = smsg->type == Release ? 1 : 0; pHead->code = htonl(pMsg->code); @@ -517,7 +529,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) { int64_t refId = transMsg.refId; SExHandle* exh2 = uvAcquireExHandle(refId); if (exh2 == NULL || exh1 != exh2) { - tTrace("server handle %p except msg, ignore it", exh1); + tTrace("server handle except msg %p, ignore it", exh1); uvReleaseExHandle(refId); destroySmsg(msg); continue; @@ -581,11 +593,12 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { if (uv_accept(stream, (uv_stream_t*)cli) == 0) { if (pObj->numOfWorkerReady < pObj->numOfThreads) { - tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, pObj->numOfWorkerReady); + tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, + pObj->numOfWorkerReady); uv_close((uv_handle_t*)cli, NULL); return; } - + uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t)); wr->data = cli; uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); @@ -681,14 +694,14 @@ void* transAcceptThread(void* arg) { return NULL; } -void uvOnPipeConnectionCb(uv_connect_t *connect, int status) { +void uvOnPipeConnectionCb(uv_connect_t* connect, int status) { if (status != 0) { return; } SWorkThrdObj* pThrd = container_of(connect, SWorkThrdObj, connect_req); uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); } -static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName) { +static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName) { pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); if (0 != uv_loop_init(pThrd->loop)) { return false; @@ -787,6 +800,19 @@ static void destroyConn(SSrvConn* conn, bool clear) { // uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb); } } +static int reallocConnRefHandle(SSrvConn* conn) { + uvReleaseExHandle(conn->refId); + uvRemoveExHandle(conn->refId); + // avoid app continue to send msg on invalid handle + SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); + exh->handle = conn; + exh->pThrd = conn->hostThrd; + exh->refId = uvAddExHandle(exh); + uvAcquireExHandle(exh->refId); + conn->refId = exh->refId; + + return 0; +} static void uvDestroyConn(uv_handle_t* handle) { SSrvConn* conn = handle->data; if (conn == NULL) { @@ -822,7 +848,7 @@ static void uvPipeListenCb(uv_stream_t* handle, int status) { ASSERT(status == 0); SServerObj* srv = container_of(handle, SServerObj, pipeListen); - uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]); + uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]); ASSERT(0 == uv_pipe_init(srv->loop, pipe, 1)); ASSERT(0 == uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe)); @@ -859,7 +885,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-%lu", taosSafeRand(), GetCurrentProcessId()); #else char pipeName[PATH_MAX] = {0}; - snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(), taosGetSelfPthreadId()); + snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(), + taosGetSelfPthreadId()); #endif assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName)); assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb)); @@ -874,7 +901,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); thrd->pipe = &(srv->pipe[i][1]); // init read - if (false == addHandleToWorkloop(thrd,pipeName)) { + if (false == addHandleToWorkloop(thrd, pipeName)) { goto End; } int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd)); @@ -959,6 +986,7 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) { SSrvConn* conn = msg->pConn; if (conn->status == ConnAcquire) { + reallocConnRefHandle(conn); if (!transQueuePush(&conn->srvMsgs, msg)) { return; } diff --git a/tests/parallel_test/collect_cases.sh b/tests/parallel_test/collect_cases.sh new file mode 100755 index 0000000000..c560598c81 --- /dev/null +++ b/tests/parallel_test/collect_cases.sh @@ -0,0 +1,45 @@ +#!/bin/bash + +case_file=/tmp/cases.task + +function usage() { + echo "$0" + echo -e "\t -o output case file" + echo -e "\t -e enterprise edition" + echo -e "\t -h help" +} + +ent=0 +while getopts "o:eh" opt; do + case $opt in + o) + case_file=$OPTARG + ;; + e) + ent=1 + ;; + h) + usage + exit 0 + ;; + \?) + echo "Invalid option: -$OPTARG" + usage + exit 0 + ;; + esac +done + +script_dir=`dirname $0` +cd $script_dir + +if [ $ent -eq 0 ]; then + echo ",,unit-test,bash test.sh" >$case_file +else + echo ",,unit-test,bash test.sh -e" >$case_file +fi +cat ../script/jenkins/basic.txt |grep -v "^#"|grep -v "^$"|sed "s/^/,,script,/" >>$case_file +grep "^python" ../system-test/fulltest.sh |sed "s/^/,,system-test,/" >>$case_file + +exit 0 + diff --git a/tests/parallel_test/container_build.sh b/tests/parallel_test/container_build.sh new file mode 100755 index 0000000000..3f23cd8b5f --- /dev/null +++ b/tests/parallel_test/container_build.sh @@ -0,0 +1,59 @@ +#!/bin/bash + +function usage() { + echo "$0" + echo -e "\t -w work dir" + echo -e "\t -e enterprise edition" + echo -e "\t -t make thread count" + echo -e "\t -h help" +} + +ent=0 +while getopts "w:t:eh" opt; do + case $opt in + w) + WORKDIR=$OPTARG + ;; + e) + ent=1 + ;; + t) + THREAD_COUNT=$OPTARG + ;; + h) + usage + exit 0 + ;; + \?) + echo "Invalid option: -$OPTARG" + usage + exit 0 + ;; + esac +done + +if [ -z "$WORKDIR" ]; then + usage + exit 1 +fi +if [ -z "$THREAD_COUNT" ]; then + THREAD_COUNT=1 +fi + +ulimit -c unlimited + +if [ $ent -eq 0 ]; then + REP_DIR=/home/TDengine + REP_MOUNT_PARAM=$WORKDIR/TDengine:/home/TDengine +else + REP_DIR=/home/TDinternal + REP_MOUNT_PARAM=$WORKDIR/TDinternal:/home/TDinternal +fi + +docker run \ + -v $REP_MOUNT_PARAM \ + --rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_TOOLS=true;make -j $THREAD_COUNT" + +ret=$? +exit $ret + diff --git a/tests/parallel_test/run.sh b/tests/parallel_test/run.sh new file mode 100755 index 0000000000..6417f41fd4 --- /dev/null +++ b/tests/parallel_test/run.sh @@ -0,0 +1,357 @@ +#!/bin/bash + +function usage() { + echo "$0" + echo -e "\t -m vm config file" + echo -e "\t -t task file" + echo -e "\t -b branch" + echo -e "\t -l log dir" + echo -e "\t -e enterprise edition" + echo -e "\t -o default timeout value" + echo -e "\t -h help" +} + +ent=0 +while getopts "m:t:b:l:o:eh" opt; do + case $opt in + m) + config_file=$OPTARG + ;; + t) + t_file=$OPTARG + ;; + b) + branch=$OPTARG + ;; + l) + log_dir=$OPTARG + ;; + e) + ent=1 + ;; + o) + timeout_param="-o $OPTARG" + ;; + h) + usage + exit 0 + ;; + \?) + echo "Invalid option: -$OPTARG" + usage + exit 0 + ;; + esac +done +#config_file=$1 +if [ -z $config_file ]; then + usage + exit 1 +fi +if [ ! -f $config_file ]; then + echo "$config_file not found" + usage + exit 1 +fi +#t_file=$2 +if [ -z $t_file ]; then + usage + exit 1 +fi +if [ ! -f $t_file ]; then + echo "$t_file not found" + usage + exit 1 +fi +date_tag=`date +%Y%m%d-%H%M%S` +if [ -z $log_dir ]; then + log_dir="log/${branch}_${date_tag}" +else + log_dir="$log_dir/${branch}_${date_tag}" +fi + +hosts=() +usernames=() +passwords=() +workdirs=() +threads=() + +i=0 +while [ 1 ]; do + host=`jq .[$i].host $config_file` + if [ "$host" = "null" ]; then + break + fi + username=`jq .[$i].username $config_file` + if [ "$username" = "null" ]; then + break + fi + password=`jq .[$i].password $config_file` + if [ "$password" = "null" ]; then + password="" + fi + workdir=`jq .[$i].workdir $config_file` + if [ "$workdir" = "null" ]; then + break + fi + thread=`jq .[$i].thread $config_file` + if [ "$thread" = "null" ]; then + break + fi + hosts[i]=`echo $host|sed 's/\"$//'|sed 's/^\"//'` + usernames[i]=`echo $username|sed 's/\"$//'|sed 's/^\"//'` + passwords[i]=`echo $password|sed 's/\"$//'|sed 's/^\"//'` + workdirs[i]=`echo $workdir|sed 's/\"$//'|sed 's/^\"//'` + threads[i]=$thread + i=$(( i + 1 )) +done + + +function prepare_cases() { + cat $t_file >>$task_file + local i=0 + while [ $i -lt $1 ]; do + echo "%%FINISHED%%" >>$task_file + i=$(( i + 1 )) + done +} + +function clean_tmp() { + # clean tmp dir + local index=$1 + local ssh_script="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}" + if [ -z ${passwords[index]} ]; then + ssh_script="ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}" + fi + local cmd="${ssh_script} rm -rf ${workdirs[index]}/tmp" + ${cmd} +} + +function run_thread() { + local index=$1 + local thread_no=$2 + local runcase_script="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}" + if [ -z ${passwords[index]} ]; then + runcase_script="ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}" + fi + local count=0 + local script="${workdirs[index]}/TDengine/tests/parallel_test/run_container.sh" + if [ $ent -ne 0 ]; then + local script="${workdirs[index]}/TDinternal/community/tests/parallel_test/run_container.sh -e" + fi + local cmd="${runcase_script} ${script}" + + # script="echo" + while [ 1 ]; do + local line=`flock -x $lock_file -c "head -n1 $task_file;sed -i \"1d\" $task_file"` + if [ "x$line" = "x%%FINISHED%%" ]; then + # echo "$index . $thread_no EXIT" + break + fi + if [ -z "$line" ]; then + continue + fi + echo "$line"|grep -q "^#" + if [ $? -eq 0 ]; then + continue + fi + local case_redo_time=`echo "$line"|cut -d, -f2` + if [ -z "$case_redo_time" ]; then + case_redo_time=${DEFAULT_RETRY_TIME:-2} + fi + local exec_dir=`echo "$line"|cut -d, -f3` + local case_cmd=`echo "$line"|cut -d, -f4` + local case_file="" + echo "$case_cmd"|grep -q "\.sh" + if [ $? -eq 0 ]; then + case_file=`echo "$case_cmd"|grep -o ".*\.sh"|awk '{print $NF}'` + fi + echo "$case_cmd"|grep -q "^python3" + if [ $? -eq 0 ]; then + case_file=`echo "$case_cmd"|grep -o ".*\.py"|awk '{print $NF}'` + fi + echo "$case_cmd"|grep -q "\.sim" + if [ $? -eq 0 ]; then + case_file=`echo "$case_cmd"|grep -o ".*\.sim"|awk '{print $NF}'` + fi + if [ -z "$case_file" ]; then + case_file=`echo "$case_cmd"|awk '{print $NF}'` + fi + if [ -z "$case_file" ]; then + continue + fi + case_file="$exec_dir/${case_file}.${index}.${thread_no}.${count}" + count=$(( count + 1 )) + local case_path=`dirname "$case_file"` + if [ ! -z "$case_path" ]; then + mkdir -p $log_dir/$case_path + fi + cmd="${runcase_script} ${script} -w ${workdirs[index]} -c \"${case_cmd}\" -t ${thread_no} -d ${exec_dir} ${timeout_param}" + # echo "$thread_no $count $cmd" + local ret=0 + local redo_count=1 + start_time=`date +%s` + while [ ${redo_count} -lt 6 ]; do + if [ -f $log_dir/$case_file.log ]; then + cp $log_dir/$case_file.log $log_dir/$case_file.${redo_count}.redolog + fi + echo "${hosts[index]}-${thread_no} order:${count}, redo:${redo_count} task:${line}" >$log_dir/$case_file.log + echo -e "\e[33m >>>>> \e[0m ${case_cmd}" + date >>$log_dir/$case_file.log + # $cmd 2>&1 | tee -a $log_dir/$case_file.log + # ret=${PIPESTATUS[0]} + $cmd >>$log_dir/$case_file.log 2>&1 + ret=$? + echo "${hosts[index]} `date` ret:${ret}" >>$log_dir/$case_file.log + if [ $ret -eq 0 ]; then + break + fi + redo=0 + grep -q "wait too long for taosd start" $log_dir/$case_file.log + if [ $? -eq 0 ]; then + redo=1 + fi + grep -q "kex_exchange_identification: Connection closed by remote host" $log_dir/$case_file.log + if [ $? -eq 0 ]; then + redo=1 + fi + grep -q "ssh_exchange_identification: Connection closed by remote host" $log_dir/$case_file.log + if [ $? -eq 0 ]; then + redo=1 + fi + grep -q "kex_exchange_identification: read: Connection reset by peer" $log_dir/$case_file.log + if [ $? -eq 0 ]; then + redo=1 + fi + grep -q "Database not ready" $log_dir/$case_file.log + if [ $? -eq 0 ]; then + redo=1 + fi + grep -q "Unable to establish connection" $log_dir/$case_file.log + if [ $? -eq 0 ]; then + redo=1 + fi + if [ $redo_count -lt $case_redo_time ]; then + redo=1 + fi + if [ $redo -eq 0 ]; then + break + fi + redo_count=$(( redo_count + 1 )) + done + end_time=`date +%s` + echo >>$log_dir/$case_file.log + echo "${hosts[index]} execute time: $(( end_time - start_time ))s" >>$log_dir/$case_file.log + # echo "$thread_no ${line} DONE" + if [ $ret -ne 0 ]; then + flock -x $lock_file -c "echo \"${hosts[index]} ret:${ret} ${line}\" >>$log_dir/failed.log" + mkdir -p $log_dir/${case_file}.coredump + local remote_coredump_dir="${workdirs[index]}/tmp/thread_volume/$thread_no/coredump" + local scpcmd="sshpass -p ${passwords[index]} scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}" + if [ -z ${passwords[index]} ]; then + scpcmd="scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}" + fi + cmd="$scpcmd:${remote_coredump_dir}/* $log_dir/${case_file}.coredump/" + $cmd # 2>/dev/null + local case_info=`echo "$line"|cut -d, -f 3,4` + local corefile=`ls $log_dir/${case_file}.coredump/` + corefile=`find $log_dir/${case_file}.coredump/ -name "core.*"` + echo -e "$case_info \e[31m failed\e[0m" + echo "=========================log============================" + cat $log_dir/$case_file.log + echo "=====================================================" + echo -e "\e[34m log file: $log_dir/$case_file.log \e[0m" + if [ ! -z "$corefile" ]; then + echo -e "\e[34m corefiles: $corefile \e[0m" + local build_dir=$log_dir/build_${hosts[index]} + local remote_build_dir="${workdirs[index]}/TDengine/debug/build" + if [ $ent -ne 0 ]; then + remote_build_dir="${workdirs[index]}/TDinternal/debug/build" + fi + mkdir $build_dir 2>/dev/null + if [ $? -eq 0 ]; then + # scp build binary + cmd="$scpcmd:${remote_build_dir}/* ${build_dir}/" + echo "$cmd" + $cmd >/dev/null + fi + fi + # get remote sim dir + local remote_sim_dir="${workdirs[index]}/tmp/thread_volume/$thread_no" + local tarcmd="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}" + if [ -z ${passwords[index]} ]; then + tarcmd="ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}" + fi + cmd="$tarcmd sh -c \"cd $remote_sim_dir; tar -czf sim.tar.gz sim\"" + $cmd + local remote_sim_tar="${workdirs[index]}/tmp/thread_volume/$thread_no/sim.tar.gz" + scpcmd="sshpass -p ${passwords[index]} scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}" + if [ -z ${passwords[index]} ]; then + scpcmd="scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}" + fi + cmd="$scpcmd:${remote_sim_tar} $log_dir/${case_file}.sim.tar.gz" + $cmd + fi + done +} + +# echo "hosts: ${hosts[@]}" +# echo "usernames: ${usernames[@]}" +# echo "passwords: ${passwords[@]}" +# echo "workdirs: ${workdirs[@]}" +# echo "threads: ${threads[@]}" +# TODO: check host accessibility + +i=0 +while [ $i -lt ${#hosts[*]} ]; do + clean_tmp $i & + i=$(( i + 1 )) +done +wait + +mkdir -p $log_dir +rm -rf $log_dir/* +task_file=$log_dir/$$.task +lock_file=$log_dir/$$.lock + +i=0 +j=0 +while [ $i -lt ${#hosts[*]} ]; do + j=$(( j + threads[i] )) + i=$(( i + 1 )) +done +prepare_cases $j + +i=0 +while [ $i -lt ${#hosts[*]} ]; do + j=0 + while [ $j -lt ${threads[i]} ]; do + run_thread $i $j & + j=$(( j + 1 )) + done + i=$(( i + 1 )) +done + +wait + +rm -f $lock_file +rm -f $task_file + +# docker ps -a|grep -v CONTAINER|awk '{print $1}'|xargs docker rm -f +RET=0 +i=1 +if [ -f "$log_dir/failed.log" ]; then + echo "=====================================================" + while read line; do + line=`echo "$line"|cut -d, -f 3,4` + echo -e "$i. $line \e[31m failed\e[0m" >&2 + i=$(( i + 1 )) + done <$log_dir/failed.log + RET=1 +fi + +echo "${log_dir}" >&2 + +date + +exit $RET diff --git a/tests/parallel_test/run_case.sh b/tests/parallel_test/run_case.sh new file mode 100755 index 0000000000..9705c024b8 --- /dev/null +++ b/tests/parallel_test/run_case.sh @@ -0,0 +1,74 @@ +#!/bin/bash + +function usage() { + echo "$0" + echo -e "\t -d execution dir" + echo -e "\t -c command" + echo -e "\t -e enterprise edition" + echo -e "\t -o default timeout value" + echo -e "\t -h help" +} + +ent=0 +while getopts "d:c:o:eh" opt; do + case $opt in + d) + exec_dir=$OPTARG + ;; + c) + cmd=$OPTARG + ;; + o) + TIMEOUT_CMD="timeout $OPTARG" + ;; + e) + ent=1 + ;; + h) + usage + exit 0 + ;; + \?) + echo "Invalid option: -$OPTARG" + usage + exit 0 + ;; + esac +done + +if [ -z "$exec_dir" ]; then + usage + exit 0 +fi +if [ -z "$cmd" ]; then + usage + exit 0 +fi + +if [ $ent -eq 0 ]; then + export PATH=$PATH:/home/TDengine/debug/build/bin + export LD_LIBRARY_PATH=/home/TDengine/debug/build/lib + ln -s /home/TDengine/debug/build/lib/libtaos.so /usr/lib/libtaos.so 2>/dev/null + CONTAINER_TESTDIR=/home/TDengine +else + export PATH=$PATH:/home/TDinternal/debug/build/bin + export LD_LIBRARY_PATH=/home/TDinternal/debug/build/lib + ln -s /home/TDinternal/debug/build/lib/libtaos.so /usr/lib/libtaos.so 2>/dev/null + CONTAINER_TESTDIR=/home/TDinternal/community +fi +mkdir -p /var/lib/taos/subscribe +mkdir -p /var/log/taos +mkdir -p /var/lib/taos + +cd $CONTAINER_TESTDIR/tests/$exec_dir +ulimit -c unlimited + +$TIMEOUT_CMD $cmd +RET=$? + +if [ $RET -ne 0 ]; then + pwd +fi + +exit $RET + diff --git a/tests/parallel_test/run_container.sh b/tests/parallel_test/run_container.sh new file mode 100755 index 0000000000..affd9128a4 --- /dev/null +++ b/tests/parallel_test/run_container.sh @@ -0,0 +1,106 @@ +#!/bin/bash + +function usage() { + echo "$0" + echo -e "\t -w work dir" + echo -e "\t -d execution dir" + echo -e "\t -c command" + echo -e "\t -t thread number" + echo -e "\t -e enterprise edition" + echo -e "\t -o default timeout value" + echo -e "\t -h help" +} + +ent=0 +while getopts "w:d:c:t:o:eh" opt; do + case $opt in + w) + WORKDIR=$OPTARG + ;; + d) + exec_dir=$OPTARG + ;; + c) + cmd=$OPTARG + ;; + t) + thread_no=$OPTARG + ;; + e) + ent=1 + ;; + o) + extra_param="-o $OPTARG" + ;; + h) + usage + exit 0 + ;; + \?) + echo "Invalid option: -$OPTARG" + usage + exit 0 + ;; + esac +done + +if [ -z "$WORKDIR" ]; then + usage + exit 1 +fi +if [ -z "$exec_dir" ]; then + usage + exit 1 +fi +if [ -z "$cmd" ]; then + usage + exit 1 +fi +if [ -z "$thread_no" ]; then + usage + exit 1 +fi +if [ $ent -ne 0 ]; then + # enterprise edition + extra_param="$extra_param -e" + INTERNAL_REPDIR=$WORKDIR/TDinternal + REPDIR=$INTERNAL_REPDIR/community + CONTAINER_TESTDIR=/home/TDinternal/community + SIM_DIR=/home/TDinternal/sim + REP_MOUNT_PARAM="$INTERNAL_REPDIR:/home/TDinternal" +else + # community edition + REPDIR=$WORKDIR/TDengine + CONTAINER_TESTDIR=/home/TDengine + SIM_DIR=/home/TDengine/sim + REP_MOUNT_PARAM="$REPDIR:/home/TDengine" +fi + +ulimit -c unlimited + +TMP_DIR=$WORKDIR/tmp + +MOUNT_DIR="" +rm -rf ${TMP_DIR}/thread_volume/$thread_no/sim +mkdir -p ${TMP_DIR}/thread_volume/$thread_no/sim/tsim +mkdir -p ${TMP_DIR}/thread_volume/$thread_no/coredump +rm -rf ${TMP_DIR}/thread_volume/$thread_no/coredump/* +if [ ! -d "${TMP_DIR}/thread_volume/$thread_no/$exec_dir" ]; then + subdir=`echo "$exec_dir"|cut -d/ -f1` + echo "cp -rf ${REPDIR}/tests/$subdir ${TMP_DIR}/thread_volume/$thread_no/" + cp -rf ${REPDIR}/tests/$subdir ${TMP_DIR}/thread_volume/$thread_no/ +fi +MOUNT_DIR="$TMP_DIR/thread_volume/$thread_no/$exec_dir:$CONTAINER_TESTDIR/tests/$exec_dir" +echo "$thread_no -> ${exec_dir}:$cmd" +coredump_dir=`cat /proc/sys/kernel/core_pattern | xargs dirname` + +docker run \ + -v $REP_MOUNT_PARAM \ + -v $MOUNT_DIR \ + -v "$TMP_DIR/thread_volume/$thread_no/sim:${SIM_DIR}" \ + -v ${TMP_DIR}/thread_volume/$thread_no/coredump:$coredump_dir \ + -v $WORKDIR/taos-connector-python/taos:/usr/local/lib/python3.8/site-packages/taos:ro \ + --rm --ulimit core=-1 taos_test:v1.0 $CONTAINER_TESTDIR/tests/parallel_test/run_case.sh -d "$exec_dir" -c "$cmd" $extra_param +ret=$? +exit $ret + diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 9dcd485194..3c23e784c5 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -35,7 +35,7 @@ class TDSimClient: "tableIncStepPerVnode": "10000", "maxVgroupsPerDb": "1000", "sdbDebugFlag": "143", - "rpcDebugFlag": "135", + "rpcDebugFlag": "143", "tmrDebugFlag": "131", "cDebugFlag": "135", "udebugFlag": "135", @@ -136,7 +136,7 @@ class TDDnode: "tsdbDebugFlag": "135", "mDebugFlag": "135", "sdbDebugFlag": "135", - "rpcDebugFlag": "135", + "rpcDebugFlag": "143", "tmrDebugFlag": "131", "cDebugFlag": "135", "httpDebugFlag": "135", diff --git a/tests/system-test/0-others/taosShellNetChk.py b/tests/system-test/0-others/taosShellNetChk.py index 524268101a..bbaeacf328 100644 --- a/tests/system-test/0-others/taosShellNetChk.py +++ b/tests/system-test/0-others/taosShellNetChk.py @@ -138,7 +138,8 @@ class TDTestCase: if "2: service ok" in retVal: tdLog.info("taos -k success") else: - tdLog.exit("taos -k fail") + tdLog.info(retVal) + tdLog.exit("taos -k fail 1") # stop taosd tdDnodes.stop(1) @@ -149,7 +150,8 @@ class TDTestCase: if "0: unavailable" in retVal: tdLog.info("taos -k success") else: - tdLog.exit("taos -k fail") + tdLog.info(retVal) + tdLog.exit("taos -k fail 2") # restart taosd tdDnodes.start(1) @@ -158,7 +160,8 @@ class TDTestCase: if "2: service ok" in retVal: tdLog.info("taos -k success") else: - tdLog.exit("taos -k fail") + tdLog.info(retVal) + tdLog.exit("taos -k fail 3") tdLog.printNoPrefix("================================ parameter: -n") # stop taosd diff --git a/tests/system-test/0-others/user_control.py b/tests/system-test/0-others/user_control.py new file mode 100644 index 0000000000..78aefd5e9e --- /dev/null +++ b/tests/system-test/0-others/user_control.py @@ -0,0 +1,348 @@ +import taos +import sys +import inspect +import traceback + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * + + +PRIVILEGES_ALL = "ALL" +PRIVILEGES_READ = "READ" +PRIVILEGES_WRITE = "WRITE" + +class TDconnect: + def __init__(self, + host = None, + port = None, + user = None, + password = None, + database = None, + config = None, + ) -> None: + self._conn = None + self._host = host + self._user = user + self._password = password + self._database = database + self._port = port + self._config = config + + def __enter__(self): + self._conn = taos.connect( + host =self._host, + port =self._port, + user =self._user, + password=self._password, + database=self._database, + config =self._config + ) + + self.cursor = self._conn.cursor() + return self + + def error(self, sql): + expectErrNotOccured = True + try: + self.cursor.execute(sql) + except BaseException: + expectErrNotOccured = False + + if expectErrNotOccured: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured" ) + else: + self.queryRows = 0 + self.queryCols = 0 + self.queryResult = None + tdLog.info(f"sql:{sql}, expect error occured") + + def query(self, sql, row_tag=None): + # sourcery skip: raise-from-previous-error, raise-specific-error + self.sql = sql + try: + self.cursor.execute(sql) + self.queryResult = self.cursor.fetchall() + self.queryRows = len(self.queryResult) + self.queryCols = len(self.cursor.description) + except Exception as e: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + tdLog.notice(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, {repr(e)}") + traceback.print_exc() + raise Exception(repr(e)) + if row_tag: + return self.queryResult + return self.queryRows + + def __exit__(self, types, values, trace): + if self._conn: + self.cursor.close() + self._conn.close() + +def taos_connect( + host = "127.0.0.1", + port = 6030, + user = "root", + passwd = "taosdata", + database= None, + config = None +): + return TDconnect( + host = host, + port=port, + user=user, + password=passwd, + database=database, + config=config + ) + +class TDTestCase: + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + + @property + def __user_list(self): + return [f"user_test{i}" for i in range(self.users_count) ] + + @property + def __passwd_list(self): + return [f"taosdata{i}" for i in range(self.users_count) ] + + @property + def __privilege(self): + return [ PRIVILEGES_ALL, PRIVILEGES_READ, PRIVILEGES_WRITE ] + + def __priv_level(self, dbname=None): + return f"{dbname}.*" if dbname else "*.*" + + + def create_user_current(self): + users = self.__user_list + passwds = self.__passwd_list + for i in range(self.users_count): + tdSql.execute(f"create user {users[i]} pass '{passwds[i]}' ") + + tdSql.query("show users") + tdSql.checkRows(self.users_count + 1) + + def create_user_err(self): + sqls = [ + "create users u1 pass 'u1passwd' ", + "create user '' pass 'u1passwd' ", + "create user pass 'u1passwd' ", + "create user u1 pass u1passwd ", + "create user u1 password 'u1passwd' ", + "create user u1 pass u1passwd ", + "create user u1 pass '' ", + "create user u1 pass ' ' ", + "create user u1 pass ", + "create user u1 u2 pass 'u1passwd' 'u2passwd' ", + "create user u1 u2 pass 'u1passwd', 'u2passwd' ", + "create user u1, u2 pass 'u1passwd', 'u2passwd' ", + "create user u1, u2 pass 'u1passwd' 'u2passwd' ", + # length of user_name must <= 23 + "create user u12345678901234567890123 pass 'u1passwd' " , + # length of passwd must <= 128 + "create user u1 pass 'u12345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678' " , + # password must have not " ' ~ ` \ + "create user u1 pass 'u1passwd\\' " , + "create user u1 pass 'u1passwd~' " , + "create user u1 pass 'u1passwd\"' " , + "create user u1 pass 'u1passwd\'' " , + "create user u1 pass 'u1passwd`' " , + # must after create a user named u1 + "create user u1 pass 'u1passwd' " , + ] + + tdSql.execute("create user u1 pass 'u1passwd' ") + for sql in sqls: + tdSql.error(sql) + + tdSql.execute("DROP USER u1") + + def __alter_pass_sql(self, user, passwd): + return f'''ALTER USER {user} PASS '{passwd}' ''' + + def alter_pass_current(self): + self.__init_pass = True + for count, i in enumerate(range(self.users_count)): + if self.__init_pass: + tdSql.query(self.__alter_pass_sql(self.__user_list[i], f"new{self.__passwd_list[i]}")) + self.__init_pass = count != self.users_count - 1 + else: + tdSql.query(self.__alter_pass_sql(self.__user_list[i], self.__passwd_list[i] ) ) + self.__init_pass = count == self.users_count - 1 + + def alter_pass_err(self): # sourcery skip: remove-redundant-fstring + sqls = [ + f"alter users {self.__user_list[0]} pass 'newpass' " , + f"alter user {self.__user_list[0]} pass '' " , + f"alter user {self.__user_list[0]} pass ' ' " , + f"alter user anyuser pass 'newpass' " , + f"alter user {self.__user_list[0]} pass " , + f"alter user {self.__user_list[0]} password 'newpass' " , + ] + for sql in sqls: + tdSql.error(sql) + + + def grant_user_privileges(self, privilege, dbname=None, user_name="root"): + return f"GRANT {privilege} ON {self.__priv_level(dbname)} TO {user_name} " + + def test_user_create(self): + self.create_user_current() + self.create_user_err() + + def test_alter_pass(self): + self.alter_pass_current() + self.alter_pass_err() + + def user_login(self, user, passwd): + login_except = False + try: + with taos_connect(user=user, passwd=passwd) as conn: + cursor = conn.cursor + except BaseException: + login_except = True + cursor = None + return login_except, cursor + + def login_currrent(self, user, passwd): + login_except, _ = self.user_login(user, passwd) + if login_except: + tdLog.exit(f"connect failed, user: {user} and pass: {passwd} do not match!") + else: + tdLog.info("connect successfully, user and pass matched!") + + + def login_err(self, user, passwd): + login_except, _ = self.user_login(user, passwd) + if login_except: + tdLog.info("connect failed, except error occured!") + else: + tdLog.exit("connect successfully, except error not occrued!") + + def __drop_user(self, user): + return f"DROP USER {user}" + + def drop_user_current(self): + for user in self.__user_list: + tdSql.query(self.__drop_user(user)) + + def drop_user_error(self): + sqls = [ + f"DROP {self.__user_list[0]}", + f"DROP user {self.__user_list[0]} {self.__user_list[1]}", + f"DROP user {self.__user_list[0]} , {self.__user_list[1]}", + f"DROP users {self.__user_list[0]} {self.__user_list[1]}", + f"DROP users {self.__user_list[0]} , {self.__user_list[1]}", + # "DROP user root", + "DROP user abcde", + "DROP user ALL", + ] + + for sql in sqls: + tdSql.error(sql) + + def test_drop_user(self): + # must drop err first + self.drop_user_error() + self.drop_user_current() + + def run(self): + + # 默认只有 root 用户 + tdLog.printNoPrefix("==========step0: init, user list only has root account") + tdSql.query("show users") + tdSql.checkData(0, 0, "root") + tdSql.checkData(0, 1, "super") + + # root用户权限 + # 创建用户测试 + tdLog.printNoPrefix("==========step1: create user test") + self.users_count = 5 + self.test_user_create() + + # 查看用户 + tdLog.printNoPrefix("==========step2: show user test") + tdSql.query("show users") + tdSql.checkRows(self.users_count + 1) + + # 密码登录认证 + self.login_currrent(self.__user_list[0], self.__passwd_list[0]) + self.login_err(self.__user_list[0], f"new{self.__passwd_list[0]}") + + # 修改密码 + tdLog.printNoPrefix("==========step3: alter user pass test") + self.test_alter_pass() + + # 密码修改后的登录认证 + tdLog.printNoPrefix("==========step4: check login test") + self.login_err(self.__user_list[0], self.__passwd_list[0]) + self.login_currrent(self.__user_list[0], f"new{self.__passwd_list[0]}") + + tdDnodes.stop(1) + tdDnodes.start(1) + + tdSql.query("show users") + tdSql.checkRows(self.users_count + 1) + + # 普通用户权限 + # 密码登录 + # _, user = self.user_login(self.__user_list[0], f"new{self.__passwd_list[0]}") + with taos_connect(user=self.__user_list[0], passwd=f"new{self.__passwd_list[0]}") as user: + # user = conn + # 不能创建用户 + tdLog.printNoPrefix("==========step5: normal user can not create user") + user.error("create use utest1 pass 'utest1pass'") + # 可以查看用户 + tdLog.printNoPrefix("==========step6: normal user can show user") + user.query("show users") + assert user.queryRows == self.users_count + 1 + # 不可以修改其他用户的密码 + tdLog.printNoPrefix("==========step7: normal user can not alter other user pass") + user.error(self.__alter_pass_sql(self.__user_list[1], self.__passwd_list[1] )) + user.error(self.__alter_pass_sql("root", "taosdata_root" )) + # 可以修改自己的密码 + tdLog.printNoPrefix("==========step8: normal user can alter owner pass") + user.query(self.__alter_pass_sql(self.__user_list[0], self.__passwd_list[0])) + # 不可以删除用户,包括自己 + tdLog.printNoPrefix("==========step9: normal user can not drop any user ") + user.error(f"drop user {self.__user_list[0]}") + user.error(f"drop user {self.__user_list[1]}") + user.error("drop user root") + + # root删除用户测试 + tdLog.printNoPrefix("==========step10: super user drop normal user") + self.test_drop_user() + + tdSql.query("show users") + tdSql.checkRows(1) + tdSql.checkData(0, 0, "root") + tdSql.checkData(0, 1, "super") + + tdDnodes.stop(1) + tdDnodes.start(1) + + # 删除后无法登录 + self.login_err(self.__user_list[0], self.__passwd_list[0]) + self.login_err(self.__user_list[0], f"new{self.__passwd_list[0]}") + self.login_err(self.__user_list[1], self.__passwd_list[1]) + self.login_err(self.__user_list[1], f"new{self.__passwd_list[1]}") + + tdSql.query("show users") + tdSql.checkRows(1) + tdSql.checkData(0, 0, "root") + tdSql.checkData(0, 1, "super") + + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/2-query/between.py b/tests/system-test/2-query/between.py index e8bde3c11c..3b9465dd26 100644 --- a/tests/system-test/2-query/between.py +++ b/tests/system-test/2-query/between.py @@ -45,16 +45,16 @@ class TDTestCase: tdLog.printNoPrefix("==========step3:query timestamp type") - # tdSql.query("select * from t1 where ts between now()-1m and now()+10m") - # tdSql.checkRows(10) - # tdSql.query("select * from t1 where ts between '2021-01-01 00:00:00.000' and '2121-01-01 00:00:00.000'") + tdSql.query("select * from t1 where ts between now()-1m and now()+10m") + tdSql.checkRows(10) + tdSql.query("select * from t1 where ts between '2021-01-01 00:00:00.000' and '2121-01-01 00:00:00.000'") # tdSql.checkRows(11) - # tdSql.query("select * from t1 where ts between '1969-01-01 00:00:00.000' and '1969-12-31 23:59:59.999'") + tdSql.query("select * from t1 where ts between '1969-01-01 00:00:00.000' and '1969-12-31 23:59:59.999'") # tdSql.checkRows(0) - # tdSql.query("select * from t1 where ts between -2793600 and 31507199") - # tdSql.checkRows(0) - # tdSql.query("select * from t1 where ts between 1609430400000 and 4765104000000") - # tdSql.checkRows(11) + tdSql.query("select * from t1 where ts between -2793600 and 31507199") + tdSql.checkRows(0) + tdSql.query("select * from t1 where ts between 1609430400000 and 4765104000000") + tdSql.checkRows(11) tdLog.printNoPrefix("==========step4:query int type") @@ -68,11 +68,11 @@ class TDTestCase: tdSql.checkRows(0) # tdSql.query("select * from t1 where c1 between 0x64 and 0x69") # tdSql.checkRows(6) - # tdSql.query("select * from t1 where c1 not between 100 and 106") - # tdSql.checkRows(11) + tdSql.query("select * from t1 where c1 not between 100 and 106") + tdSql.checkRows(11) tdSql.query(f"select * from t1 where c1 between {2**31-2} and {2**31+1}") tdSql.checkRows(1) - tdSql.error(f"select * from t2 where c1 between null and {1-2**31}") + tdSql.query(f"select * from t2 where c1 between null and {1-2**31}") # tdSql.checkRows(3) tdSql.query(f"select * from t2 where c1 between {-2**31} and {1-2**31}") tdSql.checkRows(1) @@ -88,12 +88,12 @@ class TDTestCase: tdSql.query("select * from t1 where c2 between 'DC3' and 'SYN'") tdSql.checkRows(0) tdSql.query("select * from t1 where c2 not between 0.1 and 0.2") - # tdSql.checkRows(11) + tdSql.checkRows(11) tdSql.query(f"select * from t1 where c2 between {pow(10,38)*3.4} and {pow(10,38)*3.4+1}") # tdSql.checkRows(1) tdSql.query(f"select * from t2 where c2 between {-3.4*10**38-1} and {-3.4*10**38}") # tdSql.checkRows(2) - tdSql.error(f"select * from t2 where c2 between null and {-3.4*10**38}") + tdSql.query(f"select * from t2 where c2 between null and {-3.4*10**38}") # tdSql.checkRows(3) tdLog.printNoPrefix("==========step6:query bigint type") @@ -101,7 +101,7 @@ class TDTestCase: tdSql.query(f"select * from t1 where c3 between {2**31} and {2**31+10}") tdSql.checkRows(10) tdSql.query(f"select * from t1 where c3 between {-2**63} and {2**63}") - # tdSql.checkRows(11) + tdSql.checkRows(11) tdSql.query(f"select * from t1 where c3 between {2**31+10} and {2**31}") tdSql.checkRows(0) tdSql.query("select * from t1 where c3 between 'a' and 'z'") @@ -112,7 +112,7 @@ class TDTestCase: tdSql.checkRows(1) tdSql.query(f"select * from t2 where c3 between {-2**63} and {1-2**63}") # tdSql.checkRows(3) - tdSql.error(f"select * from t2 where c3 between null and {1-2**63}") + tdSql.query(f"select * from t2 where c3 between null and {1-2**63}") # tdSql.checkRows(2) tdLog.printNoPrefix("==========step7:query double type") @@ -129,10 +129,10 @@ class TDTestCase: tdSql.query("select * from t1 where c4 not between 1 and 2") # tdSql.checkRows(0) tdSql.query(f"select * from t1 where c4 between {1.7*10**308} and {1.7*10**308+1}") - # tdSql.checkRows(1) + tdSql.checkRows(1) tdSql.query(f"select * from t2 where c4 between {-1.7*10**308-1} and {-1.7*10**308}") # tdSql.checkRows(3) - tdSql.error(f"select * from t2 where c4 between null and {-1.7*10**308}") + tdSql.query(f"select * from t2 where c4 between null and {-1.7*10**308}") # tdSql.checkRows(3) tdLog.printNoPrefix("==========step8:query smallint type") @@ -151,7 +151,7 @@ class TDTestCase: tdSql.checkRows(1) tdSql.query("select * from t2 where c5 between -32768 and -32767") tdSql.checkRows(1) - tdSql.error("select * from t2 where c5 between null and -32767") + tdSql.query("select * from t2 where c5 between null and -32767") # tdSql.checkRows(1) tdLog.printNoPrefix("==========step9:query tinyint type") @@ -170,21 +170,21 @@ class TDTestCase: tdSql.checkRows(1) tdSql.query("select * from t2 where c6 between -128 and -127") tdSql.checkRows(1) - tdSql.error("select * from t2 where c6 between null and -127") + tdSql.query("select * from t2 where c6 between null and -127") # tdSql.checkRows(3) tdLog.printNoPrefix("==========step10:invalid query type") - # tdSql.query("select * from supt where location between 'beijing' and 'shanghai'") - # tdSql.checkRows(23) - # # 非0值均解析为1,因此"between 负值 and o"解析为"between 1 and 0" - # tdSql.query("select * from supt where isused between 0 and 1") - # tdSql.checkRows(23) - # tdSql.query("select * from supt where isused between -1 and 0") - # tdSql.checkRows(0) - # tdSql.error("select * from supt where isused between false and true") - # tdSql.query("select * from supt where family between '拖拉机' and '自行车'") - # tdSql.checkRows(23) + tdSql.query("select * from supt where location between 'beijing' and 'shanghai'") + tdSql.checkRows(23) + # 非0值均解析为1,因此"between 负值 and o"解析为"between 1 and 0" + tdSql.query("select * from supt where isused between 0 and 1") + tdSql.checkRows(23) + tdSql.query("select * from supt where isused between -1 and 0") + tdSql.checkRows(0) + tdSql.error("select * from supt where isused between false and true") + tdSql.query("select * from supt where family between '拖拉机' and '自行车'") + tdSql.checkRows(23) tdLog.printNoPrefix("==========step11:query HEX/OCT/BIN type") diff --git a/tests/system-test/2-query/timezone.py b/tests/system-test/2-query/timezone.py index 1f3dac90c6..ff55ab31bf 100644 --- a/tests/system-test/2-query/timezone.py +++ b/tests/system-test/2-query/timezone.py @@ -15,8 +15,16 @@ class TDTestCase: def run(self): # sourcery skip: extract-duplicate-method tdSql.prepare() # get system timezone - time_zone = os.popen('timedatectl | grep zone').read( - ).strip().split(':')[1].lstrip() + time_zone_arr = os.popen('timedatectl | grep zone').read( + ).strip().split(':') + if len(time_zone_arr) > 1: + time_zone = time_zone_arr[1].lstrip() + else: + # possibly in a docker container + time_zone_1 = os.popen('ls -l /etc/localtime|awk -F/ \'{print $(NF-1) "/" $NF}\'').read().strip() + time_zone_2 = os.popen('date "+(%Z, %z)"').read().strip() + time_zone = time_zone_1 + " " + time_zone_2 + print("expected time zone: " + time_zone) tdLog.printNoPrefix("==========step1:create tables==========") tdSql.execute( diff --git a/tests/system-test/7-tmq/basic5.py b/tests/system-test/7-tmq/basic5.py index 8a1932f05c..65840349ba 100644 --- a/tests/system-test/7-tmq/basic5.py +++ b/tests/system-test/7-tmq/basic5.py @@ -114,7 +114,7 @@ class TDTestCase: def tmqCase1(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test scenario 1: ") + tdLog.printNoPrefix("======== test case 1: Produce while consume") tdLog.info("step 1: create database, stb, ctb and insert data") # create and start thread parameterDict = {'cfg': '', \ @@ -122,8 +122,8 @@ class TDTestCase: 'vgroups': 1, \ 'stbName': 'stb', \ 'ctbNum': 10, \ - 'rowsPerTbl': 100, \ - 'batchNum': 10, \ + 'rowsPerTbl': 1000, \ + 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) @@ -163,8 +163,7 @@ class TDTestCase: tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)") consumerId = 0 - expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"] - expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"] + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] topicList = topicFromStb ifcheckdata = 0 keyList = 'group.id:cgrp1,\ @@ -172,7 +171,7 @@ class TDTestCase: auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' sql = "insert into consumeinfo values " - sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata) + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) tdSql.query(sql) tdLog.info("check stb if there are data") @@ -209,18 +208,19 @@ class TDTestCase: else: time.sleep(5) - expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] - + tdLog.info("consumer result: %d, %d"%(tdSql.getData(0 , 2), tdSql.getData(0 , 3))) tdSql.checkData(0 , 1, consumerId) - tdSql.checkData(0 , 2, expectmsgcnt) + # mulit rows and mulit tables in one sql, this num of msg is not sure + #tdSql.checkData(0 , 2, expectmsgcnt) tdSql.checkData(0 , 3, expectrowcnt) tdSql.query("drop topic %s"%topicFromStb) tdSql.query("drop topic %s"%topicFromCtb) - + + tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test scenario 2: add child table with consuming ") + tdLog.printNoPrefix("======== test case 2: add child table with consuming ") # create and start thread parameterDict = {'cfg': '', \ 'dbName': 'db2', \ @@ -275,9 +275,9 @@ class TDTestCase: tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + rowsOfNewCtb = 1000 consumerId = 0 - expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"] - expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"] + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb topicList = topicFromStb ifcheckdata = 0 keyList = 'group.id:cgrp1,\ @@ -285,7 +285,7 @@ class TDTestCase: auto.commit.interval.ms:6000,\ auto.offset.reset:earliest' sql = "insert into consumeinfo values " - sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata) + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) tdSql.query(sql) tdLog.info("check stb if there are data") @@ -312,7 +312,6 @@ class TDTestCase: # create new child table and insert data newCtbName = 'newctb' - rowsOfNewCtb = 1000 tdSql.query("create table %s.%s using %s.%s tags(9999)"%(parameterDict["dbName"], newCtbName, parameterDict["dbName"], parameterDict["stbName"])) startTs = parameterDict["startTs"] for j in range(rowsOfNewCtb): @@ -332,14 +331,135 @@ class TDTestCase: else: time.sleep(5) - expectmsgcnt += rowsOfNewCtb - expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb - tdSql.checkData(0 , 1, consumerId) - tdSql.checkData(0 , 2, expectmsgcnt) tdSql.checkData(0 , 3, expectrowcnt) + + tdSql.query("drop topic %s"%topicFromStb) + tdSql.query("drop topic %s"%topicFromCtb) - tdLog.printNoPrefix("======== test scenario 2 end ...... ") + tdLog.printNoPrefix("======== test case 2 end ...... ") + + def tmqCase3(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 3: tow topics, each contains a stable, \ + but at the beginning, no ctables in the stable of one topic,\ + after starting consumer, create ctables ") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db2', \ + 'vgroups': 1, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + # wait db ready + while 1: + tdSql.query("show databases") + if tdSql.getRows() == 4: + print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),) + break + else: + time.sleep(1) + + tdSql.query("use %s"%parameterDict['dbName']) + # wait stb ready + while 1: + tdSql.query("show %s.stables"%parameterDict['dbName']) + if tdSql.getRows() == 1: + break + else: + time.sleep(1) + + tdLog.info("create topics from super table") + topicFromStb = 'topic_stb_column2' + topicFromCtb = 'topic_ctb_column2' + + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) + + time.sleep(1) + tdSql.query("show topics") + topic1 = tdSql.getData(0 , 0) + topic2 = tdSql.getData(1 , 0) + tdLog.info("show topics: %s, %s"%(topic1, topic2)) + if topic1 != topicFromStb and topic1 != topicFromCtb: + tdLog.exit("topic error1") + if topic2 != topicFromStb and topic2 != topicFromCtb: + tdLog.exit("topic error2") + + tdLog.info("create consume info table and consume result table") + cdbName = parameterDict["dbName"] + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + rowsOfNewCtb = 1000 + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb + topicList = topicFromStb + ifcheckdata = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + sql = "insert into consumeinfo values " + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + tdSql.query(sql) + + tdLog.info("check stb if there are data") + while 1: + tdSql.query("select count(*) from %s"%parameterDict["stbName"]) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + countOfStb = tdSql.getData(0, 0) + if countOfStb != 0: + tdLog.info("count from stb: %d"%countOfStb) + break + else: + time.sleep(1) + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(shellCmd) + + # create new child table and insert data + newCtbName = 'newctb' + tdSql.query("create table %s.%s using %s.%s tags(9999)"%(parameterDict["dbName"], newCtbName, parameterDict["dbName"], parameterDict["stbName"])) + startTs = parameterDict["startTs"] + for j in range(rowsOfNewCtb): + sql = "insert into %s.%s values (%d, %d, 'tmqrow_%d') "%(parameterDict["dbName"], newCtbName, startTs + j, j, j) + tdSql.execute(sql) + tdLog.debug("insert data into new child table ............ [OK]") + + # wait for data ready + prepareEnvThread.join() + + tdLog.info("insert process end, and start to check consume result") + while 1: + tdSql.query("select * from consumeresult") + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == 1: + break + else: + time.sleep(5) + + tdSql.checkData(0 , 1, consumerId) + tdSql.checkData(0 , 3, expectrowcnt) + + tdSql.query("drop topic %s"%topicFromStb) + tdSql.query("drop topic %s"%topicFromCtb) + + tdLog.printNoPrefix("======== test case 3 end ...... ") def run(self): tdSql.prepare() @@ -353,7 +473,7 @@ class TDTestCase: tdLog.info("cfgPath: %s" % cfgPath) self.tmqCase1(cfgPath, buildPath) - #self.tmqCase2(cfgPath, buildPath) + self.tmqCase2(cfgPath, buildPath) #self.tmqCase3(cfgPath, buildPath) def stop(self): diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 817f814873..c80206abbc 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -8,6 +8,8 @@ python3 ./test.py -f 0-others/taosShellNetChk.py python3 ./test.py -f 0-others/telemetry.py python3 ./test.py -f 0-others/taosdMonitor.py +python3 ./test.py -f 0-others/user_control.py + #python3 ./test.py -f 2-query/between.py python3 ./test.py -f 2-query/distinct.py python3 ./test.py -f 2-query/varchar.py @@ -53,5 +55,3 @@ python3 ./test.py -f 2-query/arctan.py # python3 ./test.py -f 2-query/query_cols_tags_and_or.py python3 ./test.py -f 7-tmq/basic5.py - - diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index bc3aa091c3..33ddd23d8c 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -98,12 +98,24 @@ static void printHelp() { } void initLogFile() { - // FILE *fp = fopen(g_stConfInfo.resultFileName, "a"); - char file[256]; - sprintf(file, "%s/../log/tmqlog.txt", configDir); - TdFilePtr pFile = taosOpenFile(file, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); + time_t now; + struct tm curTime; + char filename[256]; + + now = taosTime(NULL); + taosLocalTime(&now, &curTime); + sprintf(filename,"%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt", + configDir, + curTime.tm_year+1900, + curTime.tm_mon+1, + curTime.tm_mday, + curTime.tm_hour, + curTime.tm_min, + curTime.tm_sec); + //sprintf(filename, "%s/../log/tmqlog.txt", configDir); + TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); if (NULL == pFile) { - fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt"); + fprintf(stderr, "Failed to open %s for save result\n", filename); exit(-1); } g_fp = pFile; @@ -333,8 +345,8 @@ void loop_consume(SThreadInfo* pInfo) { totalMsgs++; - if (totalMsgs >= pInfo->expectMsgCnt) { - taosFprintfFile(g_fp, "==== totalMsgs >= pInfo->expectMsgCnt, so break\n"); + if (totalRows >= pInfo->expectMsgCnt) { + taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n"); break; } } else { diff --git a/tests/unit-test/test.sh b/tests/unit-test/test.sh new file mode 100755 index 0000000000..4122597717 --- /dev/null +++ b/tests/unit-test/test.sh @@ -0,0 +1,40 @@ +#!/bin/bash + +function usage() { + echo "$0" + echo -e "\t -e enterprise edition" + echo -e "\t -h help" +} + +ent=0 +while getopts "eh" opt; do + case $opt in + e) + ent=1 + ;; + h) + usage + exit 0 + ;; + \?) + echo "Invalid option: -$OPTARG" + usage + exit 0 + ;; + esac +done + +script_dir=`dirname $0` +cd ${script_dir} +PWD=`pwd` + +if [ $ent -eq 0 ]; then + cd ../../debug +else + cd ../../../debug +fi + +ctest -j8 +ret=$? +exit $ret +