Merge branch '3.0' of github.com:taosdata/TDengine into feature/udf

slzhou 2022-05-13 15:18:28 +08:00
commit 003d203f24
32 changed files with 1669 additions and 277 deletions


@ -34,16 +34,6 @@ def abort_previous(){
}
def pre_test(){
sh 'hostname'
sh '''
date
sudo rmtaos || echo "taosd has not installed"
'''
sh '''
killall -9 taosd ||echo "no taosd running"
killall -9 gdb || echo "no gdb running"
killall -9 python3.8 || echo "no python program running"
cd ${WKC}
'''
script {
if (env.CHANGE_TARGET == 'master') {
sh '''
@ -81,10 +71,10 @@ def pre_test(){
git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
git log|head -n20
git log -5
cd ${WK}
git pull >/dev/null
git log|head -n20
git log -5
'''
} else if (env.CHANGE_URL =~ /\/TDinternal\//) {
sh '''
@ -92,10 +82,10 @@ def pre_test(){
git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
git log|head -n20
git log -5
cd ${WKC}
git pull >/dev/null
git log|head -n20
git log -5
'''
} else {
sh '''
@ -106,21 +96,10 @@ def pre_test(){
cd ${WKC}
git submodule update --init --recursive
'''
sh '''
cd ${WK}
export TZ=Asia/Harbin
date
rm -rf debug
mkdir debug
cd debug
cmake .. > /dev/null
make -j4> /dev/null
'''
sh '''
cd ${WKPY}
git reset --hard
git pull
pip3 install .
'''
return 1
}
@ -131,12 +110,14 @@ def pre_test_win(){
time /t
taskkill /f /t /im python.exe
taskkill /f /t /im bash.exe
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
rd /s /Q C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine\\debug
rd /s /Q C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\debug
exit 0
'''
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git reset --hard
git fetch || git fetch
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git reset --hard
git fetch || git fetch
git checkout -f
@ -144,39 +125,73 @@ def pre_test_win(){
script {
if (env.CHANGE_TARGET == 'master') {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git checkout master
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git checkout master
'''
} else if(env.CHANGE_TARGET == '2.0') {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git checkout 2.0
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git checkout 2.0
'''
} else if(env.CHANGE_TARGET == '3.0') {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git checkout 3.0
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git checkout 3.0
'''
} else {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git checkout develop
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git checkout develop
'''
}
}
script {
if (env.CHANGE_URL =~ /\/TDengine\//) {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git pull
git log -5
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git pull
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
git log -5
'''
} else if (env.CHANGE_URL =~ /\/TDinternal\//) {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git pull
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
git log -5
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git pull
git log -5
'''
} else {
sh '''
echo "unmatched reposiotry ${CHANGE_URL}"
'''
}
}
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
git branch
git pull || git pull
git fetch origin +refs/pull/%CHANGE_ID%/merge
git checkout -qf FETCH_HEAD
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git submodule update --init --recursive
'''
}
def pre_test_build_win() {
bat '''
echo "building ..."
time /t
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
mkdir debug
cd debug
call "C:\\Program Files (x86)\\Microsoft Visual Studio\\2017\\Community\\VC\\Auxiliary\\Build\\vcvarsall.bat" x64
@ -192,6 +207,7 @@ pipeline {
agent none
options { skipDefaultCheckout() }
environment{
WKDIR = '/var/lib/jenkins/workspace'
WK = '/var/lib/jenkins/workspace/TDinternal'
WKC = '/var/lib/jenkins/workspace/TDinternal/community'
WKPY = '/var/lib/jenkins/workspace/taos-connector-python'
@ -206,39 +222,22 @@ pipeline {
changeRequest()
}
steps {
timeout(time: 45, unit: 'MINUTES'){
timeout(time: 20, unit: 'MINUTES'){
pre_test()
script {
if (env.CHANGE_URL =~ /\/TDengine\//) {
sh '''
cd ${WK}/debug
ctest -VV
'''
sh '''
export LD_LIBRARY_PATH=${WK}/debug/build/lib
cd ${WKC}/tests/system-test
./fulltest.sh
'''
} else if (env.CHANGE_URL =~ /\/TDinternal\//) {
sh '''
cd ${WKC}/debug
ctest -VV
'''
sh '''
export LD_LIBRARY_PATH=${WKC}/debug/build/lib
cd ${WKC}/tests/system-test
./fulltest.sh
'''
} else {
sh '''
echo "unmatched reposiotry ${CHANGE_URL}"
'''
}
sh '''
cd ${WKC}/tests/parallel_test
date
time ./container_build.sh -w ${WKDIR} -t 8 -e
rm -f /tmp/cases.task
./collect_cases.sh -e
'''
sh '''
cd ${WKC}/tests/parallel_test
date
time ./run.sh -e -m /home/m.json -t /tmp/cases.task -b ${CHANGE_TARGET} -l ${WKDIR}/log
'''
}
sh '''
cd ${WKC}/tests
./test-all.sh b1fq
'''
}
}
}


@ -48,6 +48,7 @@ enum {
typedef enum EStreamType {
STREAM_NORMAL = 1,
STREAM_INVERT,
STREAM_REPROCESS,
STREAM_INVALID,
} EStreamType;


@ -173,6 +173,7 @@ typedef struct SqlFunctionCtx {
SInputColumnInfoData input;
SResultDataInfo resDataInfo;
uint32_t order; // data block scanner order: asc|desc
uint8_t scanFlag; // record current running step, default: 0
////////////////////////////////////////////////////////////////
int32_t startRow; // start row index
int32_t size; // handled processed row number
@ -183,7 +184,6 @@ typedef struct SqlFunctionCtx {
bool hasNull; // null value exist in current block, TODO remove it
bool requireNull; // require null in some function, TODO remove it
int32_t columnIndex; // TODO remove it
uint8_t currentStage; // record current running step, default: 0
bool isAggSet;
int64_t startTs; // timestamp range of current query when function is executed on a specific data block, TODO remove it
bool stableQuery;


@ -193,7 +193,6 @@ typedef struct SScanPhysiNode {
} SScanPhysiNode;
typedef SScanPhysiNode STagScanPhysiNode;
typedef SScanPhysiNode SStreamScanPhysiNode;
typedef struct SSystemTableScanPhysiNode {
SScanPhysiNode scan;
@ -217,6 +216,7 @@ typedef struct STableScanPhysiNode {
} STableScanPhysiNode;
typedef STableScanPhysiNode STableSeqScanPhysiNode;
typedef STableScanPhysiNode SStreamScanPhysiNode;
typedef struct SProjectPhysiNode {
SPhysiNode node;


@ -1447,6 +1447,10 @@ void blockDebugShowData(const SArray* dataBlocks) {
for (int32_t k = 0; k < colNum; k++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
if (pColInfoData->hasNull) {
printf(" %15s |", "NULL");
continue;
}
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
@ -1464,6 +1468,9 @@ void blockDebugShowData(const SArray* dataBlocks) {
case TSDB_DATA_TYPE_UBIGINT:
printf(" %15lu |", *(uint64_t*)var);
break;
case TSDB_DATA_TYPE_DOUBLE:
printf(" %15f |", *(double*)var);
break;
}
}
printf("\n");


@ -950,6 +950,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
.reader = pStreamReader,
.meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb,
.vnode = pTq->pVnode,
};
pTask->exec.runners[i].inputHandle = pStreamReader;
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);


@ -371,9 +371,18 @@ typedef struct STagScanInfo {
STableGroupInfo *pTableGroups;
} STagScanInfo;
typedef enum EStreamScanMode {
STREAM_SCAN_FROM_READERHANDLE = 1,
STREAM_SCAN_FROM_RES,
STREAM_SCAN_FROM_UPDATERES,
STREAM_SCAN_FROM_DATAREADER,
} EStreamScanMode;
typedef struct SStreamBlockScanInfo {
SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock
SSDataBlock* pUpdateRes; // update SSDataBlock
int32_t updateResIndex;
int32_t blockType; // current block type
int32_t validBlockIndex; // has the current block been returned?
SColumnInfo* pCols; // the output column info
@ -383,8 +392,12 @@ typedef struct SStreamBlockScanInfo {
SArray* pColMatchInfo; //
SNode* pCondition;
SArray* tsArray;
SUpdateInfo* pUpdateInfo;
SUpdateInfo* pUpdateInfo;
int32_t primaryTsIndex; // primary time stamp slot id
void* pDataReader;
EStreamScanMode scanMode;
SOperatorInfo* pOperatorDumy;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
} SStreamBlockScanInfo;
typedef struct SSysTableScanInfo {
@ -645,6 +658,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWin
void cleanupAggSup(SAggSupporter* pAggSup);
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo);
SSDataBlock* loadNextDataBlock(void* param);
@ -690,8 +704,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList,
SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pConditions);
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SSDataBlock* pResBlock,
SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo,
SNode* pConditions, SOperatorInfo* pOperatorDumy, SInterval* pInterval);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal,
@ -745,6 +760,15 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
int32_t length);
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result,
int32_t* length);
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts,
SInterval* pInterval, int32_t precision, STimeWindow* win);
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos,
TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item,
int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
void doClearWindow(SIntervalAggOperatorInfo* pInfo, char* pData, int16_t bytes,
uint64_t groupId, int32_t numOfOutput);
#ifdef __cplusplus
}


@ -344,6 +344,28 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
return pResultRow;
}
void doClearWindow(SIntervalAggOperatorInfo* pInfo, char* pData, int16_t bytes,
uint64_t groupId, int32_t numOfOutput) {
SAggSupporter* pSup = &pInfo->aggSup;
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
GET_RES_WINDOW_KEY_LEN(bytes));
SResultRow* pResult = getResultRowByPos(pSup->pResultBuf, p1);
SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultCell(pResult, i, pInfo->binfo.rowCellInfoOffset);
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) {
continue;
}
pResInfo->initialized = false;
if (pCtx[i].functionId != -1) {
pCtx[i].fpSet.init(&pCtx[i], pResInfo);
}
}
}
/**
* the struct of key in hash table
* +----------+---------------+
@ -724,7 +746,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
pCtx[i].order = order;
pCtx[i].size = pBlock->info.rows;
pCtx[i].pSrcBlock = pBlock;
pCtx[i].currentStage = scanFlag;
pCtx[i].scanFlag = scanFlag;
SInputColumnInfoData* pInput = &pCtx[i].input;
pInput->uid = pBlock->info.uid;
@ -804,23 +826,22 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
return code;
}
static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
if (functionNeedToExecute(&pCtx[k])) {
pCtx[k].startTs = startTs;
// this can be set during create the struct
// todo add a dummy function to avoid process check
if (pCtx[k].fpSet.process != NULL) {
int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
if (code != TSDB_CODE_SUCCESS) {
qError("%s call aggregate function error happens, code : %s",
GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
pOperator->pTaskInfo->code = code;
longjmp(pOperator->pTaskInfo->env, code);
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
return code;
}
}
}
}
return TSDB_CODE_SUCCESS;
}
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
@ -976,18 +997,22 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
return false;
}
if (pCtx->scanFlag == REPEAT_SCAN) {
return fmIsRepeatScanFunc(pCtx->functionId);
}
if (isRowEntryCompleted(pResInfo)) {
return false;
}
if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) {
// return QUERY_IS_ASC_QUERY(pQueryAttr);
}
// denote the order type
if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) {
// return pCtx->param[0].i == pQueryAttr->order.order;
}
// if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) {
// // return QUERY_IS_ASC_QUERY(pQueryAttr);
// }
//
// // denote the order type
// if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) {
// // return pCtx->param[0].i == pQueryAttr->order.order;
// }
// in the reverse table scan, only the following functions need to be executed
// if (IS_REVERSE_SCAN(pRuntimeEnv) ||
@ -1922,7 +1947,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
cleanupResultRowEntry(pEntry);
pCtx[i].resultInfo = pEntry;
pCtx[i].currentStage = stage;
pCtx[i].scanFlag = stage;
// set the timestamp output buffer for top/bottom/diff query
// int32_t fid = pCtx[i].functionId;
@ -3702,7 +3727,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
SAggOperatorInfo* pAggInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
SOperatorInfo* downstream = pOperator->pDownstream[0];
int32_t order = TSDB_ORDER_ASC;
@ -3716,9 +3740,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
if (pBlock == NULL) {
break;
}
// if (pAggInfo->current != NULL) {
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
// }
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
if (code != TSDB_CODE_SUCCESS) {
@ -3728,17 +3749,19 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
// there is a scalar expression that needs to be calculated before applying the group aggregation.
if (pAggInfo->pScalarExprInfo != NULL) {
code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx,
pAggInfo->numOfScalarExpr, NULL);
pAggInfo->numOfScalarExpr, NULL);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
longjmp(pTaskInfo->env, pTaskInfo->code);
longjmp(pTaskInfo->env, code);
}
}
// the pDataBlock are always the same one, no need to call this again
setExecutionContext(pOperator->numOfExprs, pBlock->info.groupId, pTaskInfo, pAggInfo);
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, true);
doAggregateImpl(pOperator, 0, pInfo->pCtx);
code = doAggregateImpl(pOperator, 0, pInfo->pCtx);
if (code != 0) {
longjmp(pTaskInfo->env, code);
}
#if 0 // test for encode/decode result info
if(pOperator->encodeResultRow){
@ -4581,7 +4604,7 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i
return s;
}
static SColumn* createColumn(int32_t blockId, int32_t slotId, SDataType* pType) {
static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType) {
SColumn* pCol = taosMemoryCalloc(1, sizeof(SColumn));
if (pCol == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -4589,9 +4612,10 @@ static SColumn* createColumn(int32_t blockId, int32_t slotId, SDataType* pType)
}
pCol->slotId = slotId;
pCol->bytes = pType->bytes;
pCol->type = pType->type;
pCol->scale = pType->scale;
pCol->colId = colId;
pCol->bytes = pType->bytes;
pCol->type = pType->type;
pCol->scale = pType->scale;
pCol->precision = pType->precision;
pCol->dataBlockId = blockId;
@ -4634,7 +4658,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
SDataType* pType = &pColNode->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
pType->precision, pColNode->colName);
pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pType);
pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType);
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
} else if (type == QUERY_NODE_VALUE) {
pExp->pExpr->nodeType = QUERY_NODE_VALUE;
@ -4686,7 +4710,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
SColumnNode* pcn = (SColumnNode*)p1;
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
pExp->base.pParam[j].pCol = createColumn(pcn->dataBlockId, pcn->slotId, &pcn->node.resType);
pExp->base.pParam[j].pCol = createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType);
} else if (p1->type == QUERY_NODE_VALUE) {
SValueNode* pvn = (SValueNode*)p1;
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
@ -4764,18 +4788,45 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, pExchange->pSrcEndPoints, pResBlock, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
queryId, taskId);
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
int32_t numOfCols = 0;
tsdbReaderT pDataReader = NULL;
if (pHandle->vnode) {
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
} else {
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
queryId, taskId);
}
if (pDataReader == NULL && terrno != 0) {
qDebug("pDataReader is NULL");
// return NULL;
} else {
qDebug("pDataReader is not NULL");
}
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
SSDataBlock* pResBlockDumy = createResDataBlock(pDescNode);
SQueryTableDataCond cond = {0};
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
SInterval interval = extractIntervalInfo(pTableScanNode);
SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t numOfCols = 0;
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pCols, tableIdList, pTaskInfo,
pScanPhyNode->node.pConditions);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pResBlock, pCols, tableIdList, pTaskInfo,
pScanPhyNode->node.pConditions, pOperatorDumy, &interval);
taosArrayDestroy(tableIdList);
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {


@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "filter.h"
#include "function.h"
#include "filter.h"
#include "functionMgt.h"
#include "os.h"
#include "querynodes.h"
@ -260,6 +260,53 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
pTableScanInfo->cond.order = TSDB_ORDER_DESC;
}
static void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
// currently only the tbname pseudo column
if (pTableScanInfo->numOfPseudoExpr == 0) {
return;
}
SMetaReader mr = {0};
metaReaderInit(&mr, pTableScanInfo->readHandle.meta, 0);
metaGetTableEntryByUid(&mr, pBlock->info.uid);
for (int32_t j = 0; j < pTableScanInfo->numOfPseudoExpr; ++j) {
SExprInfo* pExpr = &pTableScanInfo->pPseudoExpr[j];
int32_t dstSlotId = pExpr->base.resSchema.slotId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows);
int32_t functionId = pExpr->pExpr->_function.functionId;
// this is to handle the tbname
if (fmIsScanPseudoColumnFunc(functionId)) {
struct SScalarFuncExecFuncs fpSet = {0};
fmGetScalarFuncExecFuncs(functionId, &fpSet);
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_BIGINT;
infoData.info.bytes = sizeof(uint64_t);
colInfoDataEnsureCapacity(&infoData, 0, 1);
colDataAppendInt64(&infoData, 0, &pBlock->info.uid);
SScalarParam srcParam = {
.numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData};
SScalarParam param = {.columnData = pColInfoData};
fpSet.process(&srcParam, 1, &param);
} else { // these are tags
const char* p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, p, (p == NULL));
}
}
}
metaReaderClear(&mr);
}
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
STableScanInfo* pTableScanInfo = pOperator->info;
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
@ -285,23 +332,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
// currently only the tbname pseudo column
if (pTableScanInfo->numOfPseudoExpr > 0) {
int32_t dstSlotId = pTableScanInfo->pPseudoExpr->base.resSchema.slotId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows);
struct SScalarFuncExecFuncs fpSet;
fmGetScalarFuncExecFuncs(pTableScanInfo->pPseudoExpr->pExpr->_function.functionId, &fpSet);
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_BIGINT;
infoData.info.bytes = sizeof(uint64_t);
colInfoDataEnsureCapacity(&infoData, 0, 1);
colDataAppendInt64(&infoData, 0, &pBlock->info.uid);
SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData};
SScalarParam param = {.columnData = pColInfoData};
fpSet.process(&srcParam, 1, &param);
addTagPseudoColumnData(pTableScanInfo, pBlock);
}
return pBlock;
@ -568,7 +599,40 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
taosArrayClear(pInfo->pBlockLists);
}
static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo) {
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
SSDataBlock* pSDB = pInfo->pUpdateRes;
if (pInfo->updateResIndex < pSDB->info.rows) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, 0);
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData;
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex], &pInfo->interval,
pInfo->interval.precision, NULL);
STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info;
pTableScanInfo->cond.twindow = win;
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex,
win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
pTableScanInfo->scanTimes = 0;
return true;
} else {
return false;
}
}
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pOperatorDumy);
if (pResult == NULL) {
if (prepareDataScan(pInfo)) {
// scan next window data
pResult = doTableScan(pInfo->pOperatorDumy);
}
}
return pResult;
}
static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible) {
SColumnInfoData* pColDataInfo = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsIndex);
TSKEY* ts = (TSKEY*)pColDataInfo->pData;
for (int32_t i = 0; i < pInfo->pRes->info.rows; i++) {
@ -576,13 +640,19 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo) {
taosArrayPush(pInfo->tsArray, ts + i);
}
}
if (taosArrayGetSize(pInfo->tsArray) > 0) {
int32_t size = taosArrayGetSize(pInfo->tsArray);
if (size > 0 && invertible) {
// TODO(liuyao) get from tsdb
// SSDataBlock* p = createOneDataBlock(pInfo->pRes, true);
// p->info.type = STREAM_INVERT;
// taosArrayClear(pInfo->tsArray);
// return p;
return NULL;
SSDataBlock* p = createOneDataBlock(pInfo->pRes, false);
taosArraySet(p->pDataBlock, 0, pInfo->tsArray);
p->info.rows = size;
p->info.type = STREAM_REPROCESS;
taosArrayClear(pInfo->tsArray);
return p;
}
return NULL;
}
@ -609,14 +679,23 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
int32_t current = pInfo->validBlockIndex++;
return taosArrayGetP(pInfo->pBlockLists, current);
} else {
if (total > 0) {
ASSERT(total == 2);
SSDataBlock* pRes = taosArrayGetP(pInfo->pBlockLists, 0);
SSDataBlock* pUpRes = taosArrayGetP(pInfo->pBlockLists, 1);
blockDataDestroy(pUpRes);
taosArrayClear(pInfo->pBlockLists);
return pRes;
if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
blockDataDestroy(pInfo->pUpdateRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
return pInfo->pRes;
} else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
blockDataCleanup(pInfo->pRes);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
return pInfo->pUpdateRes;
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
SSDataBlock* pSDB = doDataScan(pInfo);
if (pSDB == NULL) {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} else {
return pSDB;
}
}
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
blockDataCleanup(pInfo->pRes);
@ -682,12 +761,18 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
if (rows == 0) {
pOperator->status = OP_EXEC_DONE;
} else {
SSDataBlock* upRes = getUpdateDataBlock(pInfo);
} else if (pInfo->interval.interval > 0) {
SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); //TODO(liuyao) get invertible from plan
if (upRes) {
taosArrayPush(pInfo->pBlockLists, &(pInfo->pRes));
taosArrayPush(pInfo->pBlockLists, &upRes);
return upRes;
pInfo->pUpdateRes = upRes;
if (upRes->info.type == STREAM_REPROCESS) {
pInfo->updateResIndex = 0;
prepareDataScan(pInfo);
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
} else if (upRes->info.type == STREAM_INVERT) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
return upRes;
}
}
}
@ -695,8 +780,10 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
}
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList,
SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pCondition) {
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader,
SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList,
SExecTaskInfo* pTaskInfo, SNode* pCondition, SOperatorInfo* pOperatorDumy,
SInterval* pInterval) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
@ -736,7 +823,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
}
pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan
pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); // TODO(liuyao) get it from physical plan
pInfo->pUpdateInfo = updateInfoInitP(pInterval, 10000); // TODO(liuyao) get watermark from physical plan
if (pInfo->pUpdateInfo == NULL) {
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
@ -746,6 +833,10 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
pInfo->readerHandle = streamReadHandle;
pInfo->pRes = pResBlock;
pInfo->pCondition = pCondition;
pInfo->pDataReader = pDataReader;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->pOperatorDumy = pOperatorDumy;
pInfo->interval = *pInterval;
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
@ -1348,35 +1439,33 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
char str[512] = {0};
int32_t count = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
while (pInfo->curPos < pInfo->pTableGroups->numOfTables && count < pOperator->resultInfo.capacity) {
STableKeyInfo* item = taosArrayGet(pa, pInfo->curPos);
metaGetTableEntryByUid(&mr, item->uid);
for (int32_t j = 0; j < pOperator->numOfExprs; ++j) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
// refactor later
if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
metaGetTableEntryByUid(&mr, item->uid);
STR_TO_VARSTR(str, mr.me.name);
metaReaderClear(&mr);
colDataAppend(pDst, count, str, false);
// data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
// dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
// doSetTagValueToResultBuf(dst, data, type, bytes);
} else { // it is a tag value
const char* p = metaGetTableTagVal(&mr.me, pExprInfo[j].base.pParam[0].pCol->colId);
colDataAppend(pDst, count, p, (p == NULL));
}
count += 1;
}
count += 1;
if (++pInfo->curPos >= pInfo->pTableGroups->numOfTables) {
pOperator->status = OP_EXEC_DONE;
}
}
metaReaderClear(&mr);
// qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
if (pOperator->status == OP_EXEC_DONE) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);


@ -82,7 +82,7 @@ static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, T
}
// get the correct time window according to the handled timestamp
static STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts,
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts,
SInterval* pInterval, int32_t precision, STimeWindow* win) {
STimeWindow w = {0};
@ -186,7 +186,7 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se
return forwardStep;
}
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
int32_t midPos = -1;
int32_t numOfRows;
@ -249,7 +249,7 @@ static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
return midPos;
}
static int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos,
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos,
TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item,
int32_t order) {
assert(startPos >= 0 && startPos < pDataBlockInfo->rows);
@ -988,6 +988,20 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
}
}
}
static void doClearWindows(SIntervalAggOperatorInfo* pInfo, int32_t numOfOutput, SSDataBlock* pBlock) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData;
int32_t step = 0;
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], &pInfo->interval,
pInfo->interval.precision, NULL);
step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i,
win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
doClearWindow(pInfo, (char*)&win.skey, sizeof(TKEY), pBlock->info.groupId, numOfOutput);
}
}
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SIntervalAggOperatorInfo* pInfo = pOperator->info;
@ -1028,6 +1042,10 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
if (pInfo->invertible) {
setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type);
}
if (pBlock->info.type == STREAM_REPROCESS) {
doClearWindows(pInfo, pOperator->numOfExprs, pBlock);
continue;
}
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
}


@ -1645,7 +1645,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
int32_t type = pCol->info.type;
SPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
if (pCtx->scanFlag == REPEAT_SCAN && pInfo->stage == 0) {
pInfo->stage += 1;
// all data are null, set it completed


@ -37,7 +37,7 @@
#define GET_TRUE_DATA_TYPE() \
int32_t type = 0; \
if (pCtx->currentStage == MERGE_STAGE) { \
if (pCtx->scanFlag == MERGE_STAGE) { \
type = pCtx->resDataInfo.type; \
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); \
} else { \
@ -908,7 +908,7 @@ static void avg_func_merge(SqlFunctionCtx *pCtx) {
static void avg_finalizer(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
if (pCtx->currentStage == MERGE_STAGE) {
if (pCtx->scanFlag == MERGE_STAGE) {
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
if (GET_INT64_VAL(GET_ROWCELL_INTERBUF(pResInfo)) <= 0) {
@ -1152,7 +1152,7 @@ static void stddev_function(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
SStddevInfo *pStd = GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->currentStage == REPEAT_SCAN && pStd->stage == 0) {
if (pCtx->scanFlag == REPEAT_SCAN && pStd->stage == 0) {
pStd->stage++;
avg_finalizer(pCtx);
@ -1814,7 +1814,7 @@ static STopBotInfo *getTopBotOutputInfo(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
// only the first_stage_merge writes data directly into the final output buffer
if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
if (pCtx->stableQuery && pCtx->scanFlag != MERGE_STAGE) {
return (STopBotInfo*) pCtx->pOutput;
} else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return GET_ROWCELL_INTERBUF(pResInfo);
@ -1956,7 +1956,7 @@ static void top_func_merge(SqlFunctionCtx *pCtx) {
for (int32_t i = 0; i < pInput->num; ++i) {
int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT)? TSDB_DATA_TYPE_DOUBLE:pCtx->resDataInfo.type;
// do_top_function_add(pOutput, (int32_t)pCtx->param[0].param.i, &pInput->res[i]->v.i, pInput->res[i]->timestamp,
// type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
// type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->scanFlag);
}
SET_VAL(pCtx, pInput->num, pOutput->num);
@ -2013,7 +2013,7 @@ static void bottom_func_merge(SqlFunctionCtx *pCtx) {
for (int32_t i = 0; i < pInput->num; ++i) {
int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT) ? TSDB_DATA_TYPE_DOUBLE : pCtx->resDataInfo.type;
// do_bottom_function_add(pOutput, (int32_t)pCtx->param[0].param.i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, type,
// &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
// &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->scanFlag);
}
SET_VAL(pCtx, pInput->num, pOutput->num);
@ -2073,7 +2073,7 @@ static void percentile_function(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
if (pCtx->scanFlag == REPEAT_SCAN && pInfo->stage == 0) {
pInfo->stage += 1;
// all data are null, set it completed
@ -2180,7 +2180,7 @@ static SAPercentileInfo *getAPerctInfo(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo* pInfo = NULL;
if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
if (pCtx->stableQuery && pCtx->scanFlag != MERGE_STAGE) {
pInfo = (SAPercentileInfo*) pCtx->pOutput;
} else {
pInfo = GET_ROWCELL_INTERBUF(pResInfo);
@ -2270,7 +2270,7 @@ static void apercentile_finalizer(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo * pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo *pOutput = GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->currentStage == MERGE_STAGE) {
if (pCtx->scanFlag == MERGE_STAGE) {
// if (pResInfo->hasResult == DATA_SET_FLAG) { // check for null
// assert(pOutput->pHisto->numOfElems > 0);
//
@ -2510,7 +2510,7 @@ static void copy_function(SqlFunctionCtx *pCtx);
static void tag_function(SqlFunctionCtx *pCtx) {
SET_VAL(pCtx, 1, 1);
if (pCtx->currentStage == MERGE_STAGE) {
if (pCtx->scanFlag == MERGE_STAGE) {
copy_function(pCtx);
} else {
taosVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->resDataInfo.type, true);
@ -2966,7 +2966,7 @@ static bool spread_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pRe
SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
// this is the server-side setup function run on the client side; the secondary merge does not need this procedure
if (pCtx->currentStage == MERGE_STAGE) {
if (pCtx->scanFlag == MERGE_STAGE) {
// pCtx->param[0].param.d = DBL_MAX;
// pCtx->param[3].param.d = -DBL_MAX;
} else {
@ -3086,7 +3086,7 @@ void spread_function_finalizer(SqlFunctionCtx *pCtx) {
*/
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
if (pCtx->currentStage == MERGE_STAGE) {
if (pCtx->scanFlag == MERGE_STAGE) {
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
// if (pResInfo->hasResult != DATA_SET_FLAG) {


@ -1142,9 +1142,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
return code;
}
static int32_t physiStreamScanNodeToJson(const void* pObj, SJson* pJson) { return physiScanNodeToJson(pObj, pJson); }
static int32_t physiStreamScanNodeToJson(const void* pObj, SJson* pJson) { return physiTableScanNodeToJson(pObj, pJson); }
static int32_t jsonToPhysiStreamScanNode(const SJson* pJson, void* pObj) { return jsonToPhysiScanNode(pJson, pObj); }
static int32_t jsonToPhysiStreamScanNode(const SJson* pJson, void* pObj) { return jsonToPhysiTableScanNode(pJson, pObj); }
static const char* jkSysTableScanPhysiPlanMnodeEpSet = "MnodeEpSet";
static const char* jkSysTableScanPhysiPlanShowRewrite = "ShowRewrite";


@ -460,9 +460,13 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
memcpy(pTableScan->scanSeq, pScanLogicNode->scanSeq, sizeof(pScanLogicNode->scanSeq));
pTableScan->scanRange = pScanLogicNode->scanRange;
pTableScan->ratio = pScanLogicNode->ratio;
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
if (pScanLogicNode->pVgroupList) {
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
}
if (pCxt->pExecNodeList) {
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
}
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
pTableScan->dataRequired = pScanLogicNode->dataRequired;
pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
@ -505,13 +509,12 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
SPhysiNode** pPhyNode) {
SStreamScanPhysiNode* pScan =
(SStreamScanPhysiNode*)makePhysiNode(pCxt, pScanLogicNode->pMeta->tableInfo.precision,
(SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY;
int32_t res = createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
if (res == TSDB_CODE_SUCCESS) {
ENodeType type = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
setNodeType(*pPhyNode, type);
}
return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
return res;
}
static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
@ -786,7 +789,7 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic
}
static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
SPhysiNode** pPhyNode) {
SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(
SScanPhysiNode* pScan = (SScanPhysiNode*)makePhysiNode(
pCxt, pExchangeLogicNode->precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY;


@ -154,7 +154,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
// sink
if (pTask->sinkType == TASK_SINK__TABLE) {
/*blockDebugShowData(pRes);*/
blockDebugShowData(pRes);
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes);
} else if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);


@ -21,15 +21,16 @@ typedef struct SCliConn {
uv_connect_t connReq;
uv_stream_t* stream;
uv_write_t writeReq;
void* hostThrd;
SConnBuffer readBuf;
void* data;
STransQueue cliMsgs;
queue conn;
uint64_t expireTime;
int hThrdIdx;
STransCtx ctx;
void* hostThrd;
int hThrdIdx;
SConnBuffer readBuf;
STransQueue cliMsgs;
queue conn;
uint64_t expireTime;
STransCtx ctx;
bool broken; // link broken or not
ConnStatus status; //
@ -157,13 +158,11 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
tDebug("cli conn %p receive release request, ref: %d", conn, T_REF_VAL_GET(conn)); \
while (T_REF_VAL_GET(conn) > 1) { \
transUnrefCliHandle(conn); \
} \
if (T_REF_VAL_GET(conn) == 1) { \
if (T_REF_VAL_GET(conn) > 1) { \
transUnrefCliHandle(conn); \
} \
destroyCmsg(pMsg); \
addConnToPool(((SCliThrdObj*)conn->hostThrd)->pool, conn); \
return; \
} \
} while (0)


@ -35,7 +35,6 @@ typedef struct SSrvConn {
uv_timer_t pTimer;
queue queue;
int persist; // persist connection or not
SConnBuffer readBuf; // read buf,
int inType;
void* pTransInst; // rpc init
@ -138,6 +137,7 @@ static void destroySmsg(SSrvMsg* smsg);
// check whether already read complete packet
static SSrvConn* createConn(void* hThrd);
static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
static int reallocConnRefHandle(SSrvConn* conn);
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
@ -164,7 +164,7 @@ static void* transWorkerThread(void* arg);
static void* transAcceptThread(void* arg);
// add handle loop
static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName);
static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName);
static bool addHandleToAcceptloop(void* arg);
#define CONN_SHOULD_RELEASE(conn, head) \
@ -180,6 +180,7 @@ static bool addHandleToAcceptloop(void* arg);
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
reallocConnRefHandle(conn); \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
@ -360,10 +361,14 @@ void uvOnSendCb(uv_write_t* req, int status) {
tTrace("server conn %p data already was written on stream", conn);
if (!transQueueEmpty(&conn->srvMsgs)) {
SSrvMsg* msg = transQueuePop(&conn->srvMsgs);
if (msg->type == Release && conn->status != ConnNormal) {
conn->status = ConnNormal;
transUnrefSrvHandle(conn);
}
// if (msg->type == Release && conn->status != ConnNormal) {
// conn->status = ConnNormal;
// transUnrefSrvHandle(conn);
// reallocConnRefHandle(conn);
// destroySmsg(msg);
// transQueueClear(&conn->srvMsgs);
// return;
//}
destroySmsg(msg);
// send second data, just use for push
if (!transQueueEmpty(&conn->srvMsgs)) {
@ -421,8 +426,15 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
if (pConn->status == ConnNormal) {
pHead->msgType = pConn->inType + 1;
} else {
pHead->msgType = smsg->type == Release ? 0 : pMsg->msgType;
if (smsg->type == Release) {
pHead->msgType = 0;
pConn->status = ConnNormal;
transUnrefSrvHandle(pConn);
} else {
pHead->msgType = pMsg->msgType;
}
}
pHead->release = smsg->type == Release ? 1 : 0;
pHead->code = htonl(pMsg->code);
@ -517,7 +529,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
int64_t refId = transMsg.refId;
SExHandle* exh2 = uvAcquireExHandle(refId);
if (exh2 == NULL || exh1 != exh2) {
tTrace("server handle %p except msg, ignore it", exh1);
tTrace("server handle except msg %p, ignore it", exh1);
uvReleaseExHandle(refId);
destroySmsg(msg);
continue;
@ -581,11 +593,12 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
if (pObj->numOfWorkerReady < pObj->numOfThreads) {
tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, pObj->numOfWorkerReady);
tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads,
pObj->numOfWorkerReady);
uv_close((uv_handle_t*)cli, NULL);
return;
}
uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
wr->data = cli;
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
@ -681,14 +694,14 @@ void* transAcceptThread(void* arg) {
return NULL;
}
void uvOnPipeConnectionCb(uv_connect_t *connect, int status) {
void uvOnPipeConnectionCb(uv_connect_t* connect, int status) {
if (status != 0) {
return;
}
SWorkThrdObj* pThrd = container_of(connect, SWorkThrdObj, connect_req);
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
}
static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName) {
static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName) {
pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
if (0 != uv_loop_init(pThrd->loop)) {
return false;
@ -787,6 +800,19 @@ static void destroyConn(SSrvConn* conn, bool clear) {
// uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
}
}
static int reallocConnRefHandle(SSrvConn* conn) {
uvReleaseExHandle(conn->refId);
uvRemoveExHandle(conn->refId);
// avoid the app continuing to send msgs on an invalid handle
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
exh->handle = conn;
exh->pThrd = conn->hostThrd;
exh->refId = uvAddExHandle(exh);
uvAcquireExHandle(exh->refId);
conn->refId = exh->refId;
return 0;
}
static void uvDestroyConn(uv_handle_t* handle) {
SSrvConn* conn = handle->data;
if (conn == NULL) {
@ -822,7 +848,7 @@ static void uvPipeListenCb(uv_stream_t* handle, int status) {
ASSERT(status == 0);
SServerObj* srv = container_of(handle, SServerObj, pipeListen);
uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]);
uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]);
ASSERT(0 == uv_pipe_init(srv->loop, pipe, 1));
ASSERT(0 == uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe));
@ -859,7 +885,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-%lu", taosSafeRand(), GetCurrentProcessId());
#else
char pipeName[PATH_MAX] = {0};
snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(), taosGetSelfPthreadId());
snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(),
taosGetSelfPthreadId());
#endif
assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName));
assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb));
@ -874,7 +901,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
thrd->pipe = &(srv->pipe[i][1]); // init read
if (false == addHandleToWorkloop(thrd,pipeName)) {
if (false == addHandleToWorkloop(thrd, pipeName)) {
goto End;
}
int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
@ -959,6 +986,7 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
SSrvConn* conn = msg->pConn;
if (conn->status == ConnAcquire) {
reallocConnRefHandle(conn);
if (!transQueuePush(&conn->srvMsgs, msg)) {
return;
}


@ -0,0 +1,45 @@
#!/bin/bash
case_file=/tmp/cases.task
function usage() {
echo "$0"
echo -e "\t -o output case file"
echo -e "\t -e enterprise edition"
echo -e "\t -h help"
}
ent=0
while getopts "o:eh" opt; do
case $opt in
o)
case_file=$OPTARG
;;
e)
ent=1
;;
h)
usage
exit 0
;;
\?)
echo "Invalid option: -$OPTARG"
usage
exit 0
;;
esac
done
script_dir=`dirname $0`
cd $script_dir
if [ $ent -eq 0 ]; then
echo ",,unit-test,bash test.sh" >$case_file
else
echo ",,unit-test,bash test.sh -e" >$case_file
fi
cat ../script/jenkins/basic.txt |grep -v "^#"|grep -v "^$"|sed "s/^/,,script,/" >>$case_file
grep "^python" ../system-test/fulltest.sh |sed "s/^/,,system-test,/" >>$case_file
exit 0
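
The task file this script emits is a plain CSV that run.sh (below) consumes: field 2 is an optional per-case retry count (empty falls back to DEFAULT_RETRY_TIME, i.e. 2), field 3 is the execution directory, and field 4 is the case command. A hedged sketch of an invocation and the resulting lines; the concrete .sim/.py case paths are illustrative:

# Collect all cases into the default /tmp/cases.task (enterprise edition),
# exactly as the Jenkinsfile stage above does:
rm -f /tmp/cases.task
./collect_cases.sh -e

# Example task lines (<reserved>,<redo count>,<exec dir>,<command>):
#   ,,unit-test,bash test.sh -e
#   ,,script,./test.sh -f tsim/db/basic1.sim
#   ,,system-test,python3 ./test.py -f 2-query/abs.py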


@ -0,0 +1,59 @@
#!/bin/bash
function usage() {
echo "$0"
echo -e "\t -w work dir"
echo -e "\t -e enterprise edition"
echo -e "\t -t make thread count"
echo -e "\t -h help"
}
ent=0
while getopts "w:t:eh" opt; do
case $opt in
w)
WORKDIR=$OPTARG
;;
e)
ent=1
;;
t)
THREAD_COUNT=$OPTARG
;;
h)
usage
exit 0
;;
\?)
echo "Invalid option: -$OPTARG"
usage
exit 0
;;
esac
done
if [ -z "$WORKDIR" ]; then
usage
exit 1
fi
if [ -z "$THREAD_COUNT" ]; then
THREAD_COUNT=1
fi
ulimit -c unlimited
if [ $ent -eq 0 ]; then
REP_DIR=/home/TDengine
REP_MOUNT_PARAM=$WORKDIR/TDengine:/home/TDengine
else
REP_DIR=/home/TDinternal
REP_MOUNT_PARAM=$WORKDIR/TDinternal:/home/TDinternal
fi
docker run \
-v $REP_MOUNT_PARAM \
--rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_TOOLS=true;make -j $THREAD_COUNT"
ret=$?
exit $ret
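
For reference, the CI stage above drives this script as follows (WKDIR comes from the Jenkinsfile environment, /var/lib/jenkins/workspace):

cd ${WKC}/tests/parallel_test
date
time ./container_build.sh -w ${WKDIR} -t 8 -e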

tests/parallel_test/run.sh Executable file

@ -0,0 +1,357 @@
#!/bin/bash
function usage() {
echo "$0"
echo -e "\t -m vm config file"
echo -e "\t -t task file"
echo -e "\t -b branch"
echo -e "\t -l log dir"
echo -e "\t -e enterprise edition"
echo -e "\t -o default timeout value"
echo -e "\t -h help"
}
ent=0
while getopts "m:t:b:l:o:eh" opt; do
case $opt in
m)
config_file=$OPTARG
;;
t)
t_file=$OPTARG
;;
b)
branch=$OPTARG
;;
l)
log_dir=$OPTARG
;;
e)
ent=1
;;
o)
timeout_param="-o $OPTARG"
;;
h)
usage
exit 0
;;
\?)
echo "Invalid option: -$OPTARG"
usage
exit 0
;;
esac
done
#config_file=$1
if [ -z $config_file ]; then
usage
exit 1
fi
if [ ! -f $config_file ]; then
echo "$config_file not found"
usage
exit 1
fi
#t_file=$2
if [ -z $t_file ]; then
usage
exit 1
fi
if [ ! -f $t_file ]; then
echo "$t_file not found"
usage
exit 1
fi
date_tag=`date +%Y%m%d-%H%M%S`
if [ -z $log_dir ]; then
log_dir="log/${branch}_${date_tag}"
else
log_dir="$log_dir/${branch}_${date_tag}"
fi
hosts=()
usernames=()
passwords=()
workdirs=()
threads=()
i=0
while [ 1 ]; do
host=`jq .[$i].host $config_file`
if [ "$host" = "null" ]; then
break
fi
username=`jq .[$i].username $config_file`
if [ "$username" = "null" ]; then
break
fi
password=`jq .[$i].password $config_file`
if [ "$password" = "null" ]; then
password=""
fi
workdir=`jq .[$i].workdir $config_file`
if [ "$workdir" = "null" ]; then
break
fi
thread=`jq .[$i].thread $config_file`
if [ "$thread" = "null" ]; then
break
fi
hosts[i]=`echo $host|sed 's/\"$//'|sed 's/^\"//'`
usernames[i]=`echo $username|sed 's/\"$//'|sed 's/^\"//'`
passwords[i]=`echo $password|sed 's/\"$//'|sed 's/^\"//'`
workdirs[i]=`echo $workdir|sed 's/\"$//'|sed 's/^\"//'`
threads[i]=$thread
i=$(( i + 1 ))
done
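
The -m file is therefore a JSON array of worker entries carrying the five fields read above. A minimal sketch of such a config (host addresses, credentials, and workdirs here are purely illustrative; an empty password makes the script fall back to plain ssh/scp instead of sshpass):

cat >/home/m.json <<'EOF'
[
  {"host": "192.168.1.10", "username": "root", "password": "taosdata",
   "workdir": "/var/lib/jenkins/workspace", "thread": 8},
  {"host": "192.168.1.11", "username": "root", "password": "",
   "workdir": "/var/lib/jenkins/workspace", "thread": 8}
]
EOF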
function prepare_cases() {
cat $t_file >>$task_file
local i=0
while [ $i -lt $1 ]; do
echo "%%FINISHED%%" >>$task_file
i=$(( i + 1 ))
done
}
function clean_tmp() {
# clean tmp dir
local index=$1
local ssh_script="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}"
if [ -z ${passwords[index]} ]; then
ssh_script="ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}"
fi
local cmd="${ssh_script} rm -rf ${workdirs[index]}/tmp"
${cmd}
}
function run_thread() {
local index=$1
local thread_no=$2
local runcase_script="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}"
if [ -z ${passwords[index]} ]; then
runcase_script="ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}"
fi
local count=0
local script="${workdirs[index]}/TDengine/tests/parallel_test/run_container.sh"
if [ $ent -ne 0 ]; then
local script="${workdirs[index]}/TDinternal/community/tests/parallel_test/run_container.sh -e"
fi
local cmd="${runcase_script} ${script}"
# script="echo"
while [ 1 ]; do
local line=`flock -x $lock_file -c "head -n1 $task_file;sed -i \"1d\" $task_file"`
if [ "x$line" = "x%%FINISHED%%" ]; then
# echo "$index . $thread_no EXIT"
break
fi
if [ -z "$line" ]; then
continue
fi
echo "$line"|grep -q "^#"
if [ $? -eq 0 ]; then
continue
fi
local case_redo_time=`echo "$line"|cut -d, -f2`
if [ -z "$case_redo_time" ]; then
case_redo_time=${DEFAULT_RETRY_TIME:-2}
fi
local exec_dir=`echo "$line"|cut -d, -f3`
local case_cmd=`echo "$line"|cut -d, -f4`
local case_file=""
echo "$case_cmd"|grep -q "\.sh"
if [ $? -eq 0 ]; then
case_file=`echo "$case_cmd"|grep -o ".*\.sh"|awk '{print $NF}'`
fi
echo "$case_cmd"|grep -q "^python3"
if [ $? -eq 0 ]; then
case_file=`echo "$case_cmd"|grep -o ".*\.py"|awk '{print $NF}'`
fi
echo "$case_cmd"|grep -q "\.sim"
if [ $? -eq 0 ]; then
case_file=`echo "$case_cmd"|grep -o ".*\.sim"|awk '{print $NF}'`
fi
if [ -z "$case_file" ]; then
case_file=`echo "$case_cmd"|awk '{print $NF}'`
fi
if [ -z "$case_file" ]; then
continue
fi
case_file="$exec_dir/${case_file}.${index}.${thread_no}.${count}"
count=$(( count + 1 ))
local case_path=`dirname "$case_file"`
if [ ! -z "$case_path" ]; then
mkdir -p $log_dir/$case_path
fi
cmd="${runcase_script} ${script} -w ${workdirs[index]} -c \"${case_cmd}\" -t ${thread_no} -d ${exec_dir} ${timeout_param}"
# echo "$thread_no $count $cmd"
local ret=0
local redo_count=1
start_time=`date +%s`
while [ ${redo_count} -lt 6 ]; do
if [ -f $log_dir/$case_file.log ]; then
cp $log_dir/$case_file.log $log_dir/$case_file.${redo_count}.redolog
fi
echo "${hosts[index]}-${thread_no} order:${count}, redo:${redo_count} task:${line}" >$log_dir/$case_file.log
echo -e "\e[33m >>>>> \e[0m ${case_cmd}"
date >>$log_dir/$case_file.log
# $cmd 2>&1 | tee -a $log_dir/$case_file.log
# ret=${PIPESTATUS[0]}
$cmd >>$log_dir/$case_file.log 2>&1
ret=$?
echo "${hosts[index]} `date` ret:${ret}" >>$log_dir/$case_file.log
if [ $ret -eq 0 ]; then
break
fi
redo=0
grep -q "wait too long for taosd start" $log_dir/$case_file.log
if [ $? -eq 0 ]; then
redo=1
fi
grep -q "kex_exchange_identification: Connection closed by remote host" $log_dir/$case_file.log
if [ $? -eq 0 ]; then
redo=1
fi
grep -q "ssh_exchange_identification: Connection closed by remote host" $log_dir/$case_file.log
if [ $? -eq 0 ]; then
redo=1
fi
grep -q "kex_exchange_identification: read: Connection reset by peer" $log_dir/$case_file.log
if [ $? -eq 0 ]; then
redo=1
fi
grep -q "Database not ready" $log_dir/$case_file.log
if [ $? -eq 0 ]; then
redo=1
fi
grep -q "Unable to establish connection" $log_dir/$case_file.log
if [ $? -eq 0 ]; then
redo=1
fi
if [ $redo_count -lt $case_redo_time ]; then
redo=1
fi
if [ $redo -eq 0 ]; then
break
fi
redo_count=$(( redo_count + 1 ))
done
end_time=`date +%s`
echo >>$log_dir/$case_file.log
echo "${hosts[index]} execute time: $(( end_time - start_time ))s" >>$log_dir/$case_file.log
# echo "$thread_no ${line} DONE"
if [ $ret -ne 0 ]; then
flock -x $lock_file -c "echo \"${hosts[index]} ret:${ret} ${line}\" >>$log_dir/failed.log"
mkdir -p $log_dir/${case_file}.coredump
local remote_coredump_dir="${workdirs[index]}/tmp/thread_volume/$thread_no/coredump"
local scpcmd="sshpass -p ${passwords[index]} scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}"
if [ -z ${passwords[index]} ]; then
scpcmd="scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}"
fi
cmd="$scpcmd:${remote_coredump_dir}/* $log_dir/${case_file}.coredump/"
$cmd # 2>/dev/null
local case_info=`echo "$line"|cut -d, -f 3,4`
local corefile=`find $log_dir/${case_file}.coredump/ -name "core.*"`
echo -e "$case_info \e[31m failed\e[0m"
echo "=========================log============================"
cat $log_dir/$case_file.log
echo "====================================================="
echo -e "\e[34m log file: $log_dir/$case_file.log \e[0m"
if [ ! -z "$corefile" ]; then
echo -e "\e[34m corefiles: $corefile \e[0m"
local build_dir=$log_dir/build_${hosts[index]}
local remote_build_dir="${workdirs[index]}/TDengine/debug/build"
if [ $ent -ne 0 ]; then
remote_build_dir="${workdirs[index]}/TDinternal/debug/build"
fi
mkdir $build_dir 2>/dev/null
if [ $? -eq 0 ]; then
# scp build binary
cmd="$scpcmd:${remote_build_dir}/* ${build_dir}/"
echo "$cmd"
$cmd >/dev/null
fi
fi
# get remote sim dir
local remote_sim_dir="${workdirs[index]}/tmp/thread_volume/$thread_no"
local tarcmd="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}"
if [ -z "${passwords[index]}" ]; then
tarcmd="ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}"
fi
cmd="$tarcmd sh -c \"cd $remote_sim_dir; tar -czf sim.tar.gz sim\""
$cmd
local remote_sim_tar="${workdirs[index]}/tmp/thread_volume/$thread_no/sim.tar.gz"
scpcmd="sshpass -p ${passwords[index]} scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}"
if [ -z "${passwords[index]}" ]; then
scpcmd="scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}"
fi
cmd="$scpcmd:${remote_sim_tar} $log_dir/${case_file}.sim.tar.gz"
$cmd
fi
done
}
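
For reference, a sketch of the task file this loop drains: field 1 is not read here (only fields 2-4 are cut out above), lines starting with # are skipped, and %%FINISHED%% tells each worker to exit. The group field and case paths below are illustrative:
# <group>,<retry_count>,<exec_dir>,<case_command>
,2,system-test,python3 ./test.py -f 0-others/user_control.py
,3,script,./test.sh -f tsim/db/basic1.sim
%%FINISHED%%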
# echo "hosts: ${hosts[@]}"
# echo "usernames: ${usernames[@]}"
# echo "passwords: ${passwords[@]}"
# echo "workdirs: ${workdirs[@]}"
# echo "threads: ${threads[@]}"
# TODO: check host accessibility
i=0
while [ $i -lt ${#hosts[*]} ]; do
clean_tmp $i &
i=$(( i + 1 ))
done
wait
mkdir -p $log_dir
rm -rf $log_dir/*
task_file=$log_dir/$$.task
lock_file=$log_dir/$$.lock
i=0
j=0
while [ $i -lt ${#hosts[*]} ]; do
j=$(( j + threads[i] ))
i=$(( i + 1 ))
done
prepare_cases $j
i=0
while [ $i -lt ${#hosts[*]} ]; do
j=0
while [ $j -lt ${threads[i]} ]; do
run_thread $i $j &
j=$(( j + 1 ))
done
i=$(( i + 1 ))
done
wait
rm -f $lock_file
rm -f $task_file
# docker ps -a|grep -v CONTAINER|awk '{print $1}'|xargs docker rm -f
RET=0
i=1
if [ -f "$log_dir/failed.log" ]; then
echo "====================================================="
while read line; do
line=`echo "$line"|cut -d, -f 3,4`
echo -e "$i. $line \e[31m failed\e[0m" >&2
i=$(( i + 1 ))
done <$log_dir/failed.log
RET=1
fi
echo "${log_dir}" >&2
date
exit $RET
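
When a case fails, the flock'ed append above writes one line per failure to failed.log, roughly of this form (host and task line are illustrative):
192.168.1.100 ret:1 ,2,system-test,python3 ./test.py -f 2-query/between.py
The summary loop at the end cuts fields 3 and 4 back out of each such line to build the numbered failure report.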

tests/parallel_test/run_case.sh Executable file
View File

@ -0,0 +1,74 @@
#!/bin/bash
function usage() {
echo "$0"
echo -e "\t -d execution dir"
echo -e "\t -c command"
echo -e "\t -e enterprise edition"
echo -e "\t -o default timeout value"
echo -e "\t -h help"
}
ent=0
while getopts "d:c:o:eh" opt; do
case $opt in
d)
exec_dir=$OPTARG
;;
c)
cmd=$OPTARG
;;
o)
TIMEOUT_CMD="timeout $OPTARG"
;;
e)
ent=1
;;
h)
usage
exit 0
;;
\?)
echo "Invalid option: -$OPTARG"
usage
exit 0
;;
esac
done
if [ -z "$exec_dir" ]; then
usage
exit 0
fi
if [ -z "$cmd" ]; then
usage
exit 0
fi
if [ $ent -eq 0 ]; then
export PATH=$PATH:/home/TDengine/debug/build/bin
export LD_LIBRARY_PATH=/home/TDengine/debug/build/lib
ln -s /home/TDengine/debug/build/lib/libtaos.so /usr/lib/libtaos.so 2>/dev/null
CONTAINER_TESTDIR=/home/TDengine
else
export PATH=$PATH:/home/TDinternal/debug/build/bin
export LD_LIBRARY_PATH=/home/TDinternal/debug/build/lib
ln -s /home/TDinternal/debug/build/lib/libtaos.so /usr/lib/libtaos.so 2>/dev/null
CONTAINER_TESTDIR=/home/TDinternal/community
fi
mkdir -p /var/lib/taos/subscribe
mkdir -p /var/log/taos
mkdir -p /var/lib/taos
cd $CONTAINER_TESTDIR/tests/$exec_dir
ulimit -c unlimited
$TIMEOUT_CMD $cmd
RET=$?
if [ $RET -ne 0 ]; then
pwd
fi
exit $RET
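
A minimal invocation sketch for this script (run inside the test container; case and timeout values are illustrative):
# community edition, with a 900s timeout
./run_case.sh -d system-test -c "python3 ./test.py -f 0-others/user_control.py" -o 900
# enterprise edition: -e switches the binary and library paths to /home/TDinternal
./run_case.sh -e -d system-test -c "python3 ./test.py -f 0-others/user_control.py"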

tests/parallel_test/run_container.sh Executable file
View File

@ -0,0 +1,106 @@
#!/bin/bash
function usage() {
echo "$0"
echo -e "\t -w work dir"
echo -e "\t -d execution dir"
echo -e "\t -c command"
echo -e "\t -t thread number"
echo -e "\t -e enterprise edition"
echo -e "\t -o default timeout value"
echo -e "\t -h help"
}
ent=0
while getopts "w:d:c:t:o:eh" opt; do
case $opt in
w)
WORKDIR=$OPTARG
;;
d)
exec_dir=$OPTARG
;;
c)
cmd=$OPTARG
;;
t)
thread_no=$OPTARG
;;
e)
ent=1
;;
o)
extra_param="-o $OPTARG"
;;
h)
usage
exit 0
;;
\?)
echo "Invalid option: -$OPTARG"
usage
exit 0
;;
esac
done
if [ -z "$WORKDIR" ]; then
usage
exit 1
fi
if [ -z "$exec_dir" ]; then
usage
exit 1
fi
if [ -z "$cmd" ]; then
usage
exit 1
fi
if [ -z "$thread_no" ]; then
usage
exit 1
fi
if [ $ent -ne 0 ]; then
# enterprise edition
extra_param="$extra_param -e"
INTERNAL_REPDIR=$WORKDIR/TDinternal
REPDIR=$INTERNAL_REPDIR/community
CONTAINER_TESTDIR=/home/TDinternal/community
SIM_DIR=/home/TDinternal/sim
REP_MOUNT_PARAM="$INTERNAL_REPDIR:/home/TDinternal"
else
# community edition
REPDIR=$WORKDIR/TDengine
CONTAINER_TESTDIR=/home/TDengine
SIM_DIR=/home/TDengine/sim
REP_MOUNT_PARAM="$REPDIR:/home/TDengine"
fi
ulimit -c unlimited
TMP_DIR=$WORKDIR/tmp
MOUNT_DIR=""
rm -rf ${TMP_DIR}/thread_volume/$thread_no/sim
mkdir -p ${TMP_DIR}/thread_volume/$thread_no/sim/tsim
mkdir -p ${TMP_DIR}/thread_volume/$thread_no/coredump
rm -rf ${TMP_DIR}/thread_volume/$thread_no/coredump/*
if [ ! -d "${TMP_DIR}/thread_volume/$thread_no/$exec_dir" ]; then
subdir=`echo "$exec_dir"|cut -d/ -f1`
echo "cp -rf ${REPDIR}/tests/$subdir ${TMP_DIR}/thread_volume/$thread_no/"
cp -rf ${REPDIR}/tests/$subdir ${TMP_DIR}/thread_volume/$thread_no/
fi
MOUNT_DIR="$TMP_DIR/thread_volume/$thread_no/$exec_dir:$CONTAINER_TESTDIR/tests/$exec_dir"
echo "$thread_no -> ${exec_dir}:$cmd"
coredump_dir=`cat /proc/sys/kernel/core_pattern | xargs dirname`
docker run \
-v $REP_MOUNT_PARAM \
-v $MOUNT_DIR \
-v "$TMP_DIR/thread_volume/$thread_no/sim:${SIM_DIR}" \
-v ${TMP_DIR}/thread_volume/$thread_no/coredump:$coredump_dir \
-v $WORKDIR/taos-connector-python/taos:/usr/local/lib/python3.8/site-packages/taos:ro \
--rm --ulimit core=-1 taos_test:v1.0 $CONTAINER_TESTDIR/tests/parallel_test/run_case.sh -d "$exec_dir" -c "$cmd" $extra_param
ret=$?
exit $ret
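
For illustration, the cmd assembled in run_thread above expands to an invocation like this (work dir, thread number, and timeout are examples):
./run_container.sh -w /home/tester/work -d system-test -c "python3 ./test.py -f 2-query/between.py" -t 1 -o 600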

View File

@ -35,7 +35,7 @@ class TDSimClient:
"tableIncStepPerVnode": "10000",
"maxVgroupsPerDb": "1000",
"sdbDebugFlag": "143",
"rpcDebugFlag": "135",
"rpcDebugFlag": "143",
"tmrDebugFlag": "131",
"cDebugFlag": "135",
"udebugFlag": "135",
@ -136,7 +136,7 @@ class TDDnode:
"tsdbDebugFlag": "135",
"mDebugFlag": "135",
"sdbDebugFlag": "135",
"rpcDebugFlag": "135",
"rpcDebugFlag": "143",
"tmrDebugFlag": "131",
"cDebugFlag": "135",
"httpDebugFlag": "135",

View File

@ -138,7 +138,8 @@ class TDTestCase:
if "2: service ok" in retVal:
tdLog.info("taos -k success")
else:
tdLog.exit("taos -k fail")
tdLog.info(retVal)
tdLog.exit("taos -k fail 1")
# stop taosd
tdDnodes.stop(1)
@ -149,7 +150,8 @@ class TDTestCase:
if "0: unavailable" in retVal:
tdLog.info("taos -k success")
else:
tdLog.exit("taos -k fail")
tdLog.info(retVal)
tdLog.exit("taos -k fail 2")
# restart taosd
tdDnodes.start(1)
@ -158,7 +160,8 @@ class TDTestCase:
if "2: service ok" in retVal:
tdLog.info("taos -k success")
else:
tdLog.exit("taos -k fail")
tdLog.info(retVal)
tdLog.exit("taos -k fail 3")
tdLog.printNoPrefix("================================ parameter: -n")
# stop taosd

tests/system-test/0-others/user_control.py
View File

@ -0,0 +1,348 @@
import taos
import sys
import inspect
import traceback
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
PRIVILEGES_ALL = "ALL"
PRIVILEGES_READ = "READ"
PRIVILEGES_WRITE = "WRITE"
class TDconnect:
def __init__(self,
host = None,
port = None,
user = None,
password = None,
database = None,
config = None,
) -> None:
self._conn = None
self._host = host
self._user = user
self._password = password
self._database = database
self._port = port
self._config = config
def __enter__(self):
self._conn = taos.connect(
host =self._host,
port =self._port,
user =self._user,
password=self._password,
database=self._database,
config =self._config
)
self.cursor = self._conn.cursor()
return self
def error(self, sql):
expectErrNotOccured = True
try:
self.cursor.execute(sql)
except BaseException:
expectErrNotOccured = False
if expectErrNotOccured:
caller = inspect.getframeinfo(inspect.stack()[1][0])
tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured" )
else:
self.queryRows = 0
self.queryCols = 0
self.queryResult = None
tdLog.info(f"sql:{sql}, expect error occured")
def query(self, sql, row_tag=None):
# sourcery skip: raise-from-previous-error, raise-specific-error
self.sql = sql
try:
self.cursor.execute(sql)
self.queryResult = self.cursor.fetchall()
self.queryRows = len(self.queryResult)
self.queryCols = len(self.cursor.description)
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
tdLog.notice(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, {repr(e)}")
traceback.print_exc()
raise Exception(repr(e))
if row_tag:
return self.queryResult
return self.queryRows
def __exit__(self, types, values, trace):
if self._conn:
self.cursor.close()
self._conn.close()
def taos_connect(
host = "127.0.0.1",
port = 6030,
user = "root",
passwd = "taosdata",
database= None,
config = None
):
return TDconnect(
host = host,
port=port,
user=user,
password=passwd,
database=database,
config=config
)
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
@property
def __user_list(self):
return [f"user_test{i}" for i in range(self.users_count) ]
@property
def __passwd_list(self):
return [f"taosdata{i}" for i in range(self.users_count) ]
@property
def __privilege(self):
return [ PRIVILEGES_ALL, PRIVILEGES_READ, PRIVILEGES_WRITE ]
def __priv_level(self, dbname=None):
return f"{dbname}.*" if dbname else "*.*"
def create_user_current(self):
users = self.__user_list
passwds = self.__passwd_list
for i in range(self.users_count):
tdSql.execute(f"create user {users[i]} pass '{passwds[i]}' ")
tdSql.query("show users")
tdSql.checkRows(self.users_count + 1)
def create_user_err(self):
sqls = [
"create users u1 pass 'u1passwd' ",
"create user '' pass 'u1passwd' ",
"create user pass 'u1passwd' ",
"create user u1 pass u1passwd ",
"create user u1 password 'u1passwd' ",
"create user u1 pass u1passwd ",
"create user u1 pass '' ",
"create user u1 pass ' ' ",
"create user u1 pass ",
"create user u1 u2 pass 'u1passwd' 'u2passwd' ",
"create user u1 u2 pass 'u1passwd', 'u2passwd' ",
"create user u1, u2 pass 'u1passwd', 'u2passwd' ",
"create user u1, u2 pass 'u1passwd' 'u2passwd' ",
# length of user_name must be <= 23
"create user u12345678901234567890123 pass 'u1passwd' " ,
# length of passwd must be <= 128
"create user u1 pass 'u12345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678' " ,
# password must not contain " ' ~ ` \
"create user u1 pass 'u1passwd\\' " ,
"create user u1 pass 'u1passwd~' " ,
"create user u1 pass 'u1passwd\"' " ,
"create user u1 pass 'u1passwd\'' " ,
"create user u1 pass 'u1passwd`' " ,
# requires that a user named u1 was created first (duplicate user name)
"create user u1 pass 'u1passwd' " ,
]
tdSql.execute("create user u1 pass 'u1passwd' ")
for sql in sqls:
tdSql.error(sql)
tdSql.execute("DROP USER u1")
def __alter_pass_sql(self, user, passwd):
return f'''ALTER USER {user} PASS '{passwd}' '''
def alter_pass_current(self):
self.__init_pass = True
for count, i in enumerate(range(self.users_count)):
if self.__init_pass:
tdSql.query(self.__alter_pass_sql(self.__user_list[i], f"new{self.__passwd_list[i]}"))
self.__init_pass = count != self.users_count - 1
else:
tdSql.query(self.__alter_pass_sql(self.__user_list[i], self.__passwd_list[i] ) )
self.__init_pass = count == self.users_count - 1
def alter_pass_err(self): # sourcery skip: remove-redundant-fstring
sqls = [
f"alter users {self.__user_list[0]} pass 'newpass' " ,
f"alter user {self.__user_list[0]} pass '' " ,
f"alter user {self.__user_list[0]} pass ' ' " ,
f"alter user anyuser pass 'newpass' " ,
f"alter user {self.__user_list[0]} pass " ,
f"alter user {self.__user_list[0]} password 'newpass' " ,
]
for sql in sqls:
tdSql.error(sql)
def grant_user_privileges(self, privilege, dbname=None, user_name="root"):
return f"GRANT {privilege} ON {self.__priv_level(dbname)} TO {user_name} "
def test_user_create(self):
self.create_user_current()
self.create_user_err()
def test_alter_pass(self):
self.alter_pass_current()
self.alter_pass_err()
def user_login(self, user, passwd):
login_except = False
try:
with taos_connect(user=user, passwd=passwd) as conn:
cursor = conn.cursor
except BaseException:
login_except = True
cursor = None
return login_except, cursor
def login_current(self, user, passwd):
login_except, _ = self.user_login(user, passwd)
if login_except:
tdLog.exit(f"connect failed, user: {user} and pass: {passwd} do not match!")
else:
tdLog.info("connect successfully, user and pass matched!")
def login_err(self, user, passwd):
login_except, _ = self.user_login(user, passwd)
if login_except:
tdLog.info("connect failed, except error occured!")
else:
tdLog.exit("connect successfully, except error not occrued!")
def __drop_user(self, user):
return f"DROP USER {user}"
def drop_user_current(self):
for user in self.__user_list:
tdSql.query(self.__drop_user(user))
def drop_user_error(self):
sqls = [
f"DROP {self.__user_list[0]}",
f"DROP user {self.__user_list[0]} {self.__user_list[1]}",
f"DROP user {self.__user_list[0]} , {self.__user_list[1]}",
f"DROP users {self.__user_list[0]} {self.__user_list[1]}",
f"DROP users {self.__user_list[0]} , {self.__user_list[1]}",
# "DROP user root",
"DROP user abcde",
"DROP user ALL",
]
for sql in sqls:
tdSql.error(sql)
def test_drop_user(self):
# the error cases must run first, while the users still exist
self.drop_user_error()
self.drop_user_current()
def run(self):
# by default only the root user exists
tdLog.printNoPrefix("==========step0: init, user list only has root account")
tdSql.query("show users")
tdSql.checkData(0, 0, "root")
tdSql.checkData(0, 1, "super")
# root user privileges
# create user test
tdLog.printNoPrefix("==========step1: create user test")
self.users_count = 5
self.test_user_create()
# show users
tdLog.printNoPrefix("==========step2: show user test")
tdSql.query("show users")
tdSql.checkRows(self.users_count + 1)
# password login authentication
self.login_current(self.__user_list[0], self.__passwd_list[0])
self.login_err(self.__user_list[0], f"new{self.__passwd_list[0]}")
# change password
tdLog.printNoPrefix("==========step3: alter user pass test")
self.test_alter_pass()
# login authentication after the password change
tdLog.printNoPrefix("==========step4: check login test")
self.login_err(self.__user_list[0], self.__passwd_list[0])
self.login_currrent(self.__user_list[0], f"new{self.__passwd_list[0]}")
tdDnodes.stop(1)
tdDnodes.start(1)
tdSql.query("show users")
tdSql.checkRows(self.users_count + 1)
# normal user privileges
# password login
# _, user = self.user_login(self.__user_list[0], f"new{self.__passwd_list[0]}")
with taos_connect(user=self.__user_list[0], passwd=f"new{self.__passwd_list[0]}") as user:
# user = conn
# cannot create users
tdLog.printNoPrefix("==========step5: normal user can not create user")
user.error("create use utest1 pass 'utest1pass'")
# can show users
tdLog.printNoPrefix("==========step6: normal user can show user")
user.query("show users")
assert user.queryRows == self.users_count + 1
# cannot alter other users' passwords
tdLog.printNoPrefix("==========step7: normal user can not alter other user pass")
user.error(self.__alter_pass_sql(self.__user_list[1], self.__passwd_list[1] ))
user.error(self.__alter_pass_sql("root", "taosdata_root" ))
# can alter own password
tdLog.printNoPrefix("==========step8: normal user can alter owner pass")
user.query(self.__alter_pass_sql(self.__user_list[0], self.__passwd_list[0]))
# cannot drop any user, including self
tdLog.printNoPrefix("==========step9: normal user can not drop any user ")
user.error(f"drop user {self.__user_list[0]}")
user.error(f"drop user {self.__user_list[1]}")
user.error("drop user root")
# root drops the normal users
tdLog.printNoPrefix("==========step10: super user drop normal user")
self.test_drop_user()
tdSql.query("show users")
tdSql.checkRows(1)
tdSql.checkData(0, 0, "root")
tdSql.checkData(0, 1, "super")
tdDnodes.stop(1)
tdDnodes.start(1)
# cannot log in after the users are dropped
self.login_err(self.__user_list[0], self.__passwd_list[0])
self.login_err(self.__user_list[0], f"new{self.__passwd_list[0]}")
self.login_err(self.__user_list[1], self.__passwd_list[1])
self.login_err(self.__user_list[1], f"new{self.__passwd_list[1]}")
tdSql.query("show users")
tdSql.checkRows(1)
tdSql.checkData(0, 0, "root")
tdSql.checkData(0, 1, "super")
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
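
Under the test harness this case is launched the same way fulltest.sh registers it below:
python3 ./test.py -f 0-others/user_control.py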

tests/system-test/2-query/between.py
View File

@ -45,16 +45,16 @@ class TDTestCase:
tdLog.printNoPrefix("==========step3:query timestamp type")
# tdSql.query("select * from t1 where ts between now()-1m and now()+10m")
# tdSql.checkRows(10)
# tdSql.query("select * from t1 where ts between '2021-01-01 00:00:00.000' and '2121-01-01 00:00:00.000'")
tdSql.query("select * from t1 where ts between now()-1m and now()+10m")
tdSql.checkRows(10)
tdSql.query("select * from t1 where ts between '2021-01-01 00:00:00.000' and '2121-01-01 00:00:00.000'")
# tdSql.checkRows(11)
# tdSql.query("select * from t1 where ts between '1969-01-01 00:00:00.000' and '1969-12-31 23:59:59.999'")
tdSql.query("select * from t1 where ts between '1969-01-01 00:00:00.000' and '1969-12-31 23:59:59.999'")
# tdSql.checkRows(0)
# tdSql.query("select * from t1 where ts between -2793600 and 31507199")
# tdSql.checkRows(0)
# tdSql.query("select * from t1 where ts between 1609430400000 and 4765104000000")
# tdSql.checkRows(11)
tdSql.query("select * from t1 where ts between -2793600 and 31507199")
tdSql.checkRows(0)
tdSql.query("select * from t1 where ts between 1609430400000 and 4765104000000")
tdSql.checkRows(11)
tdLog.printNoPrefix("==========step4:query int type")
@ -68,11 +68,11 @@ class TDTestCase:
tdSql.checkRows(0)
# tdSql.query("select * from t1 where c1 between 0x64 and 0x69")
# tdSql.checkRows(6)
# tdSql.query("select * from t1 where c1 not between 100 and 106")
# tdSql.checkRows(11)
tdSql.query("select * from t1 where c1 not between 100 and 106")
tdSql.checkRows(11)
tdSql.query(f"select * from t1 where c1 between {2**31-2} and {2**31+1}")
tdSql.checkRows(1)
tdSql.error(f"select * from t2 where c1 between null and {1-2**31}")
tdSql.query(f"select * from t2 where c1 between null and {1-2**31}")
# tdSql.checkRows(3)
tdSql.query(f"select * from t2 where c1 between {-2**31} and {1-2**31}")
tdSql.checkRows(1)
@ -88,12 +88,12 @@ class TDTestCase:
tdSql.query("select * from t1 where c2 between 'DC3' and 'SYN'")
tdSql.checkRows(0)
tdSql.query("select * from t1 where c2 not between 0.1 and 0.2")
# tdSql.checkRows(11)
tdSql.checkRows(11)
tdSql.query(f"select * from t1 where c2 between {pow(10,38)*3.4} and {pow(10,38)*3.4+1}")
# tdSql.checkRows(1)
tdSql.query(f"select * from t2 where c2 between {-3.4*10**38-1} and {-3.4*10**38}")
# tdSql.checkRows(2)
tdSql.error(f"select * from t2 where c2 between null and {-3.4*10**38}")
tdSql.query(f"select * from t2 where c2 between null and {-3.4*10**38}")
# tdSql.checkRows(3)
tdLog.printNoPrefix("==========step6:query bigint type")
@ -101,7 +101,7 @@ class TDTestCase:
tdSql.query(f"select * from t1 where c3 between {2**31} and {2**31+10}")
tdSql.checkRows(10)
tdSql.query(f"select * from t1 where c3 between {-2**63} and {2**63}")
# tdSql.checkRows(11)
tdSql.checkRows(11)
tdSql.query(f"select * from t1 where c3 between {2**31+10} and {2**31}")
tdSql.checkRows(0)
tdSql.query("select * from t1 where c3 between 'a' and 'z'")
@ -112,7 +112,7 @@ class TDTestCase:
tdSql.checkRows(1)
tdSql.query(f"select * from t2 where c3 between {-2**63} and {1-2**63}")
# tdSql.checkRows(3)
tdSql.error(f"select * from t2 where c3 between null and {1-2**63}")
tdSql.query(f"select * from t2 where c3 between null and {1-2**63}")
# tdSql.checkRows(2)
tdLog.printNoPrefix("==========step7:query double type")
@ -129,10 +129,10 @@ class TDTestCase:
tdSql.query("select * from t1 where c4 not between 1 and 2")
# tdSql.checkRows(0)
tdSql.query(f"select * from t1 where c4 between {1.7*10**308} and {1.7*10**308+1}")
# tdSql.checkRows(1)
tdSql.checkRows(1)
tdSql.query(f"select * from t2 where c4 between {-1.7*10**308-1} and {-1.7*10**308}")
# tdSql.checkRows(3)
tdSql.error(f"select * from t2 where c4 between null and {-1.7*10**308}")
tdSql.query(f"select * from t2 where c4 between null and {-1.7*10**308}")
# tdSql.checkRows(3)
tdLog.printNoPrefix("==========step8:query smallint type")
@ -151,7 +151,7 @@ class TDTestCase:
tdSql.checkRows(1)
tdSql.query("select * from t2 where c5 between -32768 and -32767")
tdSql.checkRows(1)
tdSql.error("select * from t2 where c5 between null and -32767")
tdSql.query("select * from t2 where c5 between null and -32767")
# tdSql.checkRows(1)
tdLog.printNoPrefix("==========step9:query tinyint type")
@ -170,21 +170,21 @@ class TDTestCase:
tdSql.checkRows(1)
tdSql.query("select * from t2 where c6 between -128 and -127")
tdSql.checkRows(1)
tdSql.error("select * from t2 where c6 between null and -127")
tdSql.query("select * from t2 where c6 between null and -127")
# tdSql.checkRows(3)
tdLog.printNoPrefix("==========step10:invalid query type")
# tdSql.query("select * from supt where location between 'beijing' and 'shanghai'")
# tdSql.checkRows(23)
# # non-zero values are all parsed as 1, so "between <negative> and 0" is parsed as "between 1 and 0"
# tdSql.query("select * from supt where isused between 0 and 1")
# tdSql.checkRows(23)
# tdSql.query("select * from supt where isused between -1 and 0")
# tdSql.checkRows(0)
# tdSql.error("select * from supt where isused between false and true")
# tdSql.query("select * from supt where family between '拖拉机' and '自行车'")
# tdSql.checkRows(23)
tdSql.query("select * from supt where location between 'beijing' and 'shanghai'")
tdSql.checkRows(23)
# non-zero values are all parsed as 1, so "between <negative> and 0" is parsed as "between 1 and 0"
tdSql.query("select * from supt where isused between 0 and 1")
tdSql.checkRows(23)
tdSql.query("select * from supt where isused between -1 and 0")
tdSql.checkRows(0)
tdSql.error("select * from supt where isused between false and true")
tdSql.query("select * from supt where family between '拖拉机' and '自行车'")
tdSql.checkRows(23)
tdLog.printNoPrefix("==========step11:query HEX/OCT/BIN type")

View File

@ -15,8 +15,16 @@ class TDTestCase:
def run(self): # sourcery skip: extract-duplicate-method
tdSql.prepare()
# get system timezone
time_zone = os.popen('timedatectl | grep zone').read(
).strip().split(':')[1].lstrip()
time_zone_arr = os.popen('timedatectl | grep zone').read(
).strip().split(':')
if len(time_zone_arr) > 1:
time_zone = time_zone_arr[1].lstrip()
else:
# possibly in a docker container
time_zone_1 = os.popen('ls -l /etc/localtime|awk -F/ \'{print $(NF-1) "/" $NF}\'').read().strip()
time_zone_2 = os.popen('date "+(%Z, %z)"').read().strip()
time_zone = time_zone_1 + " " + time_zone_2
print("expected time zone: " + time_zone)
tdLog.printNoPrefix("==========step1:create tables==========")
tdSql.execute(
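
As a sketch of the new fallback path: in a container without timedatectl, the two probes typically yield something like the following, so time_zone becomes e.g. "Asia/Shanghai (CST, +0800)" (actual values depend on the image):
$ ls -l /etc/localtime | awk -F/ '{print $(NF-1) "/" $NF}'
Asia/Shanghai
$ date "+(%Z, %z)"
(CST, +0800)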

tests/system-test/7-tmq/basic5.py
View File

@ -114,7 +114,7 @@ class TDTestCase:
def tmqCase1(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test scenario 1: ")
tdLog.printNoPrefix("======== test case 1: Produce while consume")
tdLog.info("step 1: create database, stb, ctb and insert data")
# create and start thread
parameterDict = {'cfg': '', \
@ -122,8 +122,8 @@ class TDTestCase:
'vgroups': 1, \
'stbName': 'stb', \
'ctbNum': 10, \
'rowsPerTbl': 100, \
'batchNum': 10, \
'rowsPerTbl': 1000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
@ -163,8 +163,7 @@ class TDTestCase:
tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)")
consumerId = 0
expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"]
expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"]
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
topicList = topicFromStb
ifcheckdata = 0
keyList = 'group.id:cgrp1,\
@ -172,7 +171,7 @@ class TDTestCase:
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata)
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql)
tdLog.info("check stb if there are data")
@ -209,18 +208,19 @@ class TDTestCase:
else:
time.sleep(5)
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
tdLog.info("consumer result: %d, %d"%(tdSql.getData(0 , 2), tdSql.getData(0 , 3)))
tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 2, expectmsgcnt)
# multiple rows and multiple tables in one sql, so the number of messages is not deterministic
#tdSql.checkData(0 , 2, expectmsgcnt)
tdSql.checkData(0 , 3, expectrowcnt)
tdSql.query("drop topic %s"%topicFromStb)
tdSql.query("drop topic %s"%topicFromCtb)
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test scenario 2: add child table with consuming ")
tdLog.printNoPrefix("======== test case 2: add child table with consuming ")
# create and start thread
parameterDict = {'cfg': '', \
'dbName': 'db2', \
@ -275,9 +275,9 @@ class TDTestCase:
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
rowsOfNewCtb = 1000
consumerId = 0
expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"]
expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"]
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb
topicList = topicFromStb
ifcheckdata = 0
keyList = 'group.id:cgrp1,\
@ -285,7 +285,7 @@ class TDTestCase:
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata)
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql)
tdLog.info("check stb if there are data")
@ -312,7 +312,6 @@ class TDTestCase:
# create new child table and insert data
newCtbName = 'newctb'
rowsOfNewCtb = 1000
tdSql.query("create table %s.%s using %s.%s tags(9999)"%(parameterDict["dbName"], newCtbName, parameterDict["dbName"], parameterDict["stbName"]))
startTs = parameterDict["startTs"]
for j in range(rowsOfNewCtb):
@ -332,14 +331,135 @@ class TDTestCase:
else:
time.sleep(5)
expectmsgcnt += rowsOfNewCtb
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb
tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 2, expectmsgcnt)
tdSql.checkData(0 , 3, expectrowcnt)
tdSql.query("drop topic %s"%topicFromStb)
tdSql.query("drop topic %s"%topicFromCtb)
tdLog.printNoPrefix("======== test scenario 2 end ...... ")
tdLog.printNoPrefix("======== test case 2 end ...... ")
def tmqCase3(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 3: tow topics, each contains a stable, \
but at the beginning, no ctables in the stable of one topic,\
after starting consumer, create ctables ")
# create and start thread
parameterDict = {'cfg': '', \
'dbName': 'db2', \
'vgroups': 1, \
'stbName': 'stb', \
'ctbNum': 10, \
'rowsPerTbl': 10000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start()
# wait db ready
while 1:
tdSql.query("show databases")
if tdSql.getRows() == 4:
print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),)
break
else:
time.sleep(1)
tdSql.query("use %s"%parameterDict['dbName'])
# wait stb ready
while 1:
tdSql.query("show %s.stables"%parameterDict['dbName'])
if tdSql.getRows() == 1:
break
else:
time.sleep(1)
tdLog.info("create topics from super table")
topicFromStb = 'topic_stb_column2'
topicFromCtb = 'topic_ctb_column2'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
time.sleep(1)
tdSql.query("show topics")
topic1 = tdSql.getData(0 , 0)
topic2 = tdSql.getData(1 , 0)
tdLog.info("show topics: %s, %s"%(topic1, topic2))
if topic1 != topicFromStb and topic1 != topicFromCtb:
tdLog.exit("topic error1")
if topic2 != topicFromStb and topic2 != topicFromCtb:
tdLog.exit("topic error2")
tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"]
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
rowsOfNewCtb = 1000
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb
topicList = topicFromStb
ifcheckdata = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
sql = "insert into consumeinfo values "
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql)
tdLog.info("check stb if there are data")
while 1:
tdSql.query("select count(*) from %s"%parameterDict["stbName"])
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
countOfStb = tdSql.getData(0, 0)
if countOfStb != 0:
tdLog.info("count from stb: %d"%countOfStb)
break
else:
time.sleep(1)
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
# create new child table and insert data
newCtbName = 'newctb'
tdSql.query("create table %s.%s using %s.%s tags(9999)"%(parameterDict["dbName"], newCtbName, parameterDict["dbName"], parameterDict["stbName"]))
startTs = parameterDict["startTs"]
for j in range(rowsOfNewCtb):
sql = "insert into %s.%s values (%d, %d, 'tmqrow_%d') "%(parameterDict["dbName"], newCtbName, startTs + j, j, j)
tdSql.execute(sql)
tdLog.debug("insert data into new child table ............ [OK]")
# wait for data ready
prepareEnvThread.join()
tdLog.info("insert process end, and start to check consume result")
while 1:
tdSql.query("select * from consumeresult")
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 1:
break
else:
time.sleep(5)
tdSql.checkData(0 , 1, consumerId)
tdSql.checkData(0 , 3, expectrowcnt)
tdSql.query("drop topic %s"%topicFromStb)
tdSql.query("drop topic %s"%topicFromCtb)
tdLog.printNoPrefix("======== test case 3 end ...... ")
def run(self):
tdSql.prepare()
@ -353,7 +473,7 @@ class TDTestCase:
tdLog.info("cfgPath: %s" % cfgPath)
self.tmqCase1(cfgPath, buildPath)
#self.tmqCase2(cfgPath, buildPath)
self.tmqCase2(cfgPath, buildPath)
#self.tmqCase3(cfgPath, buildPath)
def stop(self):
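
The consumer side of these cases runs out of process: the shellCmd assembled in tmqCase3 above expands to roughly the following (build and cfg paths are illustrative):
nohup /path/to/debug/build/bin/tmq_sim -c /path/to/cfg -y 5 -d db2 -g 1 -r 1 -w db2 > /dev/null 2>&1 &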

tests/system-test/fulltest.sh
View File

@ -8,6 +8,8 @@ python3 ./test.py -f 0-others/taosShellNetChk.py
python3 ./test.py -f 0-others/telemetry.py
python3 ./test.py -f 0-others/taosdMonitor.py
python3 ./test.py -f 0-others/user_control.py
#python3 ./test.py -f 2-query/between.py
python3 ./test.py -f 2-query/distinct.py
python3 ./test.py -f 2-query/varchar.py
@ -53,5 +55,3 @@ python3 ./test.py -f 2-query/arctan.py
# python3 ./test.py -f 2-query/query_cols_tags_and_or.py
python3 ./test.py -f 7-tmq/basic5.py

View File

@ -98,12 +98,24 @@ static void printHelp() {
}
void initLogFile() {
// FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
char file[256];
sprintf(file, "%s/../log/tmqlog.txt", configDir);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
time_t now;
struct tm curTime;
char filename[256];
now = taosTime(NULL);
taosLocalTime(&now, &curTime);
sprintf(filename,"%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt",
configDir,
curTime.tm_year+1900,
curTime.tm_mon+1,
curTime.tm_mday,
curTime.tm_hour,
curTime.tm_min,
curTime.tm_sec);
//sprintf(filename, "%s/../log/tmqlog.txt", configDir);
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
if (NULL == pFile) {
fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt");
fprintf(stderr, "Failed to open %s for save result\n", filename);
exit(-1);
}
g_fp = pFile;
@ -333,8 +345,8 @@ void loop_consume(SThreadInfo* pInfo) {
totalMsgs++;
if (totalMsgs >= pInfo->expectMsgCnt) {
taosFprintfFile(g_fp, "==== totalMsgs >= pInfo->expectMsgCnt, so break\n");
if (totalRows >= pInfo->expectMsgCnt) {
taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n");
break;
}
} else {
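
With this change every run writes a timestamped log instead of overwriting a single tmqlog.txt; for example (timestamp illustrative, and note the space the format string places between date and time):
<configDir>/../log/tmqlog_2022-05-13 15-18-28.txt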

tests/unit-test/test.sh Executable file
View File

@ -0,0 +1,40 @@
#!/bin/bash
function usage() {
echo "$0"
echo -e "\t -e enterprise edition"
echo -e "\t -h help"
}
ent=0
while getopts "eh" opt; do
case $opt in
e)
ent=1
;;
h)
usage
exit 0
;;
\?)
echo "Invalid option: -$OPTARG"
usage
exit 0
;;
esac
done
script_dir=`dirname $0`
cd ${script_dir}
if [ $ent -eq 0 ]; then
cd ../../debug
else
cd ../../../debug
fi
ctest -j8
ret=$?
exit $ret
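
Usage sketch, assuming the directory layout implied by the cd logic above (the script sits in tests/unit-test; the build tree is two levels up in community, three in TDinternal):
# community edition: runs ctest -j8 in <TDengine>/debug
./test.sh
# enterprise edition: runs ctest -j8 in <TDinternal>/debug
./test.sh -e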