diff --git a/README-CN.md b/README-CN.md index 189b7a059a..12ac7b9ee7 100644 --- a/README-CN.md +++ b/README-CN.md @@ -352,4 +352,4 @@ TDengine 提供了丰富的应用程序开发接口,其中包括 C/C++、Java # 加入技术交流群 -TDengine 官方社群「物联网大数据群」对外开放,欢迎您加入讨论。搜索微信号 "tdengine",加小 T 为好友,即可入群。 +TDengine 官方社群「物联网大数据群」对外开放,欢迎您加入讨论。搜索微信号 "tdengine1",加小 T 为好友,即可入群。 diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index c1255d15bb..893481322a 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -246,6 +246,11 @@ if(${BUILD_WITH_ROCKSDB}) option(WITH_MD_LIBRARY "build with MD" OFF) set(SYSTEM_LIBS ${SYSTEM_LIBS} shlwapi.lib rpcrt4.lib) endif(${TD_WINDOWS}) + + if(${TD_WINDOWS}) + option(WITH_MD_LIBRARY "build with MD" OFF) + set(SYSTEM_LIBS ${SYSTEM_LIBS} shlwapi.lib rpcrt4.lib) + endif(${TD_WINDOWS}) option(WITH_FALLOCATE "" OFF) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 95b2f94f3f..865977d62b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -207,8 +207,6 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) { atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); } -static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; } - void* streamQueueNextItem(SStreamQueue* queue); SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type); @@ -241,6 +239,7 @@ typedef struct { void* vnode; // not available to encoder and decoder FTbSink* tbSinkFunc; STSchema* pTSchema; + SSHashObj* pTblInfo; } STaskSinkTb; typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); diff --git a/packaging/docker/bin/entrypoint.sh b/packaging/docker/bin/entrypoint.sh index f2811de7bd..a60254d7ef 100755 --- a/packaging/docker/bin/entrypoint.sh +++ b/packaging/docker/bin/entrypoint.sh @@ -55,7 +55,7 @@ else exit $? fi while true; do - es=$(taos -h $FIRST_EP_HOST -P $FIRST_EP_PORT --check) + es=$(taos -h $FIRST_EP_HOST -P $FIRST_EP_PORT --check | grep "^[0-9]*:") echo ${es} if [ "${es%%:*}" -eq 2 ]; then echo "execute create dnode" diff --git a/packaging/docker/bin/taos-check b/packaging/docker/bin/taos-check index 5dc06b6018..349187da9b 100755 --- a/packaging/docker/bin/taos-check +++ b/packaging/docker/bin/taos-check @@ -1,5 +1,5 @@ #!/bin/sh -es=$(taos --check) +es=$(taos --check | grep "^[0-9]*:") code=${es%%:*} if [ "$code" -ne "0" ] && [ "$code" -ne "4" ]; then exit 0 diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 33fabbed50..443c276cd1 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1359,7 +1359,7 @@ static int32_t smlInsertData(SSmlHandle *info) { } taosArrayPush(info->pRequest->tableList, &pName); - tstrncpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName) + 1); + strcpy(pName.tname, tableData->childTableName); SRequestConnInfo conn = {0}; conn.pTrans = info->taos->pAppInfo->pTransporter; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 4748e98913..237a52efe5 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -84,7 +84,7 @@ bool tsMonitorComp = false; // telem bool tsEnableTelem = true; int32_t tsTelemInterval = 43200; -char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com"; +char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.tdengine.com"; uint16_t tsTelemPort = 80; char *tsTelemUri = "/report"; diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c index f0e020edfe..989bff3984 100644 --- a/source/dnode/mgmt/exe/dmMain.c +++ b/source/dnode/mgmt/exe/dmMain.c @@ -87,18 +87,6 @@ static void dmStopDnode(int signum, void *sigInfo, void *context) { } void dmLogCrash(int signum, void *sigInfo, void *context) { - taosIgnSignal(SIGTERM); - taosIgnSignal(SIGHUP); - taosIgnSignal(SIGINT); - taosIgnSignal(SIGBREAK); - -#ifndef WINDOWS - taosIgnSignal(SIGBUS); -#endif - taosIgnSignal(SIGABRT); - taosIgnSignal(SIGFPE); - taosIgnSignal(SIGSEGV); - char *pMsg = NULL; const char *flags = "UTL FATAL "; ELogLevel level = DEBUG_FATAL; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 2ebb5aeb99..d0f88940a9 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -256,10 +256,13 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db); pDb = mndAcquireDb(pMnode, db); if (pDb == NULL) { - terrno = TSDB_CODE_MND_INVALID_DB; - mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db, - terrstr()); - goto _OVER; + if (0 != strcmp(connReq.db, TSDB_INFORMATION_SCHEMA_DB) && + (0 != strcmp(connReq.db, TSDB_PERFORMANCE_SCHEMA_DB))) { + terrno = TSDB_CODE_MND_INVALID_DB; + mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db, + terrstr()); + goto _OVER; + } } if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) != 0) { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index eb70598201..d098ec9be2 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -261,13 +261,13 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); -int32_t tqNextBlockInWal(STqReader *pReader); -bool tqNextBlockImpl(STqReader *pReader); +int32_t tqNextBlockInWal(STqReader* pReader); +bool tqNextBlockImpl(STqReader *pReader, const char* idstr); int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); -int32_t tqRetrieveDataBlock(STqReader *pReader, SSubmitTbData **pSubmitTbDataRet); +int32_t tqRetrieveDataBlock(STqReader *pReader, const char* idstr); int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 8207b02c4c..e9811a56f1 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -101,6 +101,8 @@ typedef struct { STqPushHandle pushHandle; // push STqExecHandle execHandle; // exec SRpcMsg* msg; + int32_t noDataPollCnt; + int8_t exec; } STqHandle; typedef struct { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f4fcc7c02a..5684c40df0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -753,6 +753,10 @@ end: return ret; } +void freePtr(void *ptr) { + taosMemoryFree(*(void**)ptr); +} + int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t vgId = TD_VID(pTq->pVnode); pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); @@ -789,6 +793,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { return -1; } + qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } else if (pTask->taskLevel == TASK_LEVEL__AGG) { pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); if (pTask->pState == NULL) { @@ -802,10 +807,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { if (pTask->exec.pExecutor == NULL) { return -1; } + + qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } // sink - /*pTask->ahandle = pTq->pVnode;*/ if (pTask->outputType == TASK_OUTPUT__SMA) { pTask->smaSink.vnode = pTq->pVnode; pTask->smaSink.smaSink = smaHandleRes; @@ -825,6 +831,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { if (pTask->tbSink.pTSchema == NULL) { return -1; } + pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr); } if (pTask->taskLevel == TASK_LEVEL__SOURCE) { diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 46414e198b..ba8bbed27e 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -310,6 +310,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) { memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); } else { + tqPushDataRsp(pTq, pHandle); void* tmp = pHandle->msg->pCont; memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = tmp; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index e13c0288be..e9eb9d05fc 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -419,15 +419,15 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i return 0; } -bool tqNextBlockImpl(STqReader* pReader) { +bool tqNextBlockImpl(STqReader* pReader, const char* idstr) { if (pReader->msg.msgStr == NULL) { return false; } - int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); - while (pReader->nextBlk < blockSz) { - tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg.msgStr, pReader->msg.msgLen, - pReader->msg.ver, pReader->nextBlk); + int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData); + while (pReader->nextBlk < numOfBlocks) { + tqDebug("tq reader next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, + pReader->msg.ver, pReader->nextBlk, numOfBlocks, idstr); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); if (pReader->tbIdHash == NULL) { @@ -503,13 +503,11 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap return 0; } -int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) { - tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk); +int32_t tqRetrieveDataBlock(STqReader* pReader, const char* idstr) { + tqDebug("tq reader retrieve data block %p, index:%d/%d, %s", pReader->msg.msgStr, pReader->nextBlk, + (int32_t)taosArrayGetSize(pReader->submit.aSubmitTbData), idstr); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++); - if (pSubmitTbDataRet) { - *pSubmitTbDataRet = pSubmitTbData; - } SSDataBlock* pBlock = pReader->pResBlock; blockDataCleanup(pBlock); @@ -674,12 +672,10 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j); while (1) { SColVal colVal; - tqDebug("start to extract column id:%d, index:%d", pColData->info.colId, sourceIdx); - tRowGet(pRow, pTSchema, sourceIdx, &colVal); if (colVal.cid < pColData->info.colId) { - tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in schema:%d", - sourceIdx, colVal.cid, pColData->info.colId, pTSchema->numOfCols); +// tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in schema:%d", +// sourceIdx, colVal.cid, pColData->info.colId, pTSchema->numOfCols); sourceIdx++; continue; } else if (colVal.cid == pColData->info.colId) { diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 82c31f2f57..ea7e9ee715 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -57,29 +57,6 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { return 0; } -<<<<<<< HEAD -static SArray* extractTaskIdList(SStreamMeta* pStreamMeta, int32_t numOfTasks) { - SArray* pTaskIdList = taosArrayInit(numOfTasks, sizeof(int32_t)); -======= -int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) { ->>>>>>> enh/3.0 - void* pIter = NULL; - - taosWLockLatch(&pStreamMeta->lock); - while (1) { - pIter = taosHashIterate(pStreamMeta->pTasks, pIter); - if (pIter == NULL) { - break; - } - - SStreamTask* pTask = *(SStreamTask**)pIter; - taosArrayPush(pTaskIdList, &pTask->id.taskId); - } - - taosWUnLockLatch(&pStreamMeta->lock); - return pTaskIdList; -} - int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = true; bool noNewDataInWal = true; @@ -143,6 +120,8 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } + + // append the data for the stream tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); } else { diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 800bcc8b71..52f92cd229 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -205,7 +205,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { STqReader* pReader = pExec->pTqReader; tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver); - while (tqNextBlockImpl(pReader)) { + while (tqNextBlockImpl(pReader, NULL)) { taosArrayClear(pBlocks); taosArrayClear(pSchemas); SSubmitTbData* pSubmitTbDataRet = NULL; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 9373f23c02..4a9e3dcee7 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -17,6 +17,13 @@ #include "tmsg.h" #include "tq.h" +#define MAX_CATCH_NUM 10240 + +typedef struct STblInfo { + uint64_t uid; + char tbName[TSDB_TABLE_NAME_LEN]; +} STblInfo; + int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr) { int32_t totalRows = pDataBlock->info.rows; @@ -90,6 +97,22 @@ end: return ret; } +int32_t tqGetTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo** pTbl) { + void* pVal = tSimpleHashGet(tblInfo, &groupId, sizeof(uint64_t)); + if (pVal) { + *pTbl = *(STblInfo**)pVal; + return TSDB_CODE_SUCCESS; + } + return TSDB_CODE_FAILED; +} + +int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo* pTbl) { + if (tSimpleHashGetSize(tblInfo) > MAX_CATCH_NUM) { + return TSDB_CODE_SUCCESS; + } + return tSimpleHashPut(tblInfo, &groupId, sizeof(uint64_t), &pTbl, POINTER_BYTES); +} + int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { void* buf = NULL; int32_t tlen = 0; @@ -260,100 +283,112 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d tbData.suid = suid; tbData.uid = 0; // uid is assigned by vnode tbData.sver = pTSchema->version; + STblInfo* pTblMeta = NULL; - char* ctbName = NULL; - tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), pDataBlock->info.parTbName); - if (pDataBlock->info.parTbName[0]) { - ctbName = taosStrdup(pDataBlock->info.parTbName); - } else { - ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); + int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTblMeta); + if (res != TSDB_CODE_SUCCESS) { + pTblMeta = taosMemoryCalloc(1, sizeof(STblInfo)); } - SMetaReader mr = {0}; - metaReaderInit(&mr, pVnode->pMeta, 0); - if (metaGetTableEntryByName(&mr, ctbName) < 0) { - metaReaderClear(&mr); - tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName); - - SVCreateTbReq* pCreateTbReq = NULL; - - if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) { - taosMemoryFree(ctbName); - goto _end; - }; - - // set const - pCreateTbReq->flags = 0; - pCreateTbReq->type = TSDB_CHILD_TABLE; - pCreateTbReq->ctb.suid = suid; - - // set super table name - SName name = {0}; - tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName); - - // set tag content - tagArray = taosArrayInit(1, sizeof(STagVal)); - if (!tagArray) { - taosMemoryFree(ctbName); - tdDestroySVCreateTbReq(pCreateTbReq); - goto _end; + char* ctbName = pDataBlock->info.parTbName; + if (!ctbName[0]) { + if (res == TSDB_CODE_SUCCESS) { + memcpy(ctbName, pTblMeta->tbName, strlen(pTblMeta->tbName)); + } else { + char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); + memcpy(ctbName, tmp, strlen(tmp)); + memcpy(pTblMeta->tbName, tmp, strlen(tmp)); + taosMemoryFree(tmp); + tqDebug("vgId:%d, gropuid:%" PRIu64 " datablock tabel name is null", TD_VID(pVnode), + pDataBlock->info.id.groupId); } - STagVal tagVal = { - .cid = pTSchema->numOfCols + 1, - .type = TSDB_DATA_TYPE_UBIGINT, - .i64 = (int64_t)pDataBlock->info.id.groupId, - }; - taosArrayPush(tagArray, &tagVal); - pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray); + } - STag* pTag = NULL; - tTagNew(tagArray, 1, false, &pTag); - tagArray = taosArrayDestroy(tagArray); - if (pTag == NULL) { - taosMemoryFree(ctbName); - tdDestroySVCreateTbReq(pCreateTbReq); - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(ctbName); - tdDestroySVCreateTbReq(pCreateTbReq); - goto _end; - } - pCreateTbReq->ctb.pTag = (uint8_t*)pTag; - - // set tag name - SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); - char tagNameStr[TSDB_COL_NAME_LEN] = {0}; - strcpy(tagNameStr, "group_id"); - taosArrayPush(tagName, tagNameStr); - pCreateTbReq->ctb.tagName = tagName; - - // set table name - pCreateTbReq->name = ctbName; - ctbName = NULL; - - tbData.pCreateTbReq = pCreateTbReq; - tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE; + if (res == TSDB_CODE_SUCCESS) { + tbData.uid = pTblMeta->uid; } else { - if (mr.me.type != TSDB_CHILD_TABLE) { - tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName, - mr.me.type); + SMetaReader mr = {0}; + metaReaderInit(&mr, pVnode->pMeta, 0); + if (metaGetTableEntryByName(&mr, ctbName) < 0) { metaReaderClear(&mr); - taosMemoryFree(ctbName); - continue; - } + taosMemoryFree(pTblMeta); + tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName); - if (mr.me.ctbEntry.suid != suid) { - tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64 - ", actual suid %" PRId64 "", - TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid); + SVCreateTbReq* pCreateTbReq = NULL; + + if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) { + goto _end; + }; + + // set const + pCreateTbReq->flags = 0; + pCreateTbReq->type = TSDB_CHILD_TABLE; + pCreateTbReq->ctb.suid = suid; + + // set super table name + SName name = {0}; + tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName); + + // set tag content + tagArray = taosArrayInit(1, sizeof(STagVal)); + if (!tagArray) { + tdDestroySVCreateTbReq(pCreateTbReq); + goto _end; + } + STagVal tagVal = { + .cid = pTSchema->numOfCols + 1, + .type = TSDB_DATA_TYPE_UBIGINT, + .i64 = (int64_t)pDataBlock->info.id.groupId, + }; + taosArrayPush(tagArray, &tagVal); + pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray); + + STag* pTag = NULL; + tTagNew(tagArray, 1, false, &pTag); + tagArray = taosArrayDestroy(tagArray); + if (pTag == NULL) { + tdDestroySVCreateTbReq(pCreateTbReq); + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + pCreateTbReq->ctb.pTag = (uint8_t*)pTag; + + // set tag name + SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); + char tagNameStr[TSDB_COL_NAME_LEN] = {0}; + strcpy(tagNameStr, "group_id"); + taosArrayPush(tagName, tagNameStr); + pCreateTbReq->ctb.tagName = tagName; + + // set table name + pCreateTbReq->name = taosStrdup(ctbName); + + tbData.pCreateTbReq = pCreateTbReq; + tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE; + } else { + if (mr.me.type != TSDB_CHILD_TABLE) { + tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName, + mr.me.type); + metaReaderClear(&mr); + taosMemoryFree(pTblMeta); + continue; + } + + if (mr.me.ctbEntry.suid != suid) { + tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64 + ", actual suid %" PRId64 "", + TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid); + metaReaderClear(&mr); + taosMemoryFree(pTblMeta); + continue; + } + + tbData.uid = mr.me.uid; + pTblMeta->uid = mr.me.uid; + tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTblMeta); metaReaderClear(&mr); - taosMemoryFree(ctbName); - continue; } - - tbData.uid = mr.me.uid; - metaReaderClear(&mr); - taosMemoryFreeClear(ctbName); } // rows diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 5dd14f69e7..a48cef73cf 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -16,12 +16,13 @@ #include "tq.h" #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) +#define NO_POLL_CNT 5 static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { char buf[128] = {0}; - sprintf(buf, "0x%" PRIx64 "-%d", streamId, taskId); + sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId); return taosStrdup(buf); } @@ -234,6 +235,10 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand return 0; } +static bool isHandleExecuting(STqHandle* pHandle){ + return 1 == atomic_load_8(&pHandle->exec); +} + static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* pOffset) { char buf[80] = {0}; @@ -251,6 +256,12 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, return code; } + while(isHandleExecuting(pHandle)){ + tqInfo("sub is executing, pHandle:%p", pHandle); + taosMsleep(5); + } + atomic_store_8(&pHandle->exec, 1); + qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); code = tqScanData(pTq, pHandle, &dataRsp, pOffset); if(code != 0) { @@ -260,17 +271,23 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // till now, all data has been transferred to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { - // lock - taosWLockLatch(&pTq->lock); - code = tqRegisterPushHandle(pTq, pHandle, pMsg); - taosWUnLockLatch(&pTq->lock); - tDeleteSMqDataRsp(&dataRsp); - return code; + if(pHandle->noDataPollCnt >= NO_POLL_CNT){ // send poll result to client if no data 5 times to avoid lost data + pHandle->noDataPollCnt = 0; + // lock + taosWLockLatch(&pTq->lock); + code = tqRegisterPushHandle(pTq, pHandle, pMsg); + taosWUnLockLatch(&pTq->lock); + tDeleteSMqDataRsp(&dataRsp); + atomic_store_8(&pHandle->exec, 0); + return code; + } + else{ + pHandle->noDataPollCnt++; + } } code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); - // NOTE: this pHandle->consumerId may have been changed already. end: @@ -279,45 +296,15 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, tFormatOffset(buf, 80, &dataRsp.rspOffset); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d", consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); -// taosWUnLockLatch(&pTq->lock); - tDeleteSMqDataRsp(&dataRsp); + tDeleteMqDataRsp(&dataRsp); } -======= - tqInitDataRsp(&dataRsp, pRequest); + atomic_store_8(&pHandle->exec, 0); - // lock - taosWLockLatch(&pTq->lock); - - qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); - - int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); - if (code == 0) { - - // till now, all data has been transferred to consumer, new data needs to push client once arrived. - if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && - dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { - code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); - taosWUnLockLatch(&pTq->lock); - return code; - } - - // NOTE: this pHandle->consumerId may have been changed already. - code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); - } - - tFormatOffset(buf, 80, &dataRsp.rspOffset); - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 - " code:%d", - consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); - taosWUnLockLatch(&pTq->lock); - tDeleteMqDataRsp(&dataRsp); - ->>>>>>> enh/3.0 return code; } static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) { - int code = 0; + int code = 0; int32_t vgId = TD_VID(pTq->pVnode); SWalCkHead* pCkHead = NULL; SMqMetaRsp metaRsp = {0}; @@ -330,10 +317,16 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, return code; } + while(isHandleExecuting(pHandle)){ + tqInfo("sub is executing, pHandle:%p", pHandle); + taosMsleep(5); + } + atomic_store_8(&pHandle->exec, 1); + if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { - tDeleteSTaosxRsp(&taosxRsp); - return -1; + code = -1; + goto end; } if (metaRsp.metaRspLen > 0) { @@ -341,16 +334,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64, pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts); taosMemoryFree(metaRsp.metaRsp); - tDeleteSTaosxRsp(&taosxRsp); - return code; + goto end; } tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 ",ts:%" PRId64,pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,taosxRsp.rspOffset.ts); if (taosxRsp.blockNum > 0) { - code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); - tDeleteSTaosxRsp(&taosxRsp); - return code; + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + goto end; }else { *offset = taosxRsp.rspOffset; } @@ -361,9 +352,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, int64_t fetchVer = offset->version + 1; pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { - tDeleteSTaosxRsp(&taosxRsp); terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + code = -1; + goto end; } walSetReaderCapacity(pHandle->pWalReader, 2048); int totalRows = 0; @@ -378,10 +369,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); - tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); - return code; + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + goto end; } SWalCont* pHead = &pCkHead->head; @@ -392,10 +381,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (pHead->msgType != TDMT_VND_SUBMIT) { if(totalRows > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1); - code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); - tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); - return code; + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + goto end; } tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); @@ -403,17 +390,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, metaRsp.resMsgType = pHead->msgType; metaRsp.metaRspLen = pHead->bodyLen; metaRsp.metaRsp = pHead->body; - if (tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId) < 0) { - code = -1; - taosMemoryFreeClear(pCkHead); - tDeleteSTaosxRsp(&taosxRsp); - return code; - } - - code = 0; - taosMemoryFreeClear(pCkHead); - tDeleteSTaosxRsp(&taosxRsp); - return code; + code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); + goto end; } // process data @@ -423,29 +401,28 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, .ver = pHead->version, }; - if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows) < 0) { - tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, - pRequest->subKey); - taosMemoryFreeClear(pCkHead); - tDeleteSTaosxRsp(&taosxRsp); - return -1; + code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows); + if (code < 0) { + tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, pRequest->subKey); + goto end; } if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); - tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); - return code; + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + goto end; } else { fetchVer++; } } } +end: + atomic_store_8(&pHandle->exec, 0); + tDeleteSTaosxRsp(&taosxRsp); taosMemoryFreeClear(pCkHead); - return 0; + return code; } int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) { diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 56c79eac1f..9e654e89d9 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -1437,12 +1437,12 @@ _return: SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx); pRes->code = code; pRes->pRes = NULL; + ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, + tstrerror(code)); if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { TSWAP(pTask->res, ctx->pResList); taskDone = true; } - ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, - tstrerror(code)); } if (pTask->res && taskDone) { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index c51dc39b5b..c8b16ad83b 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1484,14 +1484,23 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu return TSDB_CODE_OUT_OF_MEMORY; } + SHashObj *pSelectFuncs = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); for (int32_t i = 0; i < numOfOutput; ++i) { const char* pName = pCtx[i].pExpr->pExpr->_function.functionName; if ((strcmp(pName, "_select_value") == 0) || (strcmp(pName, "_group_key") == 0)) { pValCtx[num++] = &pCtx[i]; } else if (fmIsSelectFunc(pCtx[i].functionId)) { - p = &pCtx[i]; + void* data = taosHashGet(pSelectFuncs, pName, strlen(pName)); + if (taosHashGetSize(pSelectFuncs) != 0 && data == NULL) { + p = NULL; + break; + } else { + taosHashPut(pSelectFuncs, pName, strlen(pName), &num, sizeof(num)); + p = &pCtx[i]; + } } } + taosHashCleanup(pSelectFuncs); if (p != NULL) { p->subsidiaries.pCtx = pValCtx; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 8bbbd3524d..cafed977b7 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -112,7 +112,7 @@ void resetTaskInfo(qTaskInfo_t tinfo) { clearStreamBlock(pTaskInfo->pRoot); } -static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { +static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id); @@ -129,7 +129,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pOperator->status = OP_NOT_OPENED; SStreamScanInfo* pInfo = pOperator->info; - qDebug("s-task set source blocks:%d %s", (int32_t)numOfBlocks, id); + qDebug("s-task:%s set source blocks:%d", id, (int32_t)numOfBlocks); ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0); if (type == STREAM_INPUT__MERGED_SUBMIT) { @@ -144,9 +144,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } else if (type == STREAM_INPUT__DATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; - SPackedData tmp = { - .pDataBlock = pDataBlock, - }; + SPackedData tmp = { .pDataBlock = pDataBlock }; taosArrayPush(pInfo->pBlockLists, &tmp); } pInfo->blockType = STREAM_INPUT__DATA_BLOCK; @@ -162,9 +160,11 @@ void doSetTaskId(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { SStreamScanInfo* pStreamScanInfo = pOperator->info; - STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info; - if (pScanInfo->base.dataReader != NULL) { - tsdbReaderSetId(pScanInfo->base.dataReader, pTaskInfo->id.str); + if (pStreamScanInfo->pTableScanOp != NULL) { + STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info; + if (pScanInfo->base.dataReader != NULL) { + tsdbReaderSetId(pScanInfo->base.dataReader, pTaskInfo->id.str); + } } } else { doSetTaskId(pOperator->pDownstream[0]); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index af310305af..8909d83d31 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1583,7 +1583,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock // currently only the tbname pseudo column if (pInfo->numOfPseudoExpr > 0) { int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, - pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL); + pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache); // ignore the table not exists error, since this table may have been dropped during the scan procedure. if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) { blockDataFreeRes((SSDataBlock*)pBlock); @@ -1626,7 +1626,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { blockDataCleanup(pInfo->pRes); SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; - while (tqNextBlockImpl(pInfo->tqReader)) { + while (tqNextBlockImpl(pInfo->tqReader, NULL)) { int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL); if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) { continue; @@ -2046,17 +2046,18 @@ FETCH_NEXT_BLOCK: return pInfo->pUpdateRes; } + const char* id = GET_TASKID(pTaskInfo); SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; - - int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists); + int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists); NEXT_SUBMIT_BLK: while (1) { if (pInfo->tqReader->msg.msgStr == NULL) { - if (pInfo->validBlockIndex >= totBlockNum) { + if (pInfo->validBlockIndex >= totalBlocks) { updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); doClearBufferedBlocks(pInfo); - qDebug("stream scan return empty, consume block %d", totBlockNum); + + qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id); void* buff = NULL; // int32_t len = streamScanOperatorEncode(pInfo, &buff); // if (len > 0) { @@ -2068,17 +2069,18 @@ FETCH_NEXT_BLOCK: int32_t current = pInfo->validBlockIndex++; SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current); + + qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id); if (tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) { - qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current, - totBlockNum); + qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current, totalBlocks, id); continue; } } blockDataCleanup(pInfo->pRes); - while (tqNextBlockImpl(pInfo->tqReader)) { - int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL); + while (tqNextBlockImpl(pInfo->tqReader, id)) { + int32_t code = tqRetrieveDataBlock(pInfo->tqReader, id); if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) { continue; } @@ -2099,6 +2101,7 @@ FETCH_NEXT_BLOCK: break; } } + if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { break; } else { @@ -2110,7 +2113,7 @@ FETCH_NEXT_BLOCK: pInfo->numOfExec++; pOperator->resultInfo.totalRows += pBlockInfo->rows; - qDebug("scan rows: %" PRId64, pBlockInfo->rows); + qDebug("stream scan get source rows:%" PRId64", %s", pBlockInfo->rows, id); if (pBlockInfo->rows > 0) { return pInfo->pRes; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 63b60b6d2c..652825165c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2564,7 +2564,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); if (IS_FINAL_OP(pInfo)) { addRetriveWindow(delWins, pInfo); - taosArrayAddAll(pInfo->pDelWins, delWins); + if (pBlock->info.type != STREAM_CLEAR) { + taosArrayAddAll(pInfo->pDelWins, delWins); + } taosArrayDestroy(delWins); continue; } @@ -2576,6 +2578,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { if (pInfo->pDelRes->info.rows != 0) { // process the rest of the data printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); + if (pBlock->info.type == STREAM_CLEAR) { + pInfo->pDelRes->info.type = STREAM_CLEAR; + } else { + pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; + } return pInfo->pDelRes; } diff --git a/source/libs/parser/src/parCalcConst.c b/source/libs/parser/src/parCalcConst.c index c25d0e7036..01b62a9051 100644 --- a/source/libs/parser/src/parCalcConst.c +++ b/source/libs/parser/src/parCalcConst.c @@ -388,6 +388,9 @@ static bool isSetUselessCol(SSetOperator* pSetOp, int32_t index, SExprNode* pPro } static int32_t calcConstSetOpProjections(SCalcConstContext* pCxt, SSetOperator* pSetOp, bool subquery) { + if (subquery && pSetOp->opType == SET_OP_TYPE_UNION) { + return TSDB_CODE_SUCCESS; + } int32_t index = 0; SNode* pProj = NULL; WHERE_EACH(pProj, pSetOp->pProjectionList) { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ec4e203133..529a063538 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5349,7 +5349,8 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable } if (TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES == pStmt->alterType) { - if (calcTypeBytes(pStmt->dataType) > TSDB_MAX_FIELD_LEN) { + if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) || + (TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN); } @@ -5374,6 +5375,11 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TOO_MANY_COLUMNS); } + if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) || + (TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN); + } + if (pTableMeta->tableInfo.rowSize + calcTypeBytes(pStmt->dataType) > TSDB_MAX_BYTES_PER_ROW) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROW_LENGTH, TSDB_MAX_BYTES_PER_ROW); } @@ -8321,6 +8327,11 @@ static int32_t buildAddColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, S return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_DUPLICATED_COLUMN); } + if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) || + (TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN); + } + if (TSDB_MAX_COLUMNS == pTableMeta->tableInfo.numOfColumns) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TOO_MANY_COLUMNS); } @@ -8373,6 +8384,11 @@ static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_MODIFY_COL); } + if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) || + (TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN); + } + if (pTableMeta->tableInfo.rowSize + pReq->colModBytes - pSchema->bytes > TSDB_MAX_BYTES_PER_ROW) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROW_LENGTH, TSDB_MAX_BYTES_PER_ROW); } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index c6a4a97f6e..58b8e53478 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -19,6 +19,14 @@ #include "scalar.h" #include "tglobal.h" +static void debugPrintNode(SNode* pNode) { + char* pStr = NULL; + nodesNodeToString(pNode, false, &pStr, NULL); + printf("%s\n", pStr); + taosMemoryFree(pStr); + return; +} + static void dumpQueryPlan(SQueryPlan* pPlan) { if (!tsQueryPlannerTrace) { return; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 9a39bb09dc..635024519e 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -299,9 +299,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0; - qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, - pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, - pSubmitBlock->submit.ver, numOfBlocks, size); + qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, + pSubmitBlock->submit.msgLen, pSubmitBlock->submit.ver, numOfBlocks, size); if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { @@ -345,6 +344,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return 0; } +static void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; } + void* streamQueueNextItem(SStreamQueue* queue) { int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING); if (dequeueFlag == STREAM_QUEUE__FAILED) { diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 7fb35ad2ad..e574cdbe8a 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -159,7 +159,8 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* return dst; } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamMergedSubmit2* pMerged = streamMergedSubmitNew(); - ASSERT(pMerged); + // todo handle error + streamMergeSubmit(pMerged, (SStreamDataSubmit2*)dst); streamMergeSubmit(pMerged, (SStreamDataSubmit2*)pElem); taosFreeQitem(dst); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 78a1f3e967..0fb78fb589 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -15,7 +15,8 @@ #include "streamInc.h" -#define MAX_STREAM_EXEC_BATCH_NUM 10240 +// maximum allowed processed block batches. One block may include several submit blocks +#define MAX_STREAM_EXEC_BATCH_NUM 128 #define MIN_STREAM_EXEC_BATCH_NUM 16 bool streamTaskShouldStop(const SStreamStatus* pStatus) { @@ -66,7 +67,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* SArray* pBlockList = pMerged->submits; int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("st-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks); + qDebug("s-task:%s %p set submit input (merged), numOfblocks:%d", pTask->id.idStr, pTask, numOfBlocks); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)data; @@ -259,9 +260,10 @@ int32_t streamExecForAll(SStreamTask* pTask) { int32_t code = 0; while (1) { int32_t batchSize = 1; - void* pInput = NULL; int16_t times = 0; + SStreamQueueItem* pInput = NULL; + // merge multiple input data if possible in the input queue. qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr); @@ -271,10 +273,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { times++; taosMsleep(1); - qDebug("===stream===try agian batchSize:%d", batchSize); + qDebug("===stream===try again batchSize:%d", batchSize); continue; } + qDebug("===stream===break batchSize:%d", batchSize); break; } @@ -285,6 +288,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { break; } } else { + // todo we need to sort the data block, instead of just appending into the array list. void* newRet = NULL; if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) { streamQueueProcessFail(pTask->inputQueue); @@ -294,6 +298,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { pInput = newRet; streamQueueProcessSuccess(pTask->inputQueue); if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) { + qDebug("maximum batch limit:%d reached, processing, %s", MAX_STREAM_EXEC_BATCH_NUM, pTask->id.idStr); break; } } @@ -304,6 +309,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (pInput) { streamFreeQitem(pInput); } + return 0; } @@ -312,14 +318,14 @@ int32_t streamExecForAll(SStreamTask* pTask) { } if (pTask->taskLevel == TASK_LEVEL__SINK) { - ASSERT(((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_BLOCK); + ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK); qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize); - streamTaskOutput(pTask, pInput); + streamTaskOutput(pTask, (SStreamDataBlock*)pInput); continue; } SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); - qDebug("s-task:%s start to execute, numOfBlocks:%d", pTask->id.idStr, batchSize); + qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize); streamTaskExecImpl(pTask, pInput, pRes); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 49710b0934..de56cf24ca 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -217,8 +217,12 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* return -1; } - taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES); - taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); + void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); + if (p == NULL) { + taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); + } + + taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); return 0; } @@ -357,15 +361,18 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { return -1; } - if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { + void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); + if (p == NULL) { + taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); + } + + if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); return -1; } - taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); - if (pTask->fillHistory) { pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; streamTaskCheckDownstream(pTask, ver); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index fee2f2ce58..1cca4d55cf 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -93,10 +93,6 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) { qWarn("open stream state, %s", path); - if (pTask == NULL) { - qWarn("failed to open stream state, %s", path); - return NULL; - } SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); if (pState == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -127,7 +123,6 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int taosMemoryFree(pState); pState = NULL; } - qWarn("open stream state2, %s", statePath); pState->pTdbState->pOwner = pTask; pState->pFileState = NULL; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index af084cb1b5..8a03896978 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -195,6 +195,7 @@ void tFreeStreamTask(SStreamTask* pTask) { if (pTask->outputType == TASK_OUTPUT__TABLE) { tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper); taosMemoryFree(pTask->tbSink.pTSchema); + tSimpleHashCleanup(pTask->tbSink.pTblInfo); } if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index cda7e35b0f..a12f8051ba 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -295,6 +295,36 @@ void walAlignVersions(SWal* pWal) { wInfo("vgId:%d, reset commitVer to %" PRId64, pWal->cfg.vgId, pWal->vers.commitVer); } +int walRepairLogFileTs(SWal* pWal, bool* updateMeta) { + int32_t sz = taosArrayGetSize(pWal->fileInfoSet); + int32_t fileIdx = -1; + int32_t lastCloseTs = 0; + char fnameStr[WAL_FILE_LEN] = {0}; + + while (++fileIdx < sz - 1) { + SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); + if (pFileInfo->closeTs != -1) { + lastCloseTs = pFileInfo->closeTs; + continue; + } + + walBuildLogName(pWal, pFileInfo->firstVer, fnameStr); + int32_t mtime = 0; + if (taosStatFile(fnameStr, NULL, &mtime) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, failed to stat file due to %s, file:%s", pWal->cfg.vgId, strerror(errno), fnameStr); + return -1; + } + + if (updateMeta != NULL) *updateMeta = true; + if (pFileInfo->createTs == -1) pFileInfo->createTs = lastCloseTs; + pFileInfo->closeTs = mtime; + lastCloseTs = pFileInfo->closeTs; + } + + return 0; +} + bool walLogEntriesComplete(const SWal* pWal) { int32_t sz = taosArrayGetSize(pWal->fileInfoSet); bool complete = true; @@ -433,15 +463,8 @@ int walCheckAndRepairMeta(SWal* pWal) { wError("failed to scan wal last ver since %s", terrstr()); return -1; } - // remove the empty wal log, and its idx - wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr); - taosRemoveFile(fnameStr); - walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr); - wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr); - taosRemoveFile(fnameStr); - // remove its meta entry - taosArrayRemove(pWal->fileInfoSet, fileIdx); - continue; + // empty log file + lastVer = pFileInfo->firstVer - 1; } // update lastVer @@ -460,6 +483,11 @@ int walCheckAndRepairMeta(SWal* pWal) { } (void)walAlignVersions(pWal); + // repair ts of files + if (walRepairLogFileTs(pWal, &updateMeta) < 0) { + return -1; + } + // update meta file if (updateMeta) { (void)walSaveMeta(pWal); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 7ef7e010a5..bdf091022e 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -74,18 +74,17 @@ int32_t walNextValidMsg(SWalReader *pReader) { int64_t lastVer = walGetLastVer(pReader->pWal); int64_t committedVer = walGetCommittedVer(pReader->pWal); int64_t appliedVer = walGetAppliedVer(pReader->pWal); - while(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010] - wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64", so sleep 1ms", pReader->pWal->cfg.vgId, appliedVer, committedVer); - taosMsleep(1); - appliedVer = walGetAppliedVer(pReader->pWal); + if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010] + wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer); +// taosMsleep(10); } // int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer; -// endVer = TMIN(appliedVer, endVer); + int64_t endVer = TMIN(appliedVer, committedVer); wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 - ", applied index:%" PRId64, - pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer); - while (fetchVer <= committedVer) { + ", applied index:%" PRId64", end index:%" PRId64, + pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); + while (fetchVer <= endVer) { if (walFetchHeadNew(pReader, fetchVer) < 0) { return -1; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 848de4f36d..9b7b3dfd50 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -284,15 +284,15 @@ int32_t walEndSnapshot(SWal *pWal) { if (ver == -1) { code = -1; goto END; - }; + } pWal->vers.snapshotVer = ver; int ts = taosGetTimestampSec(); - ver = TMAX(ver - pWal->vers.logRetention, pWal->vers.firstVer - 1); + // compatible mode for refVer bool hasTopic = false; - int64_t refVer = ver; + int64_t refVer = INT64_MAX; void *pIter = NULL; while (1) { pIter = taosHashIterate(pWal->pRefHash, pIter); @@ -300,54 +300,40 @@ int32_t walEndSnapshot(SWal *pWal) { SWalRef *pRef = *(SWalRef **)pIter; if (pRef->refVer == -1) continue; refVer = TMIN(refVer, pRef->refVer - 1); - wDebug("vgId:%d, wal found ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId); hasTopic = true; } - // compatible mode if (pWal->cfg.retentionPeriod == 0 && hasTopic) { + wInfo("vgId:%d, wal found refVer:%" PRId64 " in compatible mode, ver:%" PRId64, pWal->cfg.vgId, refVer, ver); ver = TMIN(ver, refVer); } + // find files safe to delete int deleteCnt = 0; int64_t newTotSize = pWal->totSize; - SWalFileInfo tmp; + SWalFileInfo tmp = {0}; tmp.firstVer = ver; - // find files safe to delete SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); + if (pInfo) { - SWalFileInfo *pLastFileInfo = taosArrayGetLast(pWal->fileInfoSet); - wDebug("vgId:%d, wal search found file info: first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer, - pInfo->lastVer); - if (ver >= pInfo->lastVer) { + wDebug("vgId:%d, wal search found file info. ver:%" PRId64 ", first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, ver, + pInfo->firstVer, pInfo->lastVer); + ASSERT(ver <= pInfo->lastVer); + if (ver == pInfo->lastVer) { pInfo++; - wDebug("vgId:%d, wal remove advance one file: first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer, - pInfo->lastVer); - } - if (pInfo <= pLastFileInfo) { - wDebug("vgId:%d, wal end remove for first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer, - pInfo->lastVer); - } else { - wDebug("vgId:%d, wal no remove", pWal->cfg.vgId); } // iterate files, until the searched result + // delete according to file size or close time for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { - wDebug("vgId:%d, wal check remove file %" PRId64 "(file size %" PRId64 " close ts %" PRId64 - "), new tot size %" PRId64, - pWal->cfg.vgId, iter->firstVer, iter->fileSize, iter->closeTs, newTotSize); - if ((pWal->cfg.retentionSize != -1 && pWal->cfg.retentionSize != 0 && newTotSize > pWal->cfg.retentionSize) || - ((pWal->cfg.retentionPeriod == 0) || (pWal->cfg.retentionPeriod != -1 && iter->closeTs != -1 && - iter->closeTs + pWal->cfg.retentionPeriod < ts))) { - // delete according to file size or close time - wDebug("vgId:%d, check pass", pWal->cfg.vgId); + if ((pWal->cfg.retentionSize > 0 && newTotSize > pWal->cfg.retentionSize) || + (pWal->cfg.retentionPeriod == 0 || + pWal->cfg.retentionPeriod > 0 && iter->closeTs >= 0 && iter->closeTs + pWal->cfg.retentionPeriod < ts)) { deleteCnt++; newTotSize -= iter->fileSize; taosArrayPush(pWal->toDeleteFiles, iter); } - wDebug("vgId:%d, check not pass", pWal->cfg.vgId); } - UPDATE_META: // make new array, remove files taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); if (taosArrayGetSize(pWal->fileInfoSet) == 0) { @@ -357,11 +343,12 @@ int32_t walEndSnapshot(SWal *pWal) { pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; } } + + // update meta pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; pWal->totSize = newTotSize; pWal->vers.verInSnapshotting = -1; - // save snapshot ver, commit ver code = walSaveMeta(pWal); if (code < 0) { goto END; @@ -369,23 +356,27 @@ int32_t walEndSnapshot(SWal *pWal) { // delete files deleteCnt = taosArrayGetSize(pWal->toDeleteFiles); - wDebug("vgId:%d, wal should delete %d files", pWal->cfg.vgId, deleteCnt); - char fnameStr[WAL_FILE_LEN]; + char fnameStr[WAL_FILE_LEN] = {0}; + pInfo = NULL; + for (int i = 0; i < deleteCnt; i++) { pInfo = taosArrayGet(pWal->toDeleteFiles, i); + walBuildLogName(pWal, pInfo->firstVer, fnameStr); - wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr); if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) { wError("vgId:%d, failed to remove log file %s due to %s", pWal->cfg.vgId, fnameStr, strerror(errno)); goto END; } walBuildIdxName(pWal, pInfo->firstVer, fnameStr); - wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr); if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) { wError("vgId:%d, failed to remove idx file %s due to %s", pWal->cfg.vgId, fnameStr, strerror(errno)); goto END; } } + if (pInfo != NULL) { + wInfo("vgId:%d, wal log files recycled. count:%d, until ver:%" PRId64 ", closeTs:%" PRId64, pWal->cfg.vgId, + deleteCnt, pInfo->lastVer, pInfo->closeTs); + } taosArrayClear(pWal->toDeleteFiles); END: diff --git a/tests/pytest/util/cluster.py b/tests/pytest/util/cluster.py index 2607cf63c2..a6e3530dc9 100644 --- a/tests/pytest/util/cluster.py +++ b/tests/pytest/util/cluster.py @@ -52,8 +52,9 @@ class ConfigureyCluster: dnode.addExtraCfg("secondEp", f"{hostname}:{startPort_sec}") # configure dnoe of independent mnodes - if num <= self.mnodeNums and self.mnodeNums != 0 and independentMnode == True : - dnode.addExtraCfg("supportVnodes", 1024) + if num <= self.mnodeNums and self.mnodeNums != 0 and independentMnode == "True" : + tdLog.info("set mnode supportVnodes 0") + dnode.addExtraCfg("supportVnodes", 0) # print(dnode) self.dnodes.append(dnode) return self.dnodes @@ -71,6 +72,7 @@ class ConfigureyCluster: tdSql.init(conn.cursor()) mnodeNums=int(mnodeNums) for i in range(2,mnodeNums+1): + tdLog.info("create mnode on dnode %d"%i) tdSql.execute(" create mnode on dnode %d;"%i) diff --git a/tests/script/tsim/alter/table.sim b/tests/script/tsim/alter/table.sim index dccfc7f5d6..5f45b446ca 100644 --- a/tests/script/tsim/alter/table.sim +++ b/tests/script/tsim/alter/table.sim @@ -657,6 +657,34 @@ if $data20 != null then return -1 endi +print =============== error for normal table +sql create table tb2023(ts timestamp, f int); +sql_error alter table tb2023 add column v varchar(16375); +sql_error alter table tb2023 add column v varchar(16385); +sql_error alter table tb2023 add column v varchar(33100); +sql alter table tb2023 add column v varchar(16374); +sql_error alter table tb2023 modify column v varchar(16375); +sql desc tb2023 +sql alter table tb2023 drop column v +sql_error alter table tb2023 add column v nchar(4094); +sql alter table tb2023 add column v nchar(4093); +sql_error alter table tb2023 modify column v nchar(4094); +sql desc tb2023 + +print =============== error for super table +sql create table stb2023(ts timestamp, f int) tags(t1 int); +sql_error alter table stb2023 add column v varchar(16375); +sql_error alter table stb2023 add column v varchar(16385); +sql_error alter table stb2023 add column v varchar(33100); +sql alter table stb2023 add column v varchar(16374); +sql_error alter table stb2023 modify column v varchar(16375); +sql desc stb2023 +sql alter table stb2023 drop column v +sql_error alter table stb2023 add column v nchar(4094); +sql alter table stb2023 add column v nchar(4093); +sql_error alter table stb2023 modify column v nchar(4094); +sql desc stb2023 + print ======= over sql drop database d1 sql select * from information_schema.ins_databases diff --git a/tests/script/tsim/parser/alter_column.sim b/tests/script/tsim/parser/alter_column.sim index c70a604c73..d569e47735 100644 --- a/tests/script/tsim/parser/alter_column.sim +++ b/tests/script/tsim/parser/alter_column.sim @@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10); sql_error alter table tb modify column c2 binary(9); sql_error alter table tb modify column c2 binary(-9); sql_error alter table tb modify column c2 binary(0); -sql alter table tb modify column c2 binary(17000); +sql_error alter table tb modify column c2 binary(17000); sql_error alter table tb modify column c2 nchar(30); sql_error alter table tb modify column c3 double; sql_error alter table tb modify column c3 nchar(10); diff --git a/tests/script/tsim/query/unionall_as_table.sim b/tests/script/tsim/query/unionall_as_table.sim index dc3d2cbec4..4d8f990718 100644 --- a/tests/script/tsim/query/unionall_as_table.sim +++ b/tests/script/tsim/query/unionall_as_table.sim @@ -25,4 +25,21 @@ if $data05 != @0021001@ then return -1 endi +sql create table st (ts timestamp, f int) tags (t int); +sql insert into ct1 using st tags(1) values(now, 1)(now+1s, 2) +sql insert into ct2 using st tags(2) values(now+2s, 3)(now+3s, 4) +sql select count(*) from (select * from ct1 union all select * from ct2) +if $rows != 1 then + return -1 +endi +if $data00 != 4 then + return -1 +endi +sql select count(*) from (select * from ct1 union select * from ct2) +if $rows != 1 then + return -1 +endi +if $data00 != 4 then + return -1 +endi system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/0-others/user_privilege.py b/tests/system-test/0-others/user_privilege.py index 6d49ebfbfe..d1b93f6942 100644 --- a/tests/system-test/0-others/user_privilege.py +++ b/tests/system-test/0-others/user_privilege.py @@ -29,6 +29,7 @@ class TDTestCase: self.stbname = 'stb' self.binary_length = 20 # the length of binary for column_dict self.nchar_length = 20 # the length of nchar for column_dict + self.dbnames = ['db1', 'db2'] self.column_dict = { 'ts': 'timestamp', 'col1': 'float', @@ -57,21 +58,25 @@ class TDTestCase: def create_user(self): user_name = 'test' tdSql.execute(f'create user {user_name} pass "test"') - tdSql.execute(f'grant read on db.stb with t2 = "Beijing" to {user_name}') + tdSql.execute(f'grant read on {self.dbnames[0]}.{self.stbname} with t2 = "Beijing" to {user_name}') + tdSql.execute(f'grant write on {self.dbnames[1]}.{self.stbname} with t1 = 2 to {user_name}') def prepare_data(self): - tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict)) - for i in range(self.tbnum): - tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})') - for j in self.values_list: - tdSql.execute(f'insert into {self.stbname}_{i} values({j})') + for db in self.dbnames: + tdSql.execute(f"create database {db}") + tdSql.execute(f"use {db}") + tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict)) + for i in range(self.tbnum): + tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})') + for j in self.values_list: + tdSql.execute(f'insert into {self.stbname}_{i} values({j})') - def user_privilege_check(self): + def user_read_privilege_check(self, dbname): testconn = taos.connect(user='test', password='test') expectErrNotOccured = False try: - sql = "select count(*) from db.stb where t2 = 'Beijing'" + sql = f"select count(*) from {dbname}.stb where t2 = 'Beijing'" res = testconn.query(sql) data = res.fetch_all() count = data[0][0] @@ -85,11 +90,30 @@ class TDTestCase: tdLog.exit(f"{sql}, expect result doesn't match") pass + def user_write_privilege_check(self, dbname): + testconn = taos.connect(user='test', password='test') + expectErrNotOccured = False + + try: + sql = f"insert into {dbname}.stb_1 values(now, 1.1, 200, 0.3)" + testconn.execute(sql) + except BaseException: + expectErrNotOccured = True + + if expectErrNotOccured: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured") + else: + pass + def user_privilege_error_check(self): testconn = taos.connect(user='test', password='test') expectErrNotOccured = False - sql_list = ["alter talbe db.stb_1 set t2 = 'Wuhan'", "drop table db.stb_1"] + sql_list = [f"alter talbe {self.dbnames[0]}.stb_1 set t2 = 'Wuhan'", + f"insert into {self.dbnames[0]}.stb_1 values(now, 1.1, 200, 0.3)", + f"drop table {self.dbnames[0]}.stb_1", + f"select count(*) from {self.dbnames[1]}.stb"] for sql in sql_list: try: @@ -104,11 +128,11 @@ class TDTestCase: tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured") pass - def run(self): - tdSql.prepare() + def run(self): self.prepare_data() self.create_user() - self.user_privilege_check() + self.user_read_privilege_check(self.dbnames[0]) + self.user_write_privilege_check(self.dbnames[1]) self.user_privilege_error_check() def stop(self): diff --git a/tests/system-test/6-cluster/clusterCommonCheck.py b/tests/system-test/6-cluster/clusterCommonCheck.py index 149c6d8ded..f5926321da 100644 --- a/tests/system-test/6-cluster/clusterCommonCheck.py +++ b/tests/system-test/6-cluster/clusterCommonCheck.py @@ -207,7 +207,7 @@ class ClusterComCheck: count+=1 else: tdLog.debug(tdSql.queryResult) - tdLog.exit("stop mnodes on dnode %d failed in 10s ") + tdLog.exit(f"stop mnodes on dnode {offlineDnodeNo} failed in 10s ") def check3mnode2off(self,mnodeNums=3): count=0 @@ -226,7 +226,45 @@ class ClusterComCheck: count+=1 else: tdLog.debug(tdSql.queryResult) - tdLog.exit("stop mnodes on dnode %d failed in 10s ") + tdLog.exit("stop mnodes on dnode 2 or 3 failed in 10s") + + def check_vgroups_status(self,vgroup_numbers=2,db_replica=3,count_number=10,db_name="db"): + """ check vgroups status in 10s after db vgroups status is changed """ + vgroup_numbers = int(vgroup_numbers) + self.db_replica = int(db_replica) + tdLog.debug("start to check status of vgroups") + count=0 + last_number=vgroup_numbers-1 + while count < count_number: + time.sleep(1) + tdSql.query(f"show {db_name}.vgroups;") + if count == 0 : + if tdSql.checkRows(vgroup_numbers) : + tdLog.success(f"{db_name} has {vgroup_numbers} vgroups" ) + else: + tdLog.exit(f"vgroup number of {db_name} is not correct") + if self.db_replica == 1 : + if tdSql.queryResult[0][4] == 'leader' and tdSql.queryResult[1][4] == 'leader' and tdSql.queryResult[last_number][4] == 'leader': + ready_time= (count + 1) + tdLog.success(f"all vgroups of {db_name} are leaders in {count + 1} s") + return True + count+=1 + elif self.db_replica == 3 : + vgroup_status_first=[tdSql.queryResult[0][4],tdSql.queryResult[0][6],tdSql.queryResult[0][8]] + + vgroup_status_last=[tdSql.queryResult[last_number][4],tdSql.queryResult[last_number][6],tdSql.queryResult[last_number][8]] + if vgroup_status_first.count('leader') == 1 and vgroup_status_first.count('follower') == 2: + if vgroup_status_last.count('leader') == 1 and vgroup_status_last.count('follower') == 2: + ready_time= (count + 1) + tdLog.success(f"all vgroups of {db_name} are ready in {ready_time} s") + return True + count+=1 + else: + tdLog.debug(tdSql.queryResult) + tdLog.notice(f"all vgroups leader of {db_name} is selected {count}s ") + caller = inspect.getframeinfo(inspect.stack()[1][0]) + args = (caller.filename, caller.lineno) + tdLog.exit("%s(%d) failed " % args) diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep1to3.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep1to3.py new file mode 100644 index 0000000000..7d46b3143d --- /dev/null +++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep1to3.py @@ -0,0 +1,206 @@ +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + +import time +import socket +import subprocess +from multiprocessing import Process +import threading +import time +import inspect +import ctypes + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + tdSql.init(conn.cursor()) + self.host = socket.gethostname() + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def stopThread(self,thread): + self._async_raise(thread.ident, SystemExit) + + + def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db0_0', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 1, + 'stbName': 'stb', + 'stbNumbers': 2, + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 200, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + "rowsPerTbl": 1000, + "batchNum": 5000 + } + + dnodeNumbers=int(dnodeNumbers) + mnodeNums=int(mnodeNums) + vnodeNumbers = int(dnodeNumbers-mnodeNums) + allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"]) + rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"] + rowsall=rowsPerStb*paraDict['stbNumbers'] + dbNumbers = 1 + + tdLog.info("first check dnode and mnode") + tdSql.query("select * from information_schema.ins_dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodeNumbers) + + #check mnode status + tdLog.info("check mnode status") + clusterComCheck.checkMnodeStatus(mnodeNums) + + # add some error operations and + tdLog.info("Confirm the status of the dnode again") + tdSql.error("create mnode on dnode 2") + tdSql.query("select * from information_schema.ins_dnodes;") + print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodeNumbers) + + # create database and stable + clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']) + tdLog.info("Take turns stopping Mnodes ") + + tdDnodes=cluster.dnodes + stopcount =0 + threads=[] + + # create stable:stb_0 + stableName= paraDict['stbName'] + newTdSql=tdCom.newTdSql() + clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers']) + #create child table:ctb_0 + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum']) + #insert date + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]))) + for tr in threads: + tr.start() + for tr in threads: + tr.join() + + while stopcount < restartNumbers: + tdLog.info(" restart loop: %d"%stopcount ) + if stopRole == "mnode": + for i in range(mnodeNums): + tdDnodes[i].stoptaosd() + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + elif stopRole == "vnode": + for i in range(vnodeNumbers): + tdDnodes[i+mnodeNums].stoptaosd() + # sleep(10) + tdDnodes[i+mnodeNums].starttaosd() + # sleep(10) + elif stopRole == "dnode": + for i in range(dnodeNumbers): + if i == 0 : + stableName= '%s_%d'%(paraDict['stbName'],0) + newTdSql=tdCom.newTdSql() + # newTdSql.execute('alter database db0_0 replica 3') + clusterComCreate.alterStbMetaData(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) + tdDnodes[i].stoptaosd() + clusterComCheck.checkDbRows(dbNumbers) + # sleep(10) + tdDnodes[i].starttaosd() + if i == 3 : + TdSqlEx=tdCom.newTdSql() + tdLog.info("alter database db0_0 replica 3") + TdSqlEx.execute('alter database db0_0 replica 3') + + + # dnodeNumbers don't include database of schema + if clusterComCheck.checkDnodes(dnodeNumbers): + tdLog.info("123") + else: + print("456") + + self.stopThread(threads) + tdLog.exit("one or more of dnodes failed to start ") + # self.check3mnode() + stopcount+=1 + + + clusterComCheck.checkDnodes(dnodeNumbers) + clusterComCheck.checkDbRows(dbNumbers) + # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"]) + + # tdSql.execute("use %s" %(paraDict["dbName"])) + tdSql.query("show %s.stables"%(paraDict["dbName"])) + tdSql.checkRows(paraDict["stbNumbers"]) + for i in range(paraDict['stbNumbers']): + stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i) + tdSql.query("select count(*) from %s"%stableName) + if i == 0 : + tdSql.checkData(0,0,rowsPerStb*2) + else: + tdSql.checkData(0,0,rowsPerStb) + clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=3,db_name=paraDict["dbName"],count_number=150) + def run(self): + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=1,stopRole='dnode') + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep3to1.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep3to1.py new file mode 100644 index 0000000000..5b5fb04969 --- /dev/null +++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep3to1.py @@ -0,0 +1,206 @@ +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + +import time +import socket +import subprocess +from multiprocessing import Process +import threading +import time +import inspect +import ctypes + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + tdSql.init(conn.cursor()) + self.host = socket.gethostname() + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def stopThread(self,thread): + self._async_raise(thread.ident, SystemExit) + + + def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db0_0', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 3, + 'stbName': 'stb', + 'stbNumbers': 2, + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 200, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + "rowsPerTbl": 1000, + "batchNum": 5000 + } + + dnodeNumbers=int(dnodeNumbers) + mnodeNums=int(mnodeNums) + vnodeNumbers = int(dnodeNumbers-mnodeNums) + allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"]) + rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"] + rowsall=rowsPerStb*paraDict['stbNumbers'] + dbNumbers = 1 + + tdLog.info("first check dnode and mnode") + tdSql.query("select * from information_schema.ins_dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodeNumbers) + + #check mnode status + tdLog.info("check mnode status") + clusterComCheck.checkMnodeStatus(mnodeNums) + + # add some error operations and + tdLog.info("Confirm the status of the dnode again") + tdSql.error("create mnode on dnode 2") + tdSql.query("select * from information_schema.ins_dnodes;") + print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodeNumbers) + + # create database and stable + clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']) + tdLog.info("Take turns stopping Mnodes ") + + tdDnodes=cluster.dnodes + stopcount =0 + threads=[] + + # create stable:stb_0 + stableName= paraDict['stbName'] + newTdSql=tdCom.newTdSql() + clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers']) + #create child table:ctb_0 + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum']) + #insert date + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]))) + for tr in threads: + tr.start() + for tr in threads: + tr.join() + + while stopcount < restartNumbers: + tdLog.info(" restart loop: %d"%stopcount ) + if stopRole == "mnode": + for i in range(mnodeNums): + tdDnodes[i].stoptaosd() + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + elif stopRole == "vnode": + for i in range(vnodeNumbers): + tdDnodes[i+mnodeNums].stoptaosd() + # sleep(10) + tdDnodes[i+mnodeNums].starttaosd() + # sleep(10) + elif stopRole == "dnode": + for i in range(dnodeNumbers): + tdDnodes[i].stoptaosd() + clusterComCheck.checkDbRows(dbNumbers) + if i == 0 : + stableName= '%s_%d'%(paraDict['stbName'],0) + newTdSql=tdCom.newTdSql() + # newTdSql.execute('alter database db0_0 replica 3') + clusterComCreate.alterStbMetaData(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) + # sleep(10) + tdDnodes[i].starttaosd() + if i == 3 : + TdSqlEx=tdCom.newTdSql() + tdLog.info("alter database db0_0 replica 1") + TdSqlEx.execute('alter database db0_0 replica 1') + + + # dnodeNumbers don't include database of schema + if clusterComCheck.checkDnodes(dnodeNumbers): + tdLog.info("123") + else: + print("456") + + self.stopThread(threads) + tdLog.exit("one or more of dnodes failed to start ") + # self.check3mnode() + stopcount+=1 + + + clusterComCheck.checkDnodes(dnodeNumbers) + clusterComCheck.checkDbRows(dbNumbers) + # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"]) + + # tdSql.execute("use %s" %(paraDict["dbName"])) + tdSql.query("show %s.stables"%(paraDict["dbName"])) + tdSql.checkRows(paraDict["stbNumbers"]) + for i in range(paraDict['stbNumbers']): + stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i) + tdSql.query("select count(*) from %s"%stableName) + if i == 0 : + tdSql.checkData(0,0,rowsPerStb*2) + else: + tdSql.checkData(0,0,rowsPerStb) + clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=1,db_name=paraDict["dbName"],count_number=150) + def run(self): + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=1,stopRole='dnode') + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDatarRebootAlterRep1-3.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDatarRebootAlterRep1-3.py new file mode 100644 index 0000000000..aa3ed8e3fd --- /dev/null +++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDatarRebootAlterRep1-3.py @@ -0,0 +1,222 @@ +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + +import time +import socket +import subprocess +from multiprocessing import Process +import threading +import time +import inspect +import ctypes + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + tdSql.init(conn.cursor()) + self.host = socket.gethostname() + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def stopThread(self,thread): + self._async_raise(thread.ident, SystemExit) + + + def insertData(self,countstart,countstop): + # fisrt add data : db\stable\childtable\general table + + for couti in range(countstart,countstop): + tdLog.debug("drop database if exists db%d" %couti) + tdSql.execute("drop database if exists db%d" %couti) + print("create database if not exists db%d replica 1 duration 300" %couti) + tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti) + tdSql.execute("use db%d" %couti) + tdSql.execute( + '''create table stb1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + tags (t1 int) + ''' + ) + tdSql.execute( + ''' + create table t1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + ''' + ) + for i in range(4): + tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') + + + def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db0_0', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 1, + 'stbName': 'stb', + 'stbNumbers': 2, + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 1000, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + "rowsPerTbl": 100, + "batchNum": 5000 + } + + dnodeNumbers = int(dnodeNumbers) + mnodeNums = int(mnodeNums) + vnodeNumbers = int(dnodeNumbers-mnodeNums) + allctbNumbers = (paraDict['stbNumbers']*paraDict["ctbNum"]) + rowsPerStb = paraDict["ctbNum"]*paraDict["rowsPerTbl"] + rowsall = rowsPerStb*paraDict['stbNumbers'] + dbNumbers = 1 + replica3 = 3 + tdLog.info("first check dnode and mnode") + tdSql.query("select * from information_schema.ins_dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodeNumbers) + + #check mnode status + tdLog.info("check mnode status") + clusterComCheck.checkMnodeStatus(mnodeNums) + + # add some error operations and + tdLog.info("Confirm the status of the dnode again") + tdSql.error("create mnode on dnode 2") + tdSql.query("select * from information_schema.ins_dnodes;") + print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodeNumbers) + + # create database and stable + clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']) + tdLog.info("Take turns stopping Mnodes ") + + tdDnodes=cluster.dnodes + stopcount =0 + threads=[] + + # create stable:stb_0 + stableName= paraDict['stbName'] + newTdSql=tdCom.newTdSql() + clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers']) + #create child table:ctb_0 + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum']) + #insert date + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]))) + for tr in threads: + tr.start() + TdSqlEx=tdCom.newTdSql() + tdLog.info("alter database db0_0 replica 3") + TdSqlEx.execute('alter database db0_0 replica 3') + while stopcount < restartNumbers: + tdLog.info(" restart loop: %d"%stopcount ) + if stopRole == "mnode": + for i in range(mnodeNums): + tdDnodes[i].stoptaosd() + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + elif stopRole == "vnode": + for i in range(vnodeNumbers): + tdDnodes[i+mnodeNums].stoptaosd() + # sleep(10) + tdDnodes[i+mnodeNums].starttaosd() + # sleep(10) + elif stopRole == "dnode": + for i in range(dnodeNumbers): + tdDnodes[i].stoptaosd() + # tdLog.info('select cast(c2 as nchar(10)) from db0_0.stb_1;') + # TdSqlEx.execute('select cast(c2 as nchar(10)) from db0_0.stb_1;') + # tdLog.info('select avg(c1) from db0_0.stb_0 interval(10s);') + # TdSqlEx.execute('select avg(c1) from db0_0.stb_0 interval(10s);') + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + # dnodeNumbers don't include database of schema + if clusterComCheck.checkDnodes(dnodeNumbers): + tdLog.info("123") + else: + print("456") + + self.stopThread(threads) + tdLog.exit("one or more of dnodes failed to start ") + # self.check3mnode() + stopcount+=1 + + for tr in threads: + tr.join() + clusterComCheck.checkDnodes(dnodeNumbers) + clusterComCheck.checkDbRows(dbNumbers) + # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"]) + + # tdSql.execute("use %s" %(paraDict["dbName"])) + tdSql.query("show %s.stables"%(paraDict["dbName"])) + tdSql.checkRows(paraDict["stbNumbers"]) + # for i in range(paraDict['stbNumbers']): + # stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i) + # tdSql.query("select count(*) from %s"%stableName) + # tdSql.checkData(0,0,rowsPerStb) + clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=replica3,db_name=paraDict["dbName"],count_number=240) + def run(self): + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=4,stopRole='dnode') + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py new file mode 100644 index 0000000000..ed7b99a880 --- /dev/null +++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py @@ -0,0 +1,196 @@ +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + +import time +import socket +import subprocess +from multiprocessing import Process +import threading +import time +import inspect +import ctypes + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + tdSql.init(conn.cursor()) + self.host = socket.gethostname() + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def stopThread(self,thread): + self._async_raise(thread.ident, SystemExit) + + + def insertData(self,countstart,countstop): + # fisrt add data : db\stable\childtable\general table + + for couti in range(countstart,countstop): + tdLog.debug("drop database if exists db%d" %couti) + tdSql.execute("drop database if exists db%d" %couti) + print("create database if not exists db%d replica 1 duration 300" %couti) + tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti) + tdSql.execute("use db%d" %couti) + tdSql.execute( + '''create table stb1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + tags (t1 int) + ''' + ) + tdSql.execute( + ''' + create table t1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + ''' + ) + for i in range(4): + tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') + + + def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db0_0', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 3, + 'stbName': 'stb', + 'stbNumbers': 2, + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 1, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + "rowsPerTbl": 1, + "batchNum": 5000 + } + + dnodeNumbers=int(dnodeNumbers) + mnodeNums=int(mnodeNums) + vnodeNumbers = int(dnodeNumbers-mnodeNums) + replica1 = 1 + replica3 = 3 + allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"]) + rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"] + rowsall=rowsPerStb*paraDict['stbNumbers'] + dbNumbers = 1 + + tdLog.info("first check dnode and mnode") + tdSql.query("select * from information_schema.ins_dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodeNumbers) + + #check mnode status + tdLog.info("check mnode status") + clusterComCheck.checkMnodeStatus(mnodeNums) + + # add some error operations and + tdLog.info("Confirm the status of the dnode again") + tdSql.error("create mnode on dnode 2") + tdSql.query("select * from information_schema.ins_dnodes;") + print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodeNumbers) + + # create database and stable + clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']) + tdLog.info("Take turns stopping Mnodes ") + + tdDnodes=cluster.dnodes + stopcount =0 + threads=[] + + # create stable:stb_0 + stableName= paraDict['stbName'] + newTdSql=tdCom.newTdSql() + clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers']) + #create child table:ctb_0 + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum']) + #insert date + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]))) + for tr in threads: + tr.start() + TdSqlEx=tdCom.newTdSql() + tdLog.info(f"alter database db0_0 replica {replica1}") + TdSqlEx.execute(f'alter database db0_0 replica {replica1}') + for tr in threads: + tr.join() + clusterComCheck.checkDnodes(dnodeNumbers) + clusterComCheck.checkDbRows(dbNumbers) + # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"]) + + # tdSql.execute("use %s" %(paraDict["dbName"])) + tdSql.query("show %s.stables"%(paraDict["dbName"])) + tdSql.checkRows(paraDict["stbNumbers"]) + for i in range(paraDict['stbNumbers']): + stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i) + tdSql.query("select count(*) from %s"%stableName) + tdSql.checkData(0,0,rowsPerStb) + + clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=replica1,db_name=paraDict["dbName"],count_number=20) + sleep(5) + tdLog.info(f"show transactions;alter database db0_0 replica {replica3};") + TdSqlEx.execute(f'show transactions;') + TdSqlEx.execute(f'alter database db0_0 replica {replica3};') + clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=replica3,db_name=paraDict["dbName"],count_number=120) + + def run(self): + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=4,stopRole='dnode') + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeStopDnodeInsertDatatb.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeStopDnodeInsertDatatb.py new file mode 100644 index 0000000000..e02af29a05 --- /dev/null +++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeStopDnodeInsertDatatb.py @@ -0,0 +1,191 @@ +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + +import time +import socket +import subprocess +from multiprocessing import Process +import threading +import time +import inspect +import ctypes + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + tdSql.init(conn.cursor()) + self.host = socket.gethostname() + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def stopThread(self,thread): + self._async_raise(thread.ident, SystemExit) + + + def insertData(self,dbname,tableCount,rowsPerCount): + # tableCount : create table number + # rowsPerCount : rows per table + # fisrt add data : db\stable\childtable\general table + os.system(f"taosBenchmark -d {dbname} -n {tableCount} -t {rowsPerCount} -z 1 -k 10000 -y ") + + + def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db0_0', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 1, + 'stbName': 'stb', + 'stbNumbers': 2, + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 10000, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + "rowsPerTbl": 10000, + "batchNum": 5000 + } + + dnodeNumbers=int(dnodeNumbers) + mnodeNums=int(mnodeNums) + vnodeNumbers = int(dnodeNumbers-mnodeNums) + allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"]) + rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"] + rowsall=rowsPerStb*paraDict['stbNumbers'] + dbNumbers = 1 + + tdLog.info("first check dnode and mnode") + tdSql.query("select * from information_schema.ins_dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodeNumbers) + + #check mnode status + tdLog.info("check mnode status") + clusterComCheck.checkMnodeStatus(mnodeNums) + + # add some error operations and + tdLog.info("Confirm the status of the dnode again") + tdSql.error("create mnode on dnode 2") + tdSql.query("select * from information_schema.ins_dnodes;") + print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodeNumbers) + + # create database and stable + tdLog.info("Take turns stopping Mnodes ") + + tdDnodes=cluster.dnodes + stopcount =0 + threads=[] + + # create stable:stb_0 + threads.append(threading.Thread(target=self.insertData, args=(paraDict["dbName"],paraDict["ctbNum"],paraDict["rowsPerTbl"]))) + for tr in threads: + tr.start() + TdSqlEx=tdCom.newTdSql() + tdLog.info("alter database db0_0 replica 3") + TdSqlEx.execute('alter database db0_0 replica 3') + while stopcount < restartNumbers: + tdLog.info(" restart loop: %d"%stopcount ) + if stopRole == "mnode": + for i in range(mnodeNums): + tdDnodes[i].stoptaosd() + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + elif stopRole == "vnode": + for i in range(vnodeNumbers): + tdDnodes[i+mnodeNums].stoptaosd() + # sleep(10) + tdDnodes[i+mnodeNums].starttaosd() + # sleep(10) + elif stopRole == "dnode": + for i in range(dnodeNumbers): + tdDnodes[i].stoptaosd() + # tdLog.info('select cast(c2 as nchar(10)) from db0_0.stb_1;') + # TdSqlEx.execute('select cast(c2 as nchar(10)) from db0_0.stb_1;') + # tdLog.info('select avg(c1) from db0_0.stb_0 interval(10s);') + # TdSqlEx.execute('select avg(c1) from db0_0.stb_0 interval(10s);') + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + # dnodeNumbers don't include database of schema + if clusterComCheck.checkDnodes(dnodeNumbers): + tdLog.info("123") + else: + print("456") + + self.stopThread(threads) + tdLog.exit("one or more of dnodes failed to start ") + # self.check3mnode() + stopcount+=1 + + for tr in threads: + tr.join() + clusterComCheck.checkDnodes(dnodeNumbers) + clusterComCheck.checkDbRows(dbNumbers) + # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"]) + + # tdSql.execute("use %s" %(paraDict["dbName"])) + tdSql.query("show %s.stables"%(paraDict["dbName"])) + tdSql.checkRows(paraDict["stbNumbers"]) + for i in range(paraDict['stbNumbers']): + stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i) + tdSql.query("select count(*) from %s"%stableName) + tdSql.checkData(0,0,rowsPerStb) + clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=3,db_name=paraDict["dbName"],count_number=240) + def run(self): + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=4,stopRole='dnode') + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/subscribeDb3.py b/tests/system-test/7-tmq/subscribeDb3.py index 5b5326cfba..b66334a6a6 100644 --- a/tests/system-test/7-tmq/subscribeDb3.py +++ b/tests/system-test/7-tmq/subscribeDb3.py @@ -336,7 +336,7 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - if totalConsumeRows >= expectrowcnt or totalConsumeRows <= 0: + if totalConsumeRows > expectrowcnt or totalConsumeRows <= 0: tdLog.info("act consume rows: %d, expect consume rows between %d and 0"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") diff --git a/tests/system-test/7-tmq/subscribeStb.py b/tests/system-test/7-tmq/subscribeStb.py index 9dcbf5b351..53f1a34d58 100644 --- a/tests/system-test/7-tmq/subscribeStb.py +++ b/tests/system-test/7-tmq/subscribeStb.py @@ -226,12 +226,11 @@ class TDTestCase: self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tdLog.info("start consume processor") - pollDelay = 5 + pollDelay = 10 showMsg = 1 showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - time.sleep(5) self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) self.insert_data(tdSql,\ parameterDict["dbName"],\ @@ -307,7 +306,7 @@ class TDTestCase: self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tdLog.info("start consume processor") - pollDelay = 5 + pollDelay = 10 showMsg = 1 showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)