diff --git a/README-CN.md b/README-CN.md index 189b7a059a..12ac7b9ee7 100644 --- a/README-CN.md +++ b/README-CN.md @@ -352,4 +352,4 @@ TDengine 提供了丰富的应用程序开发接口,其中包括 C/C++、Java # 加入技术交流群 -TDengine 官方社群「物联网大数据群」对外开放,欢迎您加入讨论。搜索微信号 "tdengine",加小 T 为好友,即可入群。 +TDengine 官方社群「物联网大数据群」对外开放,欢迎您加入讨论。搜索微信号 "tdengine1",加小 T 为好友,即可入群。 diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8956801bd4..72309620d1 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -113,14 +113,14 @@ typedef struct { int64_t ver; int32_t* dataRef; SPackedData submit; -} SStreamDataSubmit2; +} SStreamDataSubmit; typedef struct { int8_t type; int64_t ver; SArray* dataRefs; // SArray SArray* submits; // SArray -} SStreamMergedSubmit2; +} SStreamMergedSubmit; typedef struct { int8_t type; @@ -209,10 +209,10 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) { void* streamQueueNextItem(SStreamQueue* queue); -SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type); -void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit); +SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type); +void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); -SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit); +SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit); typedef struct { char* qmsg; @@ -239,6 +239,7 @@ typedef struct { void* vnode; // not available to encoder and decoder FTbSink* tbSinkFunc; STSchema* pTSchema; + SSHashObj* pTblInfo; } STaskSinkTb; typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); diff --git a/packaging/docker/bin/entrypoint.sh b/packaging/docker/bin/entrypoint.sh index f2811de7bd..a60254d7ef 100755 --- a/packaging/docker/bin/entrypoint.sh +++ b/packaging/docker/bin/entrypoint.sh @@ -55,7 +55,7 @@ else exit $? fi while true; do - es=$(taos -h $FIRST_EP_HOST -P $FIRST_EP_PORT --check) + es=$(taos -h $FIRST_EP_HOST -P $FIRST_EP_PORT --check | grep "^[0-9]*:") echo ${es} if [ "${es%%:*}" -eq 2 ]; then echo "execute create dnode" diff --git a/packaging/docker/bin/taos-check b/packaging/docker/bin/taos-check index 5dc06b6018..349187da9b 100755 --- a/packaging/docker/bin/taos-check +++ b/packaging/docker/bin/taos-check @@ -1,5 +1,5 @@ #!/bin/sh -es=$(taos --check) +es=$(taos --check | grep "^[0-9]*:") code=${es%%:*} if [ "$code" -ne "0" ] && [ "$code" -ne "4" ]; then exit 0 diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index c8fb5f7bd8..105934f1b3 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1351,7 +1351,7 @@ static int32_t smlInsertData(SSmlHandle *info) { } taosArrayPush(info->pRequest->tableList, &pName); - tstrncpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName) + 1); + strcpy(pName.tname, tableData->childTableName); SRequestConnInfo conn = {0}; conn.pTrans = info->taos->pAppInfo->pTransporter; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 81b9b47e5c..75db9600ee 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -84,7 +84,7 @@ bool tsMonitorComp = false; // telem bool tsEnableTelem = true; int32_t tsTelemInterval = 43200; -char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com"; +char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.tdengine.com"; uint16_t tsTelemPort = 80; char *tsTelemUri = "/report"; diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c index f0e020edfe..989bff3984 100644 --- a/source/dnode/mgmt/exe/dmMain.c +++ b/source/dnode/mgmt/exe/dmMain.c @@ -87,18 +87,6 @@ static void dmStopDnode(int signum, void *sigInfo, void *context) { } void dmLogCrash(int signum, void *sigInfo, void *context) { - taosIgnSignal(SIGTERM); - taosIgnSignal(SIGHUP); - taosIgnSignal(SIGINT); - taosIgnSignal(SIGBREAK); - -#ifndef WINDOWS - taosIgnSignal(SIGBUS); -#endif - taosIgnSignal(SIGABRT); - taosIgnSignal(SIGFPE); - taosIgnSignal(SIGSEGV); - char *pMsg = NULL; const char *flags = "UTL FATAL "; ELogLevel level = DEBUG_FATAL; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 50e502f4ab..ff6a2f460a 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -256,10 +256,13 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db); pDb = mndAcquireDb(pMnode, db); if (pDb == NULL) { - terrno = TSDB_CODE_MND_INVALID_DB; - mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db, - terrstr()); - goto _OVER; + if (0 != strcmp(connReq.db, TSDB_INFORMATION_SCHEMA_DB) && + (0 != strcmp(connReq.db, TSDB_PERFORMANCE_SCHEMA_DB))) { + terrno = TSDB_CODE_MND_INVALID_DB; + mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db, + terrstr()); + goto _OVER; + } } if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) != 0) { diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index d6e4b86097..2a4a471b97 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -101,6 +101,8 @@ typedef struct { STqPushHandle pushHandle; // push STqExecHandle execHandle; // exec SRpcMsg* msg; + int32_t noDataPollCnt; + int8_t exec; } STqHandle; typedef struct { @@ -181,7 +183,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq); // tq util char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); -int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver); +int32_t tqAddBlockNLaunchTask(SStreamTask* pTask, SPackedData* pData); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 21c2ab13ab..b39d9d74a0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -572,6 +572,10 @@ end: return ret; } +void freePtr(void *ptr) { + taosMemoryFree(*(void**)ptr); +} + int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t vgId = TD_VID(pTq->pVnode); pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); @@ -646,6 +650,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { if (pTask->tbSink.pTSchema == NULL) { return -1; } + pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr); } if (pTask->taskLevel == TASK_LEVEL__SOURCE) { diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 950c5ea96b..85fcb4fd80 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -55,6 +55,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) { memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); } else { + tqPushDataRsp(pTq, pHandle); void* tmp = pHandle->msg->pCont; memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = tmp; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index edea7724b5..bd1ccfe5dc 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -312,7 +312,6 @@ int32_t extractSubmitMsgFromWal(SWalReader* pReader, SPackedData* pPackedData) { void* data = taosMemoryMalloc(len); if (data == NULL) { - // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); return -1; diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index b956027741..8ada268c4d 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -120,8 +120,6 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - - // append the data for the stream tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); } else { @@ -145,17 +143,9 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - SStreamDataSubmit2* p = streamDataSubmitNew(packData, STREAM_INPUT__DATA_SUBMIT); - if (p == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("%s failed to create data submit for stream since out of memory", pTask->id.idStr); - streamMetaReleaseTask(pStreamMeta, pTask); - continue; - } - noNewDataInWal = false; - code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver); + code = tqAddBlockNLaunchTask(pTask, &packData); if (code == TSDB_CODE_SUCCESS) { pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, @@ -164,8 +154,6 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { tqError("s-task:%s append input queue failed, ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer); } - streamDataSubmitDestroy(p); - taosFreeQitem(p); streamMetaReleaseTask(pStreamMeta, pTask); } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 9373f23c02..0bd7d9a57b 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -17,6 +17,13 @@ #include "tmsg.h" #include "tq.h" +#define MAX_CACHE_TABLE_INFO_NUM 10240 + +typedef struct STableSinkInfo { + uint64_t uid; + char tbName[TSDB_TABLE_NAME_LEN]; +} STableSinkInfo; + int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr) { int32_t totalRows = pDataBlock->info.rows; @@ -90,6 +97,24 @@ end: return ret; } +static int32_t tqGetTableInfo(SSHashObj* pTableInfoMap,uint64_t groupId, STableSinkInfo** pInfo) { + void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t)); + if (pVal) { + *pInfo = *(STableSinkInfo**)pVal; + return TSDB_CODE_SUCCESS; + } + + return TSDB_CODE_FAILED; +} + +int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STableSinkInfo* pTbl) { + if (tSimpleHashGetSize(tblInfo) > MAX_CACHE_TABLE_INFO_NUM) { + return TSDB_CODE_FAILED; + } + + return tSimpleHashPut(tblInfo, &groupId, sizeof(uint64_t), &pTbl, POINTER_BYTES); +} + int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { void* buf = NULL; int32_t tlen = 0; @@ -251,7 +276,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d crTblArray = NULL; } else { SSubmitTbData tbData = {0}; - tqDebug("tq sink pipe, convert block1 %d, rows: %d", i, rows); + tqDebug("tq sink pipe, convert block:%d, rows:%d", i, rows); if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) { goto _end; @@ -261,99 +286,114 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d tbData.uid = 0; // uid is assigned by vnode tbData.sver = pTSchema->version; - char* ctbName = NULL; - tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), pDataBlock->info.parTbName); - if (pDataBlock->info.parTbName[0]) { - ctbName = taosStrdup(pDataBlock->info.parTbName); - } else { - ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); + STableSinkInfo* pTableSinkInfo = NULL; + int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTableSinkInfo); + if (res != TSDB_CODE_SUCCESS) { + pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo)); } - SMetaReader mr = {0}; - metaReaderInit(&mr, pVnode->pMeta, 0); - if (metaGetTableEntryByName(&mr, ctbName) < 0) { - metaReaderClear(&mr); - tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName); - - SVCreateTbReq* pCreateTbReq = NULL; - - if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) { - taosMemoryFree(ctbName); - goto _end; - }; - - // set const - pCreateTbReq->flags = 0; - pCreateTbReq->type = TSDB_CHILD_TABLE; - pCreateTbReq->ctb.suid = suid; - - // set super table name - SName name = {0}; - tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName); - - // set tag content - tagArray = taosArrayInit(1, sizeof(STagVal)); - if (!tagArray) { - taosMemoryFree(ctbName); - tdDestroySVCreateTbReq(pCreateTbReq); - goto _end; + char* ctbName = pDataBlock->info.parTbName; + if (!ctbName[0]) { + if (res == TSDB_CODE_SUCCESS) { + memcpy(ctbName, pTableSinkInfo->tbName, strlen(pTableSinkInfo->tbName)); + } else { + char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); + memcpy(ctbName, tmp, strlen(tmp)); + memcpy(pTableSinkInfo->tbName, tmp, strlen(tmp)); + taosMemoryFree(tmp); + tqDebug("vgId:%d, gropuId:%" PRIu64 " datablock table name is null", TD_VID(pVnode), + pDataBlock->info.id.groupId); } - STagVal tagVal = { - .cid = pTSchema->numOfCols + 1, - .type = TSDB_DATA_TYPE_UBIGINT, - .i64 = (int64_t)pDataBlock->info.id.groupId, - }; - taosArrayPush(tagArray, &tagVal); - pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray); + } - STag* pTag = NULL; - tTagNew(tagArray, 1, false, &pTag); - tagArray = taosArrayDestroy(tagArray); - if (pTag == NULL) { - taosMemoryFree(ctbName); - tdDestroySVCreateTbReq(pCreateTbReq); - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(ctbName); - tdDestroySVCreateTbReq(pCreateTbReq); - goto _end; - } - pCreateTbReq->ctb.pTag = (uint8_t*)pTag; - - // set tag name - SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); - char tagNameStr[TSDB_COL_NAME_LEN] = {0}; - strcpy(tagNameStr, "group_id"); - taosArrayPush(tagName, tagNameStr); - pCreateTbReq->ctb.tagName = tagName; - - // set table name - pCreateTbReq->name = ctbName; - ctbName = NULL; - - tbData.pCreateTbReq = pCreateTbReq; - tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE; + if (res == TSDB_CODE_SUCCESS) { + tbData.uid = pTableSinkInfo->uid; } else { - if (mr.me.type != TSDB_CHILD_TABLE) { - tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName, - mr.me.type); + SMetaReader mr = {0}; + metaReaderInit(&mr, pVnode->pMeta, 0); + if (metaGetTableEntryByName(&mr, ctbName) < 0) { metaReaderClear(&mr); - taosMemoryFree(ctbName); - continue; - } + taosMemoryFree(pTableSinkInfo); + tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName); - if (mr.me.ctbEntry.suid != suid) { - tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64 - ", actual suid %" PRId64 "", - TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid); + SVCreateTbReq* pCreateTbReq = NULL; + + if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) { + goto _end; + }; + + // set const + pCreateTbReq->flags = 0; + pCreateTbReq->type = TSDB_CHILD_TABLE; + pCreateTbReq->ctb.suid = suid; + + // set super table name + SName name = {0}; + tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName); + + // set tag content + tagArray = taosArrayInit(1, sizeof(STagVal)); + if (!tagArray) { + tdDestroySVCreateTbReq(pCreateTbReq); + goto _end; + } + STagVal tagVal = { + .cid = pTSchema->numOfCols + 1, + .type = TSDB_DATA_TYPE_UBIGINT, + .i64 = (int64_t)pDataBlock->info.id.groupId, + }; + taosArrayPush(tagArray, &tagVal); + pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray); + + STag* pTag = NULL; + tTagNew(tagArray, 1, false, &pTag); + tagArray = taosArrayDestroy(tagArray); + if (pTag == NULL) { + tdDestroySVCreateTbReq(pCreateTbReq); + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + pCreateTbReq->ctb.pTag = (uint8_t*)pTag; + + // set tag name + SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); + char tagNameStr[TSDB_COL_NAME_LEN] = {0}; + strcpy(tagNameStr, "group_id"); + taosArrayPush(tagName, tagNameStr); + pCreateTbReq->ctb.tagName = tagName; + + // set table name + pCreateTbReq->name = taosStrdup(ctbName); + + tbData.pCreateTbReq = pCreateTbReq; + tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE; + } else { + if (mr.me.type != TSDB_CHILD_TABLE) { + tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName, + mr.me.type); + metaReaderClear(&mr); + taosMemoryFree(pTableSinkInfo); + continue; + } + + if (mr.me.ctbEntry.suid != suid) { + tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64 + ", actual suid %" PRId64 "", + TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid); + metaReaderClear(&mr); + taosMemoryFree(pTableSinkInfo); + continue; + } + + tbData.uid = mr.me.uid; + pTableSinkInfo->uid = mr.me.uid; + int32_t code = tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTableSinkInfo); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFreeClear(pTableSinkInfo); + } metaReaderClear(&mr); - taosMemoryFree(ctbName); - continue; } - - tbData.uid = mr.me.uid; - metaReaderClear(&mr); - taosMemoryFreeClear(ctbName); } // rows diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 051efccc4f..8f9a490048 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -16,6 +16,7 @@ #include "tq.h" #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) +#define NO_POLL_CNT 5 static int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp); @@ -25,10 +26,15 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { return taosStrdup(buf); } -int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) { - int32_t code = tAppendDataToInputQueue(pTask, pQueueItem); +int32_t tqAddBlockNLaunchTask(SStreamTask* pTask, SPackedData *pPackedData) { + SStreamDataSubmit* p = streamDataSubmitNew(pPackedData, STREAM_INPUT__DATA_SUBMIT); + + int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*) p); + streamDataSubmitDestroy(p); + taosFreeQitem(p); + if (code < 0) { - tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, ver); + tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, pPackedData->ver); return -1; } @@ -161,6 +167,10 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand return 0; } +static bool isHandleExecuting(STqHandle* pHandle){ + return 1 == atomic_load_8(&pHandle->exec); +} + static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* pOffset) { uint64_t consumerId = pRequest->consumerId; @@ -176,6 +186,12 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, return code; } + while(isHandleExecuting(pHandle)){ + tqInfo("sub is executing, pHandle:%p", pHandle); + taosMsleep(5); + } + atomic_store_8(&pHandle->exec, 1); + qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); code = tqScanData(pTq, pHandle, &dataRsp, pOffset); if(code != 0) { @@ -185,17 +201,23 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // till now, all data has been transferred to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { - // lock - taosWLockLatch(&pTq->lock); - code = tqRegisterPushHandle(pTq, pHandle, pMsg); - taosWUnLockLatch(&pTq->lock); - tDeleteSMqDataRsp(&dataRsp); - return code; + if(pHandle->noDataPollCnt >= NO_POLL_CNT){ // send poll result to client if no data 5 times to avoid lost data + pHandle->noDataPollCnt = 0; + // lock + taosWLockLatch(&pTq->lock); + code = tqRegisterPushHandle(pTq, pHandle, pMsg); + taosWUnLockLatch(&pTq->lock); + tDeleteSMqDataRsp(&dataRsp); + atomic_store_8(&pHandle->exec, 0); + return code; + } + else{ + pHandle->noDataPollCnt++; + } } code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); - // NOTE: this pHandle->consumerId may have been changed already. end: @@ -207,12 +229,14 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); } + atomic_store_8(&pHandle->exec, 0); + return code; } static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) { - int code = 0; + int code = 0; int32_t vgId = TD_VID(pTq->pVnode); SWalCkHead* pCkHead = NULL; SMqMetaRsp metaRsp = {0}; @@ -225,10 +249,16 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, return code; } + while(isHandleExecuting(pHandle)){ + tqInfo("sub is executing, pHandle:%p", pHandle); + taosMsleep(5); + } + atomic_store_8(&pHandle->exec, 1); + if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { - tDeleteSTaosxRsp(&taosxRsp); - return -1; + code = -1; + goto end; } if (metaRsp.metaRspLen > 0) { @@ -236,16 +266,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64, pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts); taosMemoryFree(metaRsp.metaRsp); - tDeleteSTaosxRsp(&taosxRsp); - return code; + goto end; } tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 ",ts:%" PRId64,pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,taosxRsp.rspOffset.ts); if (taosxRsp.blockNum > 0) { code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - return code; + goto end; }else { *offset = taosxRsp.rspOffset; } @@ -257,9 +285,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, int64_t fetchVer = offset->version + 1; pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { - tDeleteSTaosxRsp(&taosxRsp); terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + code = -1; + goto end; } walSetReaderCapacity(pHandle->pWalReader, 2048); int totalRows = 0; @@ -274,9 +302,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); - return code; + goto end; } SWalCont* pHead = &pCkHead->head; @@ -288,9 +314,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if(totalRows > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1); code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); - return code; + goto end; } tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); @@ -298,17 +322,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, metaRsp.resMsgType = pHead->msgType; metaRsp.metaRspLen = pHead->bodyLen; metaRsp.metaRsp = pHead->body; - if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) { - code = -1; - taosMemoryFreeClear(pCkHead); - tDeleteSTaosxRsp(&taosxRsp); - return code; - } - - code = 0; - taosMemoryFreeClear(pCkHead); - tDeleteSTaosxRsp(&taosxRsp); - return code; + code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); + goto end; } // process data @@ -318,29 +333,28 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, .ver = pHead->version, }; - if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows) < 0) { - tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, - pRequest->subKey); - taosMemoryFreeClear(pCkHead); - tDeleteSTaosxRsp(&taosxRsp); - return -1; + code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows); + if (code < 0) { + tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, pRequest->subKey); + goto end; } if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); - return code; + goto end; } else { fetchVer++; } } } +end: + atomic_store_8(&pHandle->exec, 0); + tDeleteSTaosxRsp(&taosxRsp); taosMemoryFreeClear(pCkHead); - return 0; + return code; } int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) { diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 56c79eac1f..9e654e89d9 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -1437,12 +1437,12 @@ _return: SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx); pRes->code = code; pRes->pRes = NULL; + ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, + tstrerror(code)); if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { TSWAP(pTask->res, ctx->pResList); taskDone = true; } - ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, - tstrerror(code)); } if (pTask->res && taskDone) { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index c51dc39b5b..c8b16ad83b 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1484,14 +1484,23 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu return TSDB_CODE_OUT_OF_MEMORY; } + SHashObj *pSelectFuncs = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); for (int32_t i = 0; i < numOfOutput; ++i) { const char* pName = pCtx[i].pExpr->pExpr->_function.functionName; if ((strcmp(pName, "_select_value") == 0) || (strcmp(pName, "_group_key") == 0)) { pValCtx[num++] = &pCtx[i]; } else if (fmIsSelectFunc(pCtx[i].functionId)) { - p = &pCtx[i]; + void* data = taosHashGet(pSelectFuncs, pName, strlen(pName)); + if (taosHashGetSize(pSelectFuncs) != 0 && data == NULL) { + p = NULL; + break; + } else { + taosHashPut(pSelectFuncs, pName, strlen(pName), &num, sizeof(num)); + p = &pCtx[i]; + } } } + taosHashCleanup(pSelectFuncs); if (p != NULL) { p->subsidiaries.pCtx = pValCtx; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5a539106b3..3d114706d6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1583,7 +1583,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock // currently only the tbname pseudo column if (pInfo->numOfPseudoExpr > 0) { int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, - pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL); + pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache); // ignore the table not exists error, since this table may have been dropped during the scan procedure. if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) { blockDataFreeRes((SSDataBlock*)pBlock); diff --git a/source/libs/parser/src/parCalcConst.c b/source/libs/parser/src/parCalcConst.c index c25d0e7036..01b62a9051 100644 --- a/source/libs/parser/src/parCalcConst.c +++ b/source/libs/parser/src/parCalcConst.c @@ -388,6 +388,9 @@ static bool isSetUselessCol(SSetOperator* pSetOp, int32_t index, SExprNode* pPro } static int32_t calcConstSetOpProjections(SCalcConstContext* pCxt, SSetOperator* pSetOp, bool subquery) { + if (subquery && pSetOp->opType == SET_OP_TYPE_UNION) { + return TSDB_CODE_SUCCESS; + } int32_t index = 0; SNode* pProj = NULL; WHERE_EACH(pProj, pSetOp->pProjectionList) { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index d1bd47b884..26e774779e 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5330,7 +5330,8 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable } if (TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES == pStmt->alterType) { - if (calcTypeBytes(pStmt->dataType) > TSDB_MAX_FIELD_LEN) { + if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) || + (TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN); } @@ -5355,6 +5356,11 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TOO_MANY_COLUMNS); } + if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) || + (TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN); + } + if (pTableMeta->tableInfo.rowSize + calcTypeBytes(pStmt->dataType) > TSDB_MAX_BYTES_PER_ROW) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROW_LENGTH, TSDB_MAX_BYTES_PER_ROW); } @@ -8359,6 +8365,11 @@ static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_MODIFY_COL); } + if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) || + (TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN); + } + if (pTableMeta->tableInfo.rowSize + pReq->colModBytes - pSchema->bytes > TSDB_MAX_BYTES_PER_ROW) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROW_LENGTH, TSDB_MAX_BYTES_PER_ROW); } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index c6a4a97f6e..58b8e53478 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -19,6 +19,14 @@ #include "scalar.h" #include "tglobal.h" +static void debugPrintNode(SNode* pNode) { + char* pStr = NULL; + nodesNodeToString(pNode, false, &pStr, NULL); + printf("%s\n", pStr); + taosMemoryFree(pStr); + return; +} + static void dumpQueryPlan(SQueryPlan* pPlan) { if (!tsQueryPlannerTrace) { return; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 0286df2a9a..a0a7d5caf7 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -18,6 +18,9 @@ #define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480 #define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (100) +#define ONE_MB_F (1048576.0) + +#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q)/ONE_MB_F) int32_t streamInit() { int8_t old; @@ -281,45 +284,36 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S } bool tInputQueueIsFull(const SStreamTask* pTask) { - return taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY; + bool isFull = taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY; + double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); + return (isFull || size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE); } int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { int8_t type = pItem->type; if (type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit2* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit2*)pItem); - if (pSubmitBlock == NULL) { - qDebug("task %d %p submit enqueue failed since out of memory", pTask->id.taskId, pTask); - terrno = TSDB_CODE_OUT_OF_MEMORY; - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); - return -1; - } - int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; - double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0; + double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); - qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, - pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, - pSubmitBlock->submit.ver, numOfBlocks, size); + SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; + qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, + px->submit.msgLen, px->submit.ver, numOfBlocks, size); - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && - (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) { qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, size); - streamDataSubmitDestroy(pSubmitBlock); return -1; } - taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock); + taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; - double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0; + double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && - (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, size); @@ -338,10 +332,6 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); } -#if 0 - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL); -#endif - return 0; } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index e574cdbe8a..67177268d3 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -67,8 +67,8 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock return 0; } -SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) { - SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, submit.msgLen); +SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) { + SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, pData->msgLen); if (pDataSubmit == NULL) { return NULL; } @@ -79,14 +79,14 @@ SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) { return NULL; } - pDataSubmit->submit = submit; + pDataSubmit->submit = *pData; *pDataSubmit->dataRef = 1; // initialize the reference count to be 1 pDataSubmit->type = type; return pDataSubmit; } -void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit) { +void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) { int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1); ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT); @@ -96,8 +96,8 @@ void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit) { } } -SStreamMergedSubmit2* streamMergedSubmitNew() { - SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)taosAllocateQitem(sizeof(SStreamMergedSubmit2), DEF_QITEM, 0); +SStreamMergedSubmit* streamMergedSubmitNew() { + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0); if (pMerged == NULL) { return NULL; } @@ -116,30 +116,30 @@ SStreamMergedSubmit2* streamMergedSubmitNew() { return pMerged; } -int32_t streamMergeSubmit(SStreamMergedSubmit2* pMerged, SStreamDataSubmit2* pSubmit) { +int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) { taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef); taosArrayPush(pMerged->submits, &pSubmit->submit); pMerged->ver = pSubmit->ver; return 0; } -static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit) { +static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) { atomic_add_fetch_32(pDataSubmit->dataRef, 1); } -SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit) { +SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit) { int32_t len = 0; if (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) { len = pSubmit->submit.msgLen; } - SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, len); + SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, len); if (pSubmitClone == NULL) { return NULL; } streamDataSubmitRefInc(pSubmit); - memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit2)); + memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit)); return pSubmitClone; } @@ -152,17 +152,17 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* taosFreeQitem(pElem); return dst; } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)dst; - SStreamDataSubmit2* pBlockSrc = (SStreamDataSubmit2*)pElem; + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst; + SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem; streamMergeSubmit(pMerged, pBlockSrc); taosFreeQitem(pElem); return dst; } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamMergedSubmit2* pMerged = streamMergedSubmitNew(); + SStreamMergedSubmit* pMerged = streamMergedSubmitNew(); // todo handle error - streamMergeSubmit(pMerged, (SStreamDataSubmit2*)dst); - streamMergeSubmit(pMerged, (SStreamDataSubmit2*)pElem); + streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst); + streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem); taosFreeQitem(dst); taosFreeQitem(pElem); return (SStreamQueueItem*)pMerged; @@ -180,10 +180,10 @@ void streamFreeQitem(SStreamQueueItem* data) { taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes); taosFreeQitem(data); } else if (type == STREAM_INPUT__DATA_SUBMIT) { - streamDataSubmitDestroy((SStreamDataSubmit2*)data); + streamDataSubmitDestroy((SStreamDataSubmit*)data); taosFreeQitem(data); } else if (type == STREAM_INPUT__MERGED_SUBMIT) { - SStreamMergedSubmit2* pMerge = (SStreamMergedSubmit2*)data; + SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data; int32_t sz = taosArrayGetSize(pMerge->submits); for (int32_t i = 0; i < sz; i++) { int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 918a203df4..95d9dda057 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -51,7 +51,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); - const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data; + const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)data; qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); @@ -63,7 +63,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* qDebug("s-task:%s set sdata blocks as input num:%d, ver:%"PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { - const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data; + const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)data; SArray* pBlockList = pMerged->submits; int32_t numOfBlocks = taosArrayGetSize(pBlockList); @@ -367,11 +367,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { qRes->blocks = pRes; if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit2* pSubmit = (SStreamDataSubmit2*)pInput; + SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pInput; qRes->childId = pTask->selfChildId; qRes->sourceVer = pSubmit->ver; } else if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__MERGED_SUBMIT) { - SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)pInput; + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pInput; qRes->childId = pTask->selfChildId; qRes->sourceVer = pMerged->ver; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 20abcca197..52d6525769 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -105,3 +105,61 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) { return (SStreamQueueRes){0}; } #endif + +#define MAX_STREAM_EXEC_BATCH_NUM 128 +#define MIN_STREAM_EXEC_BATCH_NUM 16 + +// todo refactor: +// read data from input queue +typedef struct SQueueReader { + SStreamQueue* pQueue; + int32_t taskLevel; + int32_t maxBlocks; // maximum block in one batch + int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms +} SQueueReader; + +SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char* idstr) { + int32_t numOfBlocks = 0; + int32_t tryCount = 0; + SStreamQueueItem* pRet = NULL; + + while (1) { + SStreamQueueItem* qItem = streamQueueNextItem(pReader->pQueue); + if (qItem == NULL) { + if (pReader->taskLevel == TASK_LEVEL__SOURCE && numOfBlocks < MIN_STREAM_EXEC_BATCH_NUM && tryCount < pReader->waitDuration) { + tryCount++; + taosMsleep(1); + qDebug("===stream===try again batchSize:%d", numOfBlocks); + continue; + } + + qDebug("===stream===break batchSize:%d", numOfBlocks); + break; + } + + if (pRet == NULL) { + pRet = qItem; + streamQueueProcessSuccess(pReader->pQueue); + if (pReader->taskLevel == TASK_LEVEL__SINK) { + break; + } + } else { + // todo we need to sort the data block, instead of just appending into the array list. + void* newRet = NULL; + if ((newRet = streamMergeQueueItem(pRet, qItem)) == NULL) { + streamQueueProcessFail(pReader->pQueue); + break; + } else { + numOfBlocks++; + pRet = newRet; + streamQueueProcessSuccess(pReader->pQueue); + if (numOfBlocks > pReader->maxBlocks) { + qDebug("maximum blocks limit:%d reached, processing, %s", pReader->maxBlocks, idstr); + break; + } + } + } + } + + return pRet; +} diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index abd14801a8..fa9bfe6ca5 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -91,7 +91,6 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { } SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) { - qWarn("open stream state, %s", path); SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); if (pState == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -112,13 +111,11 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int tstrncpy(statePath, path, 1024); } #ifdef USE_ROCKSDB - qWarn("open stream state1"); int code = streamInitBackend(pState, statePath); if (code == -1) { taosMemoryFree(pState); pState = NULL; } - qWarn("open stream state2, %s", statePath); pState->pTdbState->pOwner = pTask; pState->pFileState = NULL; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f301d9d517..d86dca18f1 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -195,6 +195,7 @@ void tFreeStreamTask(SStreamTask* pTask) { if (pTask->outputType == TASK_OUTPUT__TABLE) { tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper); taosMemoryFree(pTask->tbSink.pTSchema); + tSimpleHashCleanup(pTask->tbSink.pTblInfo); } if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index cda7e35b0f..a12f8051ba 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -295,6 +295,36 @@ void walAlignVersions(SWal* pWal) { wInfo("vgId:%d, reset commitVer to %" PRId64, pWal->cfg.vgId, pWal->vers.commitVer); } +int walRepairLogFileTs(SWal* pWal, bool* updateMeta) { + int32_t sz = taosArrayGetSize(pWal->fileInfoSet); + int32_t fileIdx = -1; + int32_t lastCloseTs = 0; + char fnameStr[WAL_FILE_LEN] = {0}; + + while (++fileIdx < sz - 1) { + SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); + if (pFileInfo->closeTs != -1) { + lastCloseTs = pFileInfo->closeTs; + continue; + } + + walBuildLogName(pWal, pFileInfo->firstVer, fnameStr); + int32_t mtime = 0; + if (taosStatFile(fnameStr, NULL, &mtime) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, failed to stat file due to %s, file:%s", pWal->cfg.vgId, strerror(errno), fnameStr); + return -1; + } + + if (updateMeta != NULL) *updateMeta = true; + if (pFileInfo->createTs == -1) pFileInfo->createTs = lastCloseTs; + pFileInfo->closeTs = mtime; + lastCloseTs = pFileInfo->closeTs; + } + + return 0; +} + bool walLogEntriesComplete(const SWal* pWal) { int32_t sz = taosArrayGetSize(pWal->fileInfoSet); bool complete = true; @@ -433,15 +463,8 @@ int walCheckAndRepairMeta(SWal* pWal) { wError("failed to scan wal last ver since %s", terrstr()); return -1; } - // remove the empty wal log, and its idx - wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr); - taosRemoveFile(fnameStr); - walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr); - wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr); - taosRemoveFile(fnameStr); - // remove its meta entry - taosArrayRemove(pWal->fileInfoSet, fileIdx); - continue; + // empty log file + lastVer = pFileInfo->firstVer - 1; } // update lastVer @@ -460,6 +483,11 @@ int walCheckAndRepairMeta(SWal* pWal) { } (void)walAlignVersions(pWal); + // repair ts of files + if (walRepairLogFileTs(pWal, &updateMeta) < 0) { + return -1; + } + // update meta file if (updateMeta) { (void)walSaveMeta(pWal); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 206cb75963..fbd4b77a3d 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -74,18 +74,17 @@ int32_t walNextValidMsg(SWalReader *pReader) { int64_t lastVer = walGetLastVer(pReader->pWal); int64_t committedVer = walGetCommittedVer(pReader->pWal); int64_t appliedVer = walGetAppliedVer(pReader->pWal); - while(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010] - wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64", so sleep 1ms", pReader->pWal->cfg.vgId, appliedVer, committedVer); - taosMsleep(1); - appliedVer = walGetAppliedVer(pReader->pWal); + if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010] + wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer); +// taosMsleep(10); } // int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer; -// endVer = TMIN(appliedVer, endVer); + int64_t endVer = TMIN(appliedVer, committedVer); wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 - ", applied index:%" PRId64, - pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer); - while (fetchVer <= committedVer) { + ", applied index:%" PRId64", end index:%" PRId64, + pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); + while (fetchVer <= endVer) { if (walFetchHeadNew(pReader, fetchVer) < 0) { return -1; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 848de4f36d..9b7b3dfd50 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -284,15 +284,15 @@ int32_t walEndSnapshot(SWal *pWal) { if (ver == -1) { code = -1; goto END; - }; + } pWal->vers.snapshotVer = ver; int ts = taosGetTimestampSec(); - ver = TMAX(ver - pWal->vers.logRetention, pWal->vers.firstVer - 1); + // compatible mode for refVer bool hasTopic = false; - int64_t refVer = ver; + int64_t refVer = INT64_MAX; void *pIter = NULL; while (1) { pIter = taosHashIterate(pWal->pRefHash, pIter); @@ -300,54 +300,40 @@ int32_t walEndSnapshot(SWal *pWal) { SWalRef *pRef = *(SWalRef **)pIter; if (pRef->refVer == -1) continue; refVer = TMIN(refVer, pRef->refVer - 1); - wDebug("vgId:%d, wal found ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId); hasTopic = true; } - // compatible mode if (pWal->cfg.retentionPeriod == 0 && hasTopic) { + wInfo("vgId:%d, wal found refVer:%" PRId64 " in compatible mode, ver:%" PRId64, pWal->cfg.vgId, refVer, ver); ver = TMIN(ver, refVer); } + // find files safe to delete int deleteCnt = 0; int64_t newTotSize = pWal->totSize; - SWalFileInfo tmp; + SWalFileInfo tmp = {0}; tmp.firstVer = ver; - // find files safe to delete SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); + if (pInfo) { - SWalFileInfo *pLastFileInfo = taosArrayGetLast(pWal->fileInfoSet); - wDebug("vgId:%d, wal search found file info: first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer, - pInfo->lastVer); - if (ver >= pInfo->lastVer) { + wDebug("vgId:%d, wal search found file info. ver:%" PRId64 ", first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, ver, + pInfo->firstVer, pInfo->lastVer); + ASSERT(ver <= pInfo->lastVer); + if (ver == pInfo->lastVer) { pInfo++; - wDebug("vgId:%d, wal remove advance one file: first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer, - pInfo->lastVer); - } - if (pInfo <= pLastFileInfo) { - wDebug("vgId:%d, wal end remove for first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer, - pInfo->lastVer); - } else { - wDebug("vgId:%d, wal no remove", pWal->cfg.vgId); } // iterate files, until the searched result + // delete according to file size or close time for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { - wDebug("vgId:%d, wal check remove file %" PRId64 "(file size %" PRId64 " close ts %" PRId64 - "), new tot size %" PRId64, - pWal->cfg.vgId, iter->firstVer, iter->fileSize, iter->closeTs, newTotSize); - if ((pWal->cfg.retentionSize != -1 && pWal->cfg.retentionSize != 0 && newTotSize > pWal->cfg.retentionSize) || - ((pWal->cfg.retentionPeriod == 0) || (pWal->cfg.retentionPeriod != -1 && iter->closeTs != -1 && - iter->closeTs + pWal->cfg.retentionPeriod < ts))) { - // delete according to file size or close time - wDebug("vgId:%d, check pass", pWal->cfg.vgId); + if ((pWal->cfg.retentionSize > 0 && newTotSize > pWal->cfg.retentionSize) || + (pWal->cfg.retentionPeriod == 0 || + pWal->cfg.retentionPeriod > 0 && iter->closeTs >= 0 && iter->closeTs + pWal->cfg.retentionPeriod < ts)) { deleteCnt++; newTotSize -= iter->fileSize; taosArrayPush(pWal->toDeleteFiles, iter); } - wDebug("vgId:%d, check not pass", pWal->cfg.vgId); } - UPDATE_META: // make new array, remove files taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); if (taosArrayGetSize(pWal->fileInfoSet) == 0) { @@ -357,11 +343,12 @@ int32_t walEndSnapshot(SWal *pWal) { pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; } } + + // update meta pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; pWal->totSize = newTotSize; pWal->vers.verInSnapshotting = -1; - // save snapshot ver, commit ver code = walSaveMeta(pWal); if (code < 0) { goto END; @@ -369,23 +356,27 @@ int32_t walEndSnapshot(SWal *pWal) { // delete files deleteCnt = taosArrayGetSize(pWal->toDeleteFiles); - wDebug("vgId:%d, wal should delete %d files", pWal->cfg.vgId, deleteCnt); - char fnameStr[WAL_FILE_LEN]; + char fnameStr[WAL_FILE_LEN] = {0}; + pInfo = NULL; + for (int i = 0; i < deleteCnt; i++) { pInfo = taosArrayGet(pWal->toDeleteFiles, i); + walBuildLogName(pWal, pInfo->firstVer, fnameStr); - wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr); if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) { wError("vgId:%d, failed to remove log file %s due to %s", pWal->cfg.vgId, fnameStr, strerror(errno)); goto END; } walBuildIdxName(pWal, pInfo->firstVer, fnameStr); - wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr); if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) { wError("vgId:%d, failed to remove idx file %s due to %s", pWal->cfg.vgId, fnameStr, strerror(errno)); goto END; } } + if (pInfo != NULL) { + wInfo("vgId:%d, wal log files recycled. count:%d, until ver:%" PRId64 ", closeTs:%" PRId64, pWal->cfg.vgId, + deleteCnt, pInfo->lastVer, pInfo->closeTs); + } taosArrayClear(pWal->toDeleteFiles); END: diff --git a/test1 b/test1 new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/pytest/util/cluster.py b/tests/pytest/util/cluster.py index 2607cf63c2..a6e3530dc9 100644 --- a/tests/pytest/util/cluster.py +++ b/tests/pytest/util/cluster.py @@ -52,8 +52,9 @@ class ConfigureyCluster: dnode.addExtraCfg("secondEp", f"{hostname}:{startPort_sec}") # configure dnoe of independent mnodes - if num <= self.mnodeNums and self.mnodeNums != 0 and independentMnode == True : - dnode.addExtraCfg("supportVnodes", 1024) + if num <= self.mnodeNums and self.mnodeNums != 0 and independentMnode == "True" : + tdLog.info("set mnode supportVnodes 0") + dnode.addExtraCfg("supportVnodes", 0) # print(dnode) self.dnodes.append(dnode) return self.dnodes @@ -71,6 +72,7 @@ class ConfigureyCluster: tdSql.init(conn.cursor()) mnodeNums=int(mnodeNums) for i in range(2,mnodeNums+1): + tdLog.info("create mnode on dnode %d"%i) tdSql.execute(" create mnode on dnode %d;"%i) diff --git a/tests/script/tsim/alter/table.sim b/tests/script/tsim/alter/table.sim index ded5d6f78a..5f45b446ca 100644 --- a/tests/script/tsim/alter/table.sim +++ b/tests/script/tsim/alter/table.sim @@ -657,17 +657,34 @@ if $data20 != null then return -1 endi -print =============== error +print =============== error for normal table sql create table tb2023(ts timestamp, f int); sql_error alter table tb2023 add column v varchar(16375); sql_error alter table tb2023 add column v varchar(16385); sql_error alter table tb2023 add column v varchar(33100); sql alter table tb2023 add column v varchar(16374); +sql_error alter table tb2023 modify column v varchar(16375); sql desc tb2023 sql alter table tb2023 drop column v sql_error alter table tb2023 add column v nchar(4094); sql alter table tb2023 add column v nchar(4093); +sql_error alter table tb2023 modify column v nchar(4094); sql desc tb2023 + +print =============== error for super table +sql create table stb2023(ts timestamp, f int) tags(t1 int); +sql_error alter table stb2023 add column v varchar(16375); +sql_error alter table stb2023 add column v varchar(16385); +sql_error alter table stb2023 add column v varchar(33100); +sql alter table stb2023 add column v varchar(16374); +sql_error alter table stb2023 modify column v varchar(16375); +sql desc stb2023 +sql alter table stb2023 drop column v +sql_error alter table stb2023 add column v nchar(4094); +sql alter table stb2023 add column v nchar(4093); +sql_error alter table stb2023 modify column v nchar(4094); +sql desc stb2023 + print ======= over sql drop database d1 sql select * from information_schema.ins_databases diff --git a/tests/script/tsim/parser/alter_column.sim b/tests/script/tsim/parser/alter_column.sim index c70a604c73..d569e47735 100644 --- a/tests/script/tsim/parser/alter_column.sim +++ b/tests/script/tsim/parser/alter_column.sim @@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10); sql_error alter table tb modify column c2 binary(9); sql_error alter table tb modify column c2 binary(-9); sql_error alter table tb modify column c2 binary(0); -sql alter table tb modify column c2 binary(17000); +sql_error alter table tb modify column c2 binary(17000); sql_error alter table tb modify column c2 nchar(30); sql_error alter table tb modify column c3 double; sql_error alter table tb modify column c3 nchar(10); diff --git a/tests/script/tsim/query/unionall_as_table.sim b/tests/script/tsim/query/unionall_as_table.sim index dc3d2cbec4..4d8f990718 100644 --- a/tests/script/tsim/query/unionall_as_table.sim +++ b/tests/script/tsim/query/unionall_as_table.sim @@ -25,4 +25,21 @@ if $data05 != @0021001@ then return -1 endi +sql create table st (ts timestamp, f int) tags (t int); +sql insert into ct1 using st tags(1) values(now, 1)(now+1s, 2) +sql insert into ct2 using st tags(2) values(now+2s, 3)(now+3s, 4) +sql select count(*) from (select * from ct1 union all select * from ct2) +if $rows != 1 then + return -1 +endi +if $data00 != 4 then + return -1 +endi +sql select count(*) from (select * from ct1 union select * from ct2) +if $rows != 1 then + return -1 +endi +if $data00 != 4 then + return -1 +endi system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/0-others/user_privilege.py b/tests/system-test/0-others/user_privilege.py index 6d49ebfbfe..d1b93f6942 100644 --- a/tests/system-test/0-others/user_privilege.py +++ b/tests/system-test/0-others/user_privilege.py @@ -29,6 +29,7 @@ class TDTestCase: self.stbname = 'stb' self.binary_length = 20 # the length of binary for column_dict self.nchar_length = 20 # the length of nchar for column_dict + self.dbnames = ['db1', 'db2'] self.column_dict = { 'ts': 'timestamp', 'col1': 'float', @@ -57,21 +58,25 @@ class TDTestCase: def create_user(self): user_name = 'test' tdSql.execute(f'create user {user_name} pass "test"') - tdSql.execute(f'grant read on db.stb with t2 = "Beijing" to {user_name}') + tdSql.execute(f'grant read on {self.dbnames[0]}.{self.stbname} with t2 = "Beijing" to {user_name}') + tdSql.execute(f'grant write on {self.dbnames[1]}.{self.stbname} with t1 = 2 to {user_name}') def prepare_data(self): - tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict)) - for i in range(self.tbnum): - tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})') - for j in self.values_list: - tdSql.execute(f'insert into {self.stbname}_{i} values({j})') + for db in self.dbnames: + tdSql.execute(f"create database {db}") + tdSql.execute(f"use {db}") + tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict)) + for i in range(self.tbnum): + tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})') + for j in self.values_list: + tdSql.execute(f'insert into {self.stbname}_{i} values({j})') - def user_privilege_check(self): + def user_read_privilege_check(self, dbname): testconn = taos.connect(user='test', password='test') expectErrNotOccured = False try: - sql = "select count(*) from db.stb where t2 = 'Beijing'" + sql = f"select count(*) from {dbname}.stb where t2 = 'Beijing'" res = testconn.query(sql) data = res.fetch_all() count = data[0][0] @@ -85,11 +90,30 @@ class TDTestCase: tdLog.exit(f"{sql}, expect result doesn't match") pass + def user_write_privilege_check(self, dbname): + testconn = taos.connect(user='test', password='test') + expectErrNotOccured = False + + try: + sql = f"insert into {dbname}.stb_1 values(now, 1.1, 200, 0.3)" + testconn.execute(sql) + except BaseException: + expectErrNotOccured = True + + if expectErrNotOccured: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured") + else: + pass + def user_privilege_error_check(self): testconn = taos.connect(user='test', password='test') expectErrNotOccured = False - sql_list = ["alter talbe db.stb_1 set t2 = 'Wuhan'", "drop table db.stb_1"] + sql_list = [f"alter talbe {self.dbnames[0]}.stb_1 set t2 = 'Wuhan'", + f"insert into {self.dbnames[0]}.stb_1 values(now, 1.1, 200, 0.3)", + f"drop table {self.dbnames[0]}.stb_1", + f"select count(*) from {self.dbnames[1]}.stb"] for sql in sql_list: try: @@ -104,11 +128,11 @@ class TDTestCase: tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured") pass - def run(self): - tdSql.prepare() + def run(self): self.prepare_data() self.create_user() - self.user_privilege_check() + self.user_read_privilege_check(self.dbnames[0]) + self.user_write_privilege_check(self.dbnames[1]) self.user_privilege_error_check() def stop(self): diff --git a/tests/system-test/6-cluster/clusterCommonCheck.py b/tests/system-test/6-cluster/clusterCommonCheck.py index 149c6d8ded..f5926321da 100644 --- a/tests/system-test/6-cluster/clusterCommonCheck.py +++ b/tests/system-test/6-cluster/clusterCommonCheck.py @@ -207,7 +207,7 @@ class ClusterComCheck: count+=1 else: tdLog.debug(tdSql.queryResult) - tdLog.exit("stop mnodes on dnode %d failed in 10s ") + tdLog.exit(f"stop mnodes on dnode {offlineDnodeNo} failed in 10s ") def check3mnode2off(self,mnodeNums=3): count=0 @@ -226,7 +226,45 @@ class ClusterComCheck: count+=1 else: tdLog.debug(tdSql.queryResult) - tdLog.exit("stop mnodes on dnode %d failed in 10s ") + tdLog.exit("stop mnodes on dnode 2 or 3 failed in 10s") + + def check_vgroups_status(self,vgroup_numbers=2,db_replica=3,count_number=10,db_name="db"): + """ check vgroups status in 10s after db vgroups status is changed """ + vgroup_numbers = int(vgroup_numbers) + self.db_replica = int(db_replica) + tdLog.debug("start to check status of vgroups") + count=0 + last_number=vgroup_numbers-1 + while count < count_number: + time.sleep(1) + tdSql.query(f"show {db_name}.vgroups;") + if count == 0 : + if tdSql.checkRows(vgroup_numbers) : + tdLog.success(f"{db_name} has {vgroup_numbers} vgroups" ) + else: + tdLog.exit(f"vgroup number of {db_name} is not correct") + if self.db_replica == 1 : + if tdSql.queryResult[0][4] == 'leader' and tdSql.queryResult[1][4] == 'leader' and tdSql.queryResult[last_number][4] == 'leader': + ready_time= (count + 1) + tdLog.success(f"all vgroups of {db_name} are leaders in {count + 1} s") + return True + count+=1 + elif self.db_replica == 3 : + vgroup_status_first=[tdSql.queryResult[0][4],tdSql.queryResult[0][6],tdSql.queryResult[0][8]] + + vgroup_status_last=[tdSql.queryResult[last_number][4],tdSql.queryResult[last_number][6],tdSql.queryResult[last_number][8]] + if vgroup_status_first.count('leader') == 1 and vgroup_status_first.count('follower') == 2: + if vgroup_status_last.count('leader') == 1 and vgroup_status_last.count('follower') == 2: + ready_time= (count + 1) + tdLog.success(f"all vgroups of {db_name} are ready in {ready_time} s") + return True + count+=1 + else: + tdLog.debug(tdSql.queryResult) + tdLog.notice(f"all vgroups leader of {db_name} is selected {count}s ") + caller = inspect.getframeinfo(inspect.stack()[1][0]) + args = (caller.filename, caller.lineno) + tdLog.exit("%s(%d) failed " % args) diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep1to3.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep1to3.py new file mode 100644 index 0000000000..7d46b3143d --- /dev/null +++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep1to3.py @@ -0,0 +1,206 @@ +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + +import time +import socket +import subprocess +from multiprocessing import Process +import threading +import time +import inspect +import ctypes + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + tdSql.init(conn.cursor()) + self.host = socket.gethostname() + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def stopThread(self,thread): + self._async_raise(thread.ident, SystemExit) + + + def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db0_0', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 1, + 'stbName': 'stb', + 'stbNumbers': 2, + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 200, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + "rowsPerTbl": 1000, + "batchNum": 5000 + } + + dnodeNumbers=int(dnodeNumbers) + mnodeNums=int(mnodeNums) + vnodeNumbers = int(dnodeNumbers-mnodeNums) + allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"]) + rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"] + rowsall=rowsPerStb*paraDict['stbNumbers'] + dbNumbers = 1 + + tdLog.info("first check dnode and mnode") + tdSql.query("select * from information_schema.ins_dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodeNumbers) + + #check mnode status + tdLog.info("check mnode status") + clusterComCheck.checkMnodeStatus(mnodeNums) + + # add some error operations and + tdLog.info("Confirm the status of the dnode again") + tdSql.error("create mnode on dnode 2") + tdSql.query("select * from information_schema.ins_dnodes;") + print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodeNumbers) + + # create database and stable + clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']) + tdLog.info("Take turns stopping Mnodes ") + + tdDnodes=cluster.dnodes + stopcount =0 + threads=[] + + # create stable:stb_0 + stableName= paraDict['stbName'] + newTdSql=tdCom.newTdSql() + clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers']) + #create child table:ctb_0 + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum']) + #insert date + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]))) + for tr in threads: + tr.start() + for tr in threads: + tr.join() + + while stopcount < restartNumbers: + tdLog.info(" restart loop: %d"%stopcount ) + if stopRole == "mnode": + for i in range(mnodeNums): + tdDnodes[i].stoptaosd() + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + elif stopRole == "vnode": + for i in range(vnodeNumbers): + tdDnodes[i+mnodeNums].stoptaosd() + # sleep(10) + tdDnodes[i+mnodeNums].starttaosd() + # sleep(10) + elif stopRole == "dnode": + for i in range(dnodeNumbers): + if i == 0 : + stableName= '%s_%d'%(paraDict['stbName'],0) + newTdSql=tdCom.newTdSql() + # newTdSql.execute('alter database db0_0 replica 3') + clusterComCreate.alterStbMetaData(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) + tdDnodes[i].stoptaosd() + clusterComCheck.checkDbRows(dbNumbers) + # sleep(10) + tdDnodes[i].starttaosd() + if i == 3 : + TdSqlEx=tdCom.newTdSql() + tdLog.info("alter database db0_0 replica 3") + TdSqlEx.execute('alter database db0_0 replica 3') + + + # dnodeNumbers don't include database of schema + if clusterComCheck.checkDnodes(dnodeNumbers): + tdLog.info("123") + else: + print("456") + + self.stopThread(threads) + tdLog.exit("one or more of dnodes failed to start ") + # self.check3mnode() + stopcount+=1 + + + clusterComCheck.checkDnodes(dnodeNumbers) + clusterComCheck.checkDbRows(dbNumbers) + # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"]) + + # tdSql.execute("use %s" %(paraDict["dbName"])) + tdSql.query("show %s.stables"%(paraDict["dbName"])) + tdSql.checkRows(paraDict["stbNumbers"]) + for i in range(paraDict['stbNumbers']): + stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i) + tdSql.query("select count(*) from %s"%stableName) + if i == 0 : + tdSql.checkData(0,0,rowsPerStb*2) + else: + tdSql.checkData(0,0,rowsPerStb) + clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=3,db_name=paraDict["dbName"],count_number=150) + def run(self): + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=1,stopRole='dnode') + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep3to1.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep3to1.py new file mode 100644 index 0000000000..5b5fb04969 --- /dev/null +++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep3to1.py @@ -0,0 +1,206 @@ +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + +import time +import socket +import subprocess +from multiprocessing import Process +import threading +import time +import inspect +import ctypes + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + tdSql.init(conn.cursor()) + self.host = socket.gethostname() + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def stopThread(self,thread): + self._async_raise(thread.ident, SystemExit) + + + def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db0_0', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 3, + 'stbName': 'stb', + 'stbNumbers': 2, + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 200, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + "rowsPerTbl": 1000, + "batchNum": 5000 + } + + dnodeNumbers=int(dnodeNumbers) + mnodeNums=int(mnodeNums) + vnodeNumbers = int(dnodeNumbers-mnodeNums) + allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"]) + rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"] + rowsall=rowsPerStb*paraDict['stbNumbers'] + dbNumbers = 1 + + tdLog.info("first check dnode and mnode") + tdSql.query("select * from information_schema.ins_dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodeNumbers) + + #check mnode status + tdLog.info("check mnode status") + clusterComCheck.checkMnodeStatus(mnodeNums) + + # add some error operations and + tdLog.info("Confirm the status of the dnode again") + tdSql.error("create mnode on dnode 2") + tdSql.query("select * from information_schema.ins_dnodes;") + print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodeNumbers) + + # create database and stable + clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']) + tdLog.info("Take turns stopping Mnodes ") + + tdDnodes=cluster.dnodes + stopcount =0 + threads=[] + + # create stable:stb_0 + stableName= paraDict['stbName'] + newTdSql=tdCom.newTdSql() + clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers']) + #create child table:ctb_0 + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum']) + #insert date + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]))) + for tr in threads: + tr.start() + for tr in threads: + tr.join() + + while stopcount < restartNumbers: + tdLog.info(" restart loop: %d"%stopcount ) + if stopRole == "mnode": + for i in range(mnodeNums): + tdDnodes[i].stoptaosd() + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + elif stopRole == "vnode": + for i in range(vnodeNumbers): + tdDnodes[i+mnodeNums].stoptaosd() + # sleep(10) + tdDnodes[i+mnodeNums].starttaosd() + # sleep(10) + elif stopRole == "dnode": + for i in range(dnodeNumbers): + tdDnodes[i].stoptaosd() + clusterComCheck.checkDbRows(dbNumbers) + if i == 0 : + stableName= '%s_%d'%(paraDict['stbName'],0) + newTdSql=tdCom.newTdSql() + # newTdSql.execute('alter database db0_0 replica 3') + clusterComCreate.alterStbMetaData(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) + # sleep(10) + tdDnodes[i].starttaosd() + if i == 3 : + TdSqlEx=tdCom.newTdSql() + tdLog.info("alter database db0_0 replica 1") + TdSqlEx.execute('alter database db0_0 replica 1') + + + # dnodeNumbers don't include database of schema + if clusterComCheck.checkDnodes(dnodeNumbers): + tdLog.info("123") + else: + print("456") + + self.stopThread(threads) + tdLog.exit("one or more of dnodes failed to start ") + # self.check3mnode() + stopcount+=1 + + + clusterComCheck.checkDnodes(dnodeNumbers) + clusterComCheck.checkDbRows(dbNumbers) + # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"]) + + # tdSql.execute("use %s" %(paraDict["dbName"])) + tdSql.query("show %s.stables"%(paraDict["dbName"])) + tdSql.checkRows(paraDict["stbNumbers"]) + for i in range(paraDict['stbNumbers']): + stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i) + tdSql.query("select count(*) from %s"%stableName) + if i == 0 : + tdSql.checkData(0,0,rowsPerStb*2) + else: + tdSql.checkData(0,0,rowsPerStb) + clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=1,db_name=paraDict["dbName"],count_number=150) + def run(self): + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=1,stopRole='dnode') + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDatarRebootAlterRep1-3.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDatarRebootAlterRep1-3.py new file mode 100644 index 0000000000..aa3ed8e3fd --- /dev/null +++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDatarRebootAlterRep1-3.py @@ -0,0 +1,222 @@ +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + +import time +import socket +import subprocess +from multiprocessing import Process +import threading +import time +import inspect +import ctypes + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + tdSql.init(conn.cursor()) + self.host = socket.gethostname() + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def stopThread(self,thread): + self._async_raise(thread.ident, SystemExit) + + + def insertData(self,countstart,countstop): + # fisrt add data : db\stable\childtable\general table + + for couti in range(countstart,countstop): + tdLog.debug("drop database if exists db%d" %couti) + tdSql.execute("drop database if exists db%d" %couti) + print("create database if not exists db%d replica 1 duration 300" %couti) + tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti) + tdSql.execute("use db%d" %couti) + tdSql.execute( + '''create table stb1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + tags (t1 int) + ''' + ) + tdSql.execute( + ''' + create table t1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + ''' + ) + for i in range(4): + tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') + + + def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db0_0', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 1, + 'stbName': 'stb', + 'stbNumbers': 2, + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 1000, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + "rowsPerTbl": 100, + "batchNum": 5000 + } + + dnodeNumbers = int(dnodeNumbers) + mnodeNums = int(mnodeNums) + vnodeNumbers = int(dnodeNumbers-mnodeNums) + allctbNumbers = (paraDict['stbNumbers']*paraDict["ctbNum"]) + rowsPerStb = paraDict["ctbNum"]*paraDict["rowsPerTbl"] + rowsall = rowsPerStb*paraDict['stbNumbers'] + dbNumbers = 1 + replica3 = 3 + tdLog.info("first check dnode and mnode") + tdSql.query("select * from information_schema.ins_dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodeNumbers) + + #check mnode status + tdLog.info("check mnode status") + clusterComCheck.checkMnodeStatus(mnodeNums) + + # add some error operations and + tdLog.info("Confirm the status of the dnode again") + tdSql.error("create mnode on dnode 2") + tdSql.query("select * from information_schema.ins_dnodes;") + print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodeNumbers) + + # create database and stable + clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']) + tdLog.info("Take turns stopping Mnodes ") + + tdDnodes=cluster.dnodes + stopcount =0 + threads=[] + + # create stable:stb_0 + stableName= paraDict['stbName'] + newTdSql=tdCom.newTdSql() + clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers']) + #create child table:ctb_0 + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum']) + #insert date + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]))) + for tr in threads: + tr.start() + TdSqlEx=tdCom.newTdSql() + tdLog.info("alter database db0_0 replica 3") + TdSqlEx.execute('alter database db0_0 replica 3') + while stopcount < restartNumbers: + tdLog.info(" restart loop: %d"%stopcount ) + if stopRole == "mnode": + for i in range(mnodeNums): + tdDnodes[i].stoptaosd() + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + elif stopRole == "vnode": + for i in range(vnodeNumbers): + tdDnodes[i+mnodeNums].stoptaosd() + # sleep(10) + tdDnodes[i+mnodeNums].starttaosd() + # sleep(10) + elif stopRole == "dnode": + for i in range(dnodeNumbers): + tdDnodes[i].stoptaosd() + # tdLog.info('select cast(c2 as nchar(10)) from db0_0.stb_1;') + # TdSqlEx.execute('select cast(c2 as nchar(10)) from db0_0.stb_1;') + # tdLog.info('select avg(c1) from db0_0.stb_0 interval(10s);') + # TdSqlEx.execute('select avg(c1) from db0_0.stb_0 interval(10s);') + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + # dnodeNumbers don't include database of schema + if clusterComCheck.checkDnodes(dnodeNumbers): + tdLog.info("123") + else: + print("456") + + self.stopThread(threads) + tdLog.exit("one or more of dnodes failed to start ") + # self.check3mnode() + stopcount+=1 + + for tr in threads: + tr.join() + clusterComCheck.checkDnodes(dnodeNumbers) + clusterComCheck.checkDbRows(dbNumbers) + # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"]) + + # tdSql.execute("use %s" %(paraDict["dbName"])) + tdSql.query("show %s.stables"%(paraDict["dbName"])) + tdSql.checkRows(paraDict["stbNumbers"]) + # for i in range(paraDict['stbNumbers']): + # stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i) + # tdSql.query("select count(*) from %s"%stableName) + # tdSql.checkData(0,0,rowsPerStb) + clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=replica3,db_name=paraDict["dbName"],count_number=240) + def run(self): + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=4,stopRole='dnode') + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py new file mode 100644 index 0000000000..ed7b99a880 --- /dev/null +++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py @@ -0,0 +1,196 @@ +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + +import time +import socket +import subprocess +from multiprocessing import Process +import threading +import time +import inspect +import ctypes + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + tdSql.init(conn.cursor()) + self.host = socket.gethostname() + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def stopThread(self,thread): + self._async_raise(thread.ident, SystemExit) + + + def insertData(self,countstart,countstop): + # fisrt add data : db\stable\childtable\general table + + for couti in range(countstart,countstop): + tdLog.debug("drop database if exists db%d" %couti) + tdSql.execute("drop database if exists db%d" %couti) + print("create database if not exists db%d replica 1 duration 300" %couti) + tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti) + tdSql.execute("use db%d" %couti) + tdSql.execute( + '''create table stb1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + tags (t1 int) + ''' + ) + tdSql.execute( + ''' + create table t1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + ''' + ) + for i in range(4): + tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') + + + def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db0_0', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 3, + 'stbName': 'stb', + 'stbNumbers': 2, + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 1, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + "rowsPerTbl": 1, + "batchNum": 5000 + } + + dnodeNumbers=int(dnodeNumbers) + mnodeNums=int(mnodeNums) + vnodeNumbers = int(dnodeNumbers-mnodeNums) + replica1 = 1 + replica3 = 3 + allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"]) + rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"] + rowsall=rowsPerStb*paraDict['stbNumbers'] + dbNumbers = 1 + + tdLog.info("first check dnode and mnode") + tdSql.query("select * from information_schema.ins_dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodeNumbers) + + #check mnode status + tdLog.info("check mnode status") + clusterComCheck.checkMnodeStatus(mnodeNums) + + # add some error operations and + tdLog.info("Confirm the status of the dnode again") + tdSql.error("create mnode on dnode 2") + tdSql.query("select * from information_schema.ins_dnodes;") + print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodeNumbers) + + # create database and stable + clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']) + tdLog.info("Take turns stopping Mnodes ") + + tdDnodes=cluster.dnodes + stopcount =0 + threads=[] + + # create stable:stb_0 + stableName= paraDict['stbName'] + newTdSql=tdCom.newTdSql() + clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers']) + #create child table:ctb_0 + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum']) + #insert date + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]))) + for tr in threads: + tr.start() + TdSqlEx=tdCom.newTdSql() + tdLog.info(f"alter database db0_0 replica {replica1}") + TdSqlEx.execute(f'alter database db0_0 replica {replica1}') + for tr in threads: + tr.join() + clusterComCheck.checkDnodes(dnodeNumbers) + clusterComCheck.checkDbRows(dbNumbers) + # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"]) + + # tdSql.execute("use %s" %(paraDict["dbName"])) + tdSql.query("show %s.stables"%(paraDict["dbName"])) + tdSql.checkRows(paraDict["stbNumbers"]) + for i in range(paraDict['stbNumbers']): + stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i) + tdSql.query("select count(*) from %s"%stableName) + tdSql.checkData(0,0,rowsPerStb) + + clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=replica1,db_name=paraDict["dbName"],count_number=20) + sleep(5) + tdLog.info(f"show transactions;alter database db0_0 replica {replica3};") + TdSqlEx.execute(f'show transactions;') + TdSqlEx.execute(f'alter database db0_0 replica {replica3};') + clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=replica3,db_name=paraDict["dbName"],count_number=120) + + def run(self): + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=4,stopRole='dnode') + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeStopDnodeInsertDatatb.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeStopDnodeInsertDatatb.py new file mode 100644 index 0000000000..e02af29a05 --- /dev/null +++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeStopDnodeInsertDatatb.py @@ -0,0 +1,191 @@ +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + +import time +import socket +import subprocess +from multiprocessing import Process +import threading +import time +import inspect +import ctypes + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + tdSql.init(conn.cursor()) + self.host = socket.gethostname() + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def stopThread(self,thread): + self._async_raise(thread.ident, SystemExit) + + + def insertData(self,dbname,tableCount,rowsPerCount): + # tableCount : create table number + # rowsPerCount : rows per table + # fisrt add data : db\stable\childtable\general table + os.system(f"taosBenchmark -d {dbname} -n {tableCount} -t {rowsPerCount} -z 1 -k 10000 -y ") + + + def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db0_0', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 1, + 'stbName': 'stb', + 'stbNumbers': 2, + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 10000, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + "rowsPerTbl": 10000, + "batchNum": 5000 + } + + dnodeNumbers=int(dnodeNumbers) + mnodeNums=int(mnodeNums) + vnodeNumbers = int(dnodeNumbers-mnodeNums) + allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"]) + rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"] + rowsall=rowsPerStb*paraDict['stbNumbers'] + dbNumbers = 1 + + tdLog.info("first check dnode and mnode") + tdSql.query("select * from information_schema.ins_dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodeNumbers) + + #check mnode status + tdLog.info("check mnode status") + clusterComCheck.checkMnodeStatus(mnodeNums) + + # add some error operations and + tdLog.info("Confirm the status of the dnode again") + tdSql.error("create mnode on dnode 2") + tdSql.query("select * from information_schema.ins_dnodes;") + print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodeNumbers) + + # create database and stable + tdLog.info("Take turns stopping Mnodes ") + + tdDnodes=cluster.dnodes + stopcount =0 + threads=[] + + # create stable:stb_0 + threads.append(threading.Thread(target=self.insertData, args=(paraDict["dbName"],paraDict["ctbNum"],paraDict["rowsPerTbl"]))) + for tr in threads: + tr.start() + TdSqlEx=tdCom.newTdSql() + tdLog.info("alter database db0_0 replica 3") + TdSqlEx.execute('alter database db0_0 replica 3') + while stopcount < restartNumbers: + tdLog.info(" restart loop: %d"%stopcount ) + if stopRole == "mnode": + for i in range(mnodeNums): + tdDnodes[i].stoptaosd() + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + elif stopRole == "vnode": + for i in range(vnodeNumbers): + tdDnodes[i+mnodeNums].stoptaosd() + # sleep(10) + tdDnodes[i+mnodeNums].starttaosd() + # sleep(10) + elif stopRole == "dnode": + for i in range(dnodeNumbers): + tdDnodes[i].stoptaosd() + # tdLog.info('select cast(c2 as nchar(10)) from db0_0.stb_1;') + # TdSqlEx.execute('select cast(c2 as nchar(10)) from db0_0.stb_1;') + # tdLog.info('select avg(c1) from db0_0.stb_0 interval(10s);') + # TdSqlEx.execute('select avg(c1) from db0_0.stb_0 interval(10s);') + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + # dnodeNumbers don't include database of schema + if clusterComCheck.checkDnodes(dnodeNumbers): + tdLog.info("123") + else: + print("456") + + self.stopThread(threads) + tdLog.exit("one or more of dnodes failed to start ") + # self.check3mnode() + stopcount+=1 + + for tr in threads: + tr.join() + clusterComCheck.checkDnodes(dnodeNumbers) + clusterComCheck.checkDbRows(dbNumbers) + # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"]) + + # tdSql.execute("use %s" %(paraDict["dbName"])) + tdSql.query("show %s.stables"%(paraDict["dbName"])) + tdSql.checkRows(paraDict["stbNumbers"]) + for i in range(paraDict['stbNumbers']): + stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i) + tdSql.query("select count(*) from %s"%stableName) + tdSql.checkData(0,0,rowsPerStb) + clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=3,db_name=paraDict["dbName"],count_number=240) + def run(self): + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=4,stopRole='dnode') + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/subscribeDb3.py b/tests/system-test/7-tmq/subscribeDb3.py index 5b5326cfba..b66334a6a6 100644 --- a/tests/system-test/7-tmq/subscribeDb3.py +++ b/tests/system-test/7-tmq/subscribeDb3.py @@ -336,7 +336,7 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - if totalConsumeRows >= expectrowcnt or totalConsumeRows <= 0: + if totalConsumeRows > expectrowcnt or totalConsumeRows <= 0: tdLog.info("act consume rows: %d, expect consume rows between %d and 0"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") diff --git a/tests/system-test/7-tmq/subscribeStb.py b/tests/system-test/7-tmq/subscribeStb.py index 9dcbf5b351..53f1a34d58 100644 --- a/tests/system-test/7-tmq/subscribeStb.py +++ b/tests/system-test/7-tmq/subscribeStb.py @@ -226,12 +226,11 @@ class TDTestCase: self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tdLog.info("start consume processor") - pollDelay = 5 + pollDelay = 10 showMsg = 1 showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - time.sleep(5) self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) self.insert_data(tdSql,\ parameterDict["dbName"],\ @@ -307,7 +306,7 @@ class TDTestCase: self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tdLog.info("start consume processor") - pollDelay = 5 + pollDelay = 10 showMsg = 1 showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)