diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 48c15e9117..5fd9a8b12b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -340,6 +340,7 @@ typedef struct SStreamMeta { TTB* pTaskDb; TTB* pCheckpointDb; SHashObj* pTasks; + SArray* pTaskList; // SArray void* ahandle; TXN* txn; FTaskExpand* expandFunc; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 7267c72ec2..dd3f50f440 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -558,10 +558,15 @@ static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSm return 0; } +#define BOUNDARY 1024 static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) { int32_t result = 1; - while (result <= length) { - result *= 2; + if (length >= BOUNDARY){ + result = length; + }else{ + while (result <= length) { + result *= 2; + } } if (type == TSDB_DATA_TYPE_BINARY && result > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) { result = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE; @@ -657,7 +662,7 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO len += field->bytes; } if(len > maxLen){ - return TSDB_CODE_TSC_INVALID_VALUE; + return isTag ? TSDB_CODE_PAR_INVALID_TAGS_LENGTH : TSDB_CODE_PAR_INVALID_ROW_LENGTH; } return TSDB_CODE_SUCCESS; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 63bcef2a5b..da3c3b98a8 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -2524,6 +2524,9 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp, &smaVer) != 0) { metaRsp.numOfColumns = -1; metaRsp.suid = pStbVersion->suid; + tstrncpy(metaRsp.dbFName, pStbVersion->dbFName, sizeof(metaRsp.dbFName)); + tstrncpy(metaRsp.tbName, pStbVersion->stbName, sizeof(metaRsp.tbName)); + tstrncpy(metaRsp.stbName, pStbVersion->stbName, sizeof(metaRsp.stbName)); taosArrayPush(hbRsp.pMetaRsp, &metaRsp); continue; } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index cefc4fa63e..7352bbc0fe 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -153,11 +153,15 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { ASSERT(pTask->taskLevel == TASK_LEVEL__AGG); // 2.save task + taosWLockLatch(&pSnode->pMeta->lock); code = streamMetaAddDeployedTask(pSnode->pMeta, -1, pTask); if (code < 0) { + taosWUnLockLatch(&pSnode->pMeta->lock); return -1; } + taosWUnLockLatch(&pSnode->pMeta->lock); + // 3.go through recover steps to fill history if (pTask->fillHistory) { streamSetParamForRecover(pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4997db684f..6a52d0fc39 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -781,13 +781,17 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms tDecoderClear(&decoder); // 2.save task, use the newest commit version as the initial start version of stream task. + taosWLockLatch(&pTq->pStreamMeta->lock); code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask); if (code < 0) { tqError("vgId:%d failed to add s-task:%s, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr, streamMetaGetNumOfTasks(pTq->pStreamMeta)); + taosWUnLockLatch(&pTq->pStreamMeta->lock); return -1; } + taosWUnLockLatch(&pTq->pStreamMeta->lock); + // 3.go through recover steps to fill history if (pTask->fillHistory) { streamTaskCheckDownstream(pTask, sversion); @@ -1268,7 +1272,8 @@ int32_t tqStartStreamTasks(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; taosWLockLatch(&pMeta->lock); - int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks); + + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { tqInfo("vgId:%d no stream tasks exists", vgId); taosWUnLockLatch(&pTq->pStreamMeta->lock); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index c8195f72a9..950c5ea96b 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -22,14 +22,15 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v tqProcessSubmitReqForSubscribe(pTq); } - tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, (int)taosHashGetSize(pTq->pStreamMeta->pTasks)); + int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); + tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks); // push data for stream processing: // 1. the vnode has already been restored. // 2. the vnode should be the leader. // 3. the stream is not suspended yet. if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored) { - if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) { + if (numOfTasks == 0) { return 0; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 7ed77edd5b..1fbdb25528 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1093,6 +1093,5 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } taosWUnLockLatch(&pTq->pStreamMeta->lock); - return 0; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 58cb7b9e63..1e45f578f6 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -36,6 +36,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { if (shouldIdle) { taosWLockLatch(&pMeta->lock); + pMeta->walScanCounter -= 1; times = pMeta->walScanCounter; @@ -56,42 +57,28 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { return 0; } -static SArray* extractTaskIdList(SStreamMeta* pStreamMeta, int32_t numOfTasks) { - SArray* pTaskIdList = taosArrayInit(numOfTasks, sizeof(int32_t)); - void* pIter = NULL; - - taosWLockLatch(&pStreamMeta->lock); - while(1) { - pIter = taosHashIterate(pStreamMeta->pTasks, pIter); - if (pIter == NULL) { - break; - } - - SStreamTask* pTask = *(SStreamTask**)pIter; - taosArrayPush(pTaskIdList, &pTask->id.taskId); - } - - taosWUnLockLatch(&pStreamMeta->lock); - return pTaskIdList; -} - int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = true; bool noNewDataInWal = true; int32_t vgId = pStreamMeta->vgId; - int32_t numOfTasks = taosHashGetSize(pStreamMeta->pTasks); + int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList); if (numOfTasks == 0) { return TSDB_CODE_SUCCESS; } + SArray* pTaskList = NULL; + taosWLockLatch(&pStreamMeta->lock); + pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL); + taosWUnLockLatch(&pStreamMeta->lock); + tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks); - SArray* pTaskIdList = extractTaskIdList(pStreamMeta, numOfTasks); // update the new task number - numOfTasks = taosArrayGetSize(pTaskIdList); + numOfTasks = taosArrayGetSize(pTaskList); + for (int32_t i = 0; i < numOfTasks; ++i) { - int32_t* pTaskId = taosArrayGet(pTaskIdList, i); + int32_t* pTaskId = taosArrayGet(pTaskList, i); SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId); if (pTask == NULL) { continue; @@ -165,7 +152,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = true; } - taosArrayDestroy(pTaskIdList); + taosArrayDestroy(pTaskList); return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 1ef86f5b30..d0a0ea7947 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -21,10 +21,9 @@ #define getCurrentKeyInLastBlock(_r) ((_r)->currentKey) typedef enum { - READER_STATUS_SUSPEND = 0x1, - READER_STATUS_SHOULD_STOP = 0x2, - READER_STATUS_NORMAL = 0x3, -} EReaderExecStatus; + READER_STATUS_SUSPEND = 0x1, + READER_STATUS_NORMAL = 0x2, +} EReaderStatus; typedef enum { EXTERNAL_ROWS_PREV = 0x1, @@ -184,6 +183,7 @@ typedef struct STsdbReaderAttr { STimeWindow window; bool freeBlock; SVersionRange verRange; + int16_t order; } STsdbReaderAttr; typedef struct SResultBlockInfo { @@ -196,7 +196,8 @@ struct STsdbReader { STsdb* pTsdb; SVersionRange verRange; TdThreadMutex readerMutex; - EReaderExecStatus flag; + EReaderStatus flag; + int32_t code; uint64_t suid; int16_t order; EReadMode readMode; @@ -2995,9 +2996,9 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr while (1) { // only check here, since the iterate data in memory is very fast. - if (pReader->flag == READER_STATUS_SHOULD_STOP) { - tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr); - return TSDB_CODE_SUCCESS; + if (pReader->code != TSDB_CODE_SUCCESS) { + tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); + return pReader->code; } bool hasNext = false; @@ -3093,9 +3094,9 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; while (1) { - if (pReader->flag == READER_STATUS_SHOULD_STOP) { - tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr); - return TSDB_CODE_SUCCESS; + if (pReader->code != TSDB_CODE_SUCCESS) { + tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); + return pReader->code; } // load the last data block of current table @@ -3246,7 +3247,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { } } - return code; + return (pReader->code != TSDB_CODE_SUCCESS)? pReader->code:code; } static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) { @@ -3395,9 +3396,9 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { STableUidList* pUidList = &pStatus->uidList; while (1) { - if (pReader->flag == READER_STATUS_SHOULD_STOP) { - tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr); - return TSDB_CODE_SUCCESS; + if (pReader->code != TSDB_CODE_SUCCESS) { + tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); + return pReader->code; } STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter; @@ -3493,7 +3494,7 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) { terrno = 0; code = doLoadLastBlockSequentially(pReader); - if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) { + if (code != TSDB_CODE_SUCCESS) { terrno = code; return TSDB_READ_RETURN; } @@ -3507,8 +3508,7 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) { code = initForFirstBlockInFile(pReader, pBlockIter); // error happens or all the data files are completely checked - if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false) || - pReader->flag == READER_STATUS_SHOULD_STOP) { + if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { terrno = code; return TSDB_READ_RETURN; } @@ -3536,13 +3536,9 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } code = doBuildDataBlock(pReader); - if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) { + if (code != TSDB_CODE_SUCCESS || pResBlock->info.rows > 0) { return code; } - - if (pResBlock->info.rows > 0) { - return TSDB_CODE_SUCCESS; - } } while (1) { @@ -3581,13 +3577,9 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { code = doBuildDataBlock(pReader); } - if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) { + if (code != TSDB_CODE_SUCCESS || pResBlock->info.rows > 0) { return code; } - - if (pResBlock->info.rows > 0) { - return TSDB_CODE_SUCCESS; - } } } @@ -4849,8 +4841,8 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) { *hasNext = false; - if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT) { - return code; + if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT || pReader->code != TSDB_CODE_SUCCESS) { + return (pReader->code != TSDB_CODE_SUCCESS)? pReader->code:code; } SReaderStatus* pStatus = &pReader->status; @@ -5456,4 +5448,4 @@ void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) { pReader->idStr = taosStrdup(idstr); } -void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->flag = READER_STATUS_SHOULD_STOP; } +void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index fcdc6418a0..b598fffbc6 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6666,22 +6666,40 @@ static int32_t createRealTableForGrantTable(SGrantStmt* pStmt, SRealTableNode** } static int32_t translateGrantTagCond(STranslateContext* pCxt, SGrantStmt* pStmt, SAlterUserReq* pReq) { - if (NULL == pStmt->pTagCond) { - return TSDB_CODE_SUCCESS; - } + SRealTableNode* pTable = NULL; if ('\0' == pStmt->tabName[0] || '*' == pStmt->tabName[0]) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, - "The With clause can only be used for table level privilege"); + if (pStmt->pTagCond) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, + "The With clause can only be used for table level privilege"); + } else { + return TSDB_CODE_SUCCESS; + } } - pCxt->pCurrStmt = (SNode*)pStmt; - SRealTableNode* pTable = NULL; int32_t code = createRealTableForGrantTable(pStmt, &pTable); if (TSDB_CODE_SUCCESS == code) { SName name; code = getTableMetaImpl(pCxt, toName(pCxt->pParseCxt->acctId, pTable->table.dbName, pTable->table.tableName, &name), &(pTable->pMeta)); + if (code) { + nodesDestroyNode((SNode*)pTable); + return code; + } + + if (TSDB_SUPER_TABLE != pTable->pMeta->tableType && TSDB_NORMAL_TABLE != pTable->pMeta->tableType) { + nodesDestroyNode((SNode*)pTable); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, + "Only supertable and normal table can be granted"); + } } + + if (TSDB_CODE_SUCCESS == code && NULL == pStmt->pTagCond) { + nodesDestroyNode((SNode*)pTable); + return TSDB_CODE_SUCCESS; + } + + pCxt->pCurrStmt = (SNode*)pStmt; + if (TSDB_CODE_SUCCESS == code) { code = addNamespace(pCxt, pTable); } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e85a552d13..e10562f5cb 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -314,6 +314,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer}; taosWLockLatch(&pTask->pMeta->lock); + streamMetaSaveTask(pTask->pMeta, pTask); if (streamMetaCommit(pTask->pMeta) < 0) { taosWUnLockLatch(&pTask->pMeta->lock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 822ae2a485..aefe7885f9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -57,6 +57,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } + // task list + pMeta->pTaskList = taosArrayInit(4, sizeof(int32_t)); + if (pMeta->pTaskList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + if (streamMetaBegin(pMeta) < 0) { goto _err; } @@ -70,6 +77,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF _err: taosMemoryFree(pMeta->path); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); + if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->db) tdbClose(pMeta->db); @@ -100,6 +108,7 @@ void streamMetaClose(SStreamMeta* pMeta) { } taosHashCleanup(pMeta->pTasks); + pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList); taosMemoryFree(pMeta->path); taosMemoryFree(pMeta); } @@ -180,11 +189,15 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* } taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES); + taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); return 0; } int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) { - return (int32_t) taosHashGetSize(pMeta->pTasks); + size_t size = taosHashGetSize(pMeta->pTasks); + ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks)); + + return (int32_t) size; } SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { @@ -216,12 +229,23 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); if (ppTask) { SStreamTask* pTask = *ppTask; + + taosWLockLatch(&pMeta->lock); + taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP); - taosWLockLatch(&pMeta->lock); + int32_t num = taosArrayGetSize(pMeta->pTaskList); + for(int32_t i = 0; i < num; ++i) { + int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); + if (*pTaskId == taskId) { + taosArrayRemove(pMeta->pTaskList, i); + break; + } + } + streamMetaReleaseTask(pMeta, pTask); taosWUnLockLatch(&pMeta->lock); } @@ -306,6 +330,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { return -1; } + taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); + if (pTask->fillHistory) { pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; streamTaskCheckDownstream(pTask, ver); diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 6df2b40000..c49b5726b6 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -1814,6 +1814,11 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) { *ppVal = pVal; *vLen = cd.vLen; + } else { + if (TDB_CELLDECODER_FREE_VAL(&cd)) { + tdbTrace("tdb/btree-next2 decoder: %p pVal free: %p", &cd, cd.pVal); + tdbFree(cd.pVal); + } } ret = tdbBtcMoveToNext(pBtc); diff --git a/source/os/src/osLocale.c b/source/os/src/osLocale.c index 129faaacc8..136b8cf022 100644 --- a/source/os/src/osLocale.c +++ b/source/os/src/osLocale.c @@ -171,7 +171,7 @@ void taosGetSystemLocale(char *outLocale, char *outCharset) { strcpy(outLocale, "en_US.UTF-8"); } else { tstrncpy(outLocale, locale, TD_LOCALE_LEN); - printf("locale not configured, set to system default:%s\n", outLocale); + //printf("locale not configured, set to system default:%s\n", outLocale); } // if user does not specify the charset, extract it from locale diff --git a/tests/parallel_test/run_case.sh b/tests/parallel_test/run_case.sh index ffc23ba6d2..2d736e1414 100755 --- a/tests/parallel_test/run_case.sh +++ b/tests/parallel_test/run_case.sh @@ -79,7 +79,7 @@ md5sum /home/TDinternal/debug/build/lib/libtaos.so #define taospy 2.7.6 pip3 list|grep taospy pip3 uninstall taospy -y -pip3 install taospy==2.7.6 +pip3 install --default-timeout=120 taospy==2.7.6 $TIMEOUT_CMD $cmd RET=$? diff --git a/tests/pytest/auto_crash_gen.py b/tests/pytest/auto_crash_gen.py index 56629ede13..5af2f055cd 100755 --- a/tests/pytest/auto_crash_gen.py +++ b/tests/pytest/auto_crash_gen.py @@ -1,3 +1,4 @@ +import datetime import os import socket import requests @@ -238,17 +239,7 @@ def start_taosd(): start_cmd = 'cd %s && python3 test.py >>/dev/null '%(start_path) os.system(start_cmd) -def get_cmds(args_list): - # build_path = get_path() - # if repo == "community": - # crash_gen_path = build_path[:-5]+"community/tests/pytest/" - # elif repo == "TDengine": - # crash_gen_path = build_path[:-5]+"/tests/pytest/" - # else: - # pass - - # crash_gen_cmd = 'cd %s && ./crash_gen.sh --valgrind -p -t 10 -s 1000 -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550 '%(crash_gen_path) - +def get_cmds(args_list): crash_gen_cmd = get_auto_mix_cmds(args_list,valgrind=valgrind_mode) return crash_gen_cmd @@ -295,7 +286,7 @@ def check_status(): elif "Crash_Gen is now exiting with status code: 0" in run_code: return 0 else: - return 2 + return 2 def main(): @@ -310,7 +301,7 @@ def main(): build_path = get_path() - os.system("pip install git+https://github.com/taosdata/taos-connector-python.git") + if repo =="community": crash_gen_path = build_path[:-5]+"community/tests/pytest/" elif repo =="TDengine": @@ -334,7 +325,9 @@ def main(): if not os.path.exists(run_dir): os.mkdir(run_dir) print(crash_cmds) + starttime = datetime.datetime.now() run_crash_gen(crash_cmds) + endtime = datetime.datetime.now() status = check_status() print("exit status : ", status) @@ -349,7 +342,12 @@ def main(): print('======== crash_gen run sucess and exit as expected ========') try: - text = f"crash_gen instance exit status of docker [ {hostname} ] is : {msg_dict[status]}\n " + f" and git commit : {git_commit}" + text = f'''exit status: {msg_dict[status]} + git commit : {git_commit} + hostname: {hostname} + start time: {starttime} + end time: {endtime} + cmd: {crash_cmds}''' send_msg(get_msg(text)) except Exception as e: print("exception:", e) diff --git a/tests/pytest/auto_crash_gen_valgrind.py b/tests/pytest/auto_crash_gen_valgrind.py index 22fc5a480f..49e2c43f84 100755 --- a/tests/pytest/auto_crash_gen_valgrind.py +++ b/tests/pytest/auto_crash_gen_valgrind.py @@ -1,6 +1,7 @@ #!/usr/bin/python3 +import datetime import os import socket import requests @@ -241,15 +242,6 @@ def start_taosd(): os.system(start_cmd +">>/dev/null") def get_cmds(args_list): - # build_path = get_path() - # if repo == "community": - # crash_gen_path = build_path[:-5]+"community/tests/pytest/" - # elif repo == "TDengine": - # crash_gen_path = build_path[:-5]+"/tests/pytest/" - # else: - # pass - - # crash_gen_cmd = 'cd %s && ./crash_gen.sh --valgrind -p -t 10 -s 1000 -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550 '%(crash_gen_path) crash_gen_cmd = get_auto_mix_cmds(args_list,valgrind=valgrind_mode) return crash_gen_cmd @@ -343,7 +335,6 @@ def main(): args = limits(args) build_path = get_path() - os.system("pip install git+https://github.com/taosdata/taos-connector-python.git >>/dev/null") if repo =="community": crash_gen_path = build_path[:-5]+"community/tests/pytest/" elif repo =="TDengine": @@ -368,7 +359,9 @@ def main(): if not os.path.exists(run_dir): os.mkdir(run_dir) print(crash_cmds) + starttime = datetime.datetime.now() run_crash_gen(crash_cmds) + endtime = datetime.datetime.now() status = check_status() # back_path = os.path.join(core_path,"valgrind_report") @@ -384,7 +377,12 @@ def main(): print('======== crash_gen run sucess and exit as expected ========') try: - text = f"crash_gen instance exit status of docker [ {hostname} ] is : {msg_dict[status]}\n " + f" and git commit : {git_commit}" + text = f'''exit status: {msg_dict[status]} + git commit : {git_commit} + hostname: {hostname} + start time: {starttime} + end time: {endtime} + cmd: {crash_cmds}''' send_msg(get_msg(text)) except Exception as e: print("exception:", e) diff --git a/tests/pytest/auto_crash_gen_valgrind_cluster.py b/tests/pytest/auto_crash_gen_valgrind_cluster.py index 547de9af47..5189ff4262 100755 --- a/tests/pytest/auto_crash_gen_valgrind_cluster.py +++ b/tests/pytest/auto_crash_gen_valgrind_cluster.py @@ -1,6 +1,7 @@ #!/usr/bin/python3 +import datetime import os import socket import requests @@ -241,16 +242,7 @@ def start_taosd(): os.system(start_cmd +">>/dev/null") def get_cmds(args_list): - # build_path = get_path() - # if repo == "community": - # crash_gen_path = build_path[:-5]+"community/tests/pytest/" - # elif repo == "TDengine": - # crash_gen_path = build_path[:-5]+"/tests/pytest/" - # else: - # pass - - # crash_gen_cmd = 'cd %s && ./crash_gen.sh --valgrind -p -t 10 -s 1000 -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550 '%(crash_gen_path) - + crash_gen_cmd = get_auto_mix_cmds(args_list,valgrind=valgrind_mode) return crash_gen_cmd @@ -342,8 +334,7 @@ def main(): args = random_args(args_list) args = limits(args) - build_path = get_path() - os.system("pip install git+https://github.com/taosdata/taos-connector-python.git >>/dev/null") + build_path = get_path() if repo =="community": crash_gen_path = build_path[:-5]+"community/tests/pytest/" elif repo =="TDengine": @@ -368,7 +359,9 @@ def main(): if not os.path.exists(run_dir): os.mkdir(run_dir) print(crash_cmds) + starttime = datetime.datetime.now() run_crash_gen(crash_cmds) + endtime = datetime.datetime.now() status = check_status() # back_path = os.path.join(core_path,"valgrind_report") @@ -384,7 +377,12 @@ def main(): print('======== crash_gen run sucess and exit as expected ========') try: - text = f"crash_gen instance exit status of docker [ {hostname} ] is : {msg_dict[status]}\n " + f" and git commit : {git_commit}" + text = f'''exit status: {msg_dict[status]} + git commit : {git_commit} + hostname: {hostname} + start time: {starttime} + end time: {endtime} + cmd: {crash_cmds}''' send_msg(get_msg(text)) except Exception as e: print("exception:", e)