diff --git a/cmake/curl_CMakeLists.txt.in b/cmake/curl_CMakeLists.txt.in index af7f005dda..197d978fb7 100644 --- a/cmake/curl_CMakeLists.txt.in +++ b/cmake/curl_CMakeLists.txt.in @@ -1,6 +1,7 @@ # curl ExternalProject_Add(curl2 - URL https://curl.se/download/curl-8.2.1.tar.gz + URL https://github.com/curl/curl/releases/download/curl-8_2_1/curl-8.2.1.tar.gz + #URL https://curl.se/download/curl-8.2.1.tar.gz URL_HASH MD5=b25588a43556068be05e1624e0e74d41 DOWNLOAD_NO_PROGRESS 1 DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download" diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b49eb916f0..c4d61157b6 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -355,6 +355,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL TAOS_DEF_ERROR_CODE(0, 0x03D5) #define TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED TAOS_DEF_ERROR_CODE(0, 0x03D6) //internal #define TSDB_CODE_MND_TRANS_SYNC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x03D7) +#define TSDB_CODE_MND_TRANS_CTX_SWITCH TAOS_DEF_ERROR_CODE(0, 0x03D8) #define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03DF) // mnode-mq diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index fd88098b03..fdac3882c3 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1432,6 +1432,13 @@ static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize, static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) { terrno = TSDB_CODE_SUCCESS; + if (strcasecmp(name, "resetlog") == 0) { + // trigger, no item in cfg + taosResetLog(); + cfgDumpCfg(tsCfg, 0, false); + return 0; + } + SConfigItem *pItem = cfgGetItem(pCfg, name); if (!pItem || (pItem->dynScope & CFG_DYN_SERVER) == 0) { uError("failed to config:%s, not support", name); @@ -1445,12 +1452,6 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) { return 0; } - if (strcasecmp(name, "resetlog") == 0) { - taosResetLog(); - cfgDumpCfg(tsCfg, 0, false); - return 0; - } - { // 'bool/int32_t/int64_t/float/double' variables with general modification function static OptionNameAndVar debugOptions[] = { {"dDebugFlag", &dDebugFlag}, {"vDebugFlag", &vDebugFlag}, {"mDebugFlag", &mDebugFlag}, @@ -1774,4 +1775,4 @@ void taosSetAllDebugFlag(int32_t flag, bool rewrite) { uInfo("all debug flag are set to %d", flag); } -int8_t taosGranted() { return atomic_load_8(&tsGrant); } \ No newline at end of file +int8_t taosGranted() { return atomic_load_8(&tsGrant); } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index d6de406987..0fb246e945 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -108,11 +108,11 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 04544da80e..1bd39a2299 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -97,7 +97,7 @@ SSdbRaw *mndTransEncode(STrans *pTrans); SSdbRow *mndTransDecode(SSdbRaw *pRaw); void mndTransDropData(STrans *pTrans); -bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans); +bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans, bool topHalf); #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndCompactDetail.c b/source/dnode/mnode/impl/src/mndCompactDetail.c index a1c0e95c20..6b1dd78093 100644 --- a/source/dnode/mnode/impl/src/mndCompactDetail.c +++ b/source/dnode/mnode/impl/src/mndCompactDetail.c @@ -290,11 +290,11 @@ int32_t mndAddCompactDetailToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* p SSdbRaw *pVgRaw = mndCompactDetailActionEncode(&compactDetail); if (pVgRaw == NULL) return -1; - if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) { + if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) { sdbFreeRaw(pVgRaw); return -1; } (void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY); return 0; -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 44ee804d22..01d4d1029c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -1215,7 +1215,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) return -1; } - if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) { + if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { sdbRelease(pSdb, pSub); sdbCancelFetch(pSdb, pIter); return -1; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index f46f33ac22..0fc8dad420 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -180,7 +180,7 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) { goto _OUT; } - mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 + mInfo("trans:%d, process sync proposal, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 " role:%s raw:%p sec:%d seq:%" PRId64, transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state), pRaw, pMgmt->transSec, pMgmt->transSeq); @@ -208,15 +208,11 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) { } if (pTrans->stage == TRN_STAGE_PREPARE) { - bool continueExec = mndTransPerformPrepareStage(pMnode, pTrans); + bool continueExec = mndTransPerformPrepareStage(pMnode, pTrans, false); if (!continueExec) goto _OUT; } - if (pTrans->id != pMgmt->transId) { - mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d", - pTrans->id, pTrans->createdTime, pMgmt->transId); - mndTransRefresh(pMnode, pTrans); - } + mndTransRefresh(pMnode, pTrans); sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta); @@ -234,6 +230,7 @@ static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) { goto _OUT; } + int32_t transId = pMgmt->transId; pMgmt->transId = 0; pMgmt->transSec = 0; pMgmt->transSeq = 0; @@ -241,9 +238,9 @@ static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) { tsem_post(&pMgmt->syncSem); if (pMgmt->errCode != 0) { - mError("trans:%d, failed to propose since %s, post sem", pMgmt->transId, tstrerror(pMgmt->errCode)); + mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode)); } else { - mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, pMgmt->transId, pMgmt->transSeq); + mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, transId, pMgmt->transSeq); } _OUT: @@ -542,7 +539,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { taosThreadMutexLock(&pMgmt->lock); pMgmt->errCode = 0; - if (pMgmt->transId != 0 /* && pMgmt->transId != transId*/) { + if (pMgmt->transId != 0) { mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId); taosThreadMutexUnlock(&pMgmt->lock); rpcFreeCont(req.pCont); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 7749decf91..9e478f3aa5 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -36,21 +36,25 @@ static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); static void mndTransDropLogs(SArray *pArray); static void mndTransDropActions(SArray *pArray); -static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray); -static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans); -static bool mndCannotExecuteTransAction(SMnode *pMnode) { return !pMnode->deploy && !mndIsLeader(pMnode); } + +static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf); +static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans, bool topHalf); +static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans, bool topHalf); +static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf); +static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf); +static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf); + +static bool mndCannotExecuteTransAction(SMnode *pMnode, bool topHalf) { + return (!pMnode->deploy && !mndIsLeader(pMnode)) || !topHalf; +} static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans); static int32_t mndProcessTransTimer(SRpcMsg *pReq); @@ -1090,8 +1094,9 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) } } -static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { +static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) { if (pAction->rawWritten) return 0; + if (topHalf) return TSDB_CODE_MND_TRANS_CTX_SWITCH; int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw); if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { @@ -1112,9 +1117,9 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi return code; } -static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { - if (pAction->msgSent) return 0; - if (mndCannotExecuteTransAction(pMnode)) return -1; +static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) { + if (pAction->msgSent) return 0; + if (mndCannotExecuteTransAction(pMnode, topHalf)) return TSDB_CODE_MND_TRANS_CTX_SWITCH; int64_t signature = pTrans->id; signature = (signature << 32); @@ -1159,7 +1164,8 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio return code; } -static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { +static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) { + if (!topHalf) return TSDB_CODE_MND_TRANS_CTX_SWITCH; pAction->rawWritten = 0; pAction->errCode = 0; mInfo("trans:%d, %s:%d confirm action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id); @@ -1168,34 +1174,39 @@ static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction return 0; } -static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { +static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) { if (pAction->actionType == TRANS_ACTION_RAW) { - return mndTransWriteSingleLog(pMnode, pTrans, pAction); + return mndTransWriteSingleLog(pMnode, pTrans, pAction, topHalf); } else if (pAction->actionType == TRANS_ACTION_MSG) { - return mndTransSendSingleMsg(pMnode, pTrans, pAction); + return mndTransSendSingleMsg(pMnode, pTrans, pAction, topHalf); } else { - return mndTransExecNullMsg(pMnode, pTrans, pAction); + return mndTransExecNullMsg(pMnode, pTrans, pAction, topHalf); } } -static int32_t mndTransExecSingleActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { +static int32_t mndTransExecSingleActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf) { int32_t numOfActions = taosArrayGetSize(pArray); int32_t code = 0; for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); - code = mndTransExecSingleAction(pMnode, pTrans, pAction); - if (code != 0) break; + code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf); + if (code != 0) { + mInfo("trans:%d, action:%d not executed since %s. numOfActions:%d", pTrans->id, action, tstrerror(code), + numOfActions); + break; + } } return code; } -static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { +static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf) { int32_t numOfActions = taosArrayGetSize(pArray); + int32_t code = 0; if (numOfActions == 0) return 0; - if (mndTransExecSingleActions(pMnode, pTrans, pArray) != 0) { + if ((code = mndTransExecSingleActions(pMnode, pTrans, pArray, topHalf)) != 0) { return -1; } @@ -1248,31 +1259,31 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA } } -static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions); +static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) { + int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions, topHalf); if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to execute redoActions since:%s, code:0x%x", terrstr(), terrno); } return code; } -static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions); +static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) { + int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions, topHalf); if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to execute undoActions since %s", terrstr()); } return code; } -static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions); +static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf) { + int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions, topHalf); if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to execute commitActions since %s", terrstr()); } return code; } -static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) { +static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) { int32_t code = 0; int32_t numOfActions = taosArrayGetSize(pTrans->redoActions); if (numOfActions == 0) return code; @@ -1289,7 +1300,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos); - code = mndTransExecSingleAction(pMnode, pTrans, pAction); + code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf); if (code == 0) { if (pAction->msgSent) { if (pAction->msgReceived) { @@ -1317,14 +1328,16 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) } mndSetTransLastAction(pTrans, pAction); - if (mndCannotExecuteTransAction(pMnode)) break; + if (mndCannotExecuteTransAction(pMnode, topHalf)) break; if (code == 0) { pTrans->code = 0; pTrans->redoActionPos++; mInfo("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage), pAction->id); + taosThreadMutexUnlock(&pTrans->mutex); code = mndTransSync(pMnode, pTrans); + taosThreadMutexLock(&pTrans->mutex); if (code != 0) { pTrans->redoActionPos--; pTrans->code = terrno; @@ -1357,7 +1370,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) return code; } -bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { +bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = true; int32_t code = 0; @@ -1368,7 +1381,7 @@ bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pTrans->prepareActions, action); - code = mndTransExecSingleAction(pMnode, pTrans, pAction); + code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf); if (code != 0) { mError("trans:%d, failed to execute prepare action:%d, numOfActions:%d", pTrans->id, action, numOfActions); return false; @@ -1381,17 +1394,17 @@ _OVER: return continueExec; } -static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = true; int32_t code = 0; if (pTrans->exec == TRN_EXEC_SERIAL) { - code = mndTransExecuteRedoActionsSerial(pMnode, pTrans); + code = mndTransExecuteRedoActionsSerial(pMnode, pTrans, topHalf); } else { - code = mndTransExecuteRedoActions(pMnode, pTrans); + code = mndTransExecuteRedoActions(pMnode, pTrans, topHalf); } - if (mndCannotExecuteTransAction(pMnode)) return false; + if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; terrno = code; if (code == 0) { @@ -1431,8 +1444,8 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { - if (mndCannotExecuteTransAction(pMnode)) return false; +static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { + if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; bool continueExec = true; int32_t code = mndTransCommit(pMnode, pTrans); @@ -1452,9 +1465,9 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = true; - int32_t code = mndTransExecuteCommitActions(pMnode, pTrans); + int32_t code = mndTransExecuteCommitActions(pMnode, pTrans, topHalf); if (code == 0) { pTrans->code = 0; @@ -1471,9 +1484,9 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = true; - int32_t code = mndTransExecuteUndoActions(pMnode, pTrans); + int32_t code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf); if (code == 0) { pTrans->stage = TRN_STAGE_PRE_FINISH; @@ -1491,8 +1504,8 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { - if (mndCannotExecuteTransAction(pMnode)) return false; +static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { + if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; bool continueExec = true; int32_t code = mndTransRollback(pMnode, pTrans); @@ -1510,8 +1523,8 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans) { - if (mndCannotExecuteTransAction(pMnode)) return false; +static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { + if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; bool continueExec = true; int32_t code = mndTransPreFinish(pMnode, pTrans); @@ -1529,8 +1542,9 @@ static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = false; + if (topHalf) return continueExec; SSdbRaw *pRaw = mndTransEncode(pTrans); if (pRaw == NULL) { @@ -1558,43 +1572,28 @@ void mndTransExecuteImp(SMnode *pMnode, STrans *pTrans, bool topHalf) { pTrans->lastExecTime = taosGetTimestampMs(); switch (pTrans->stage) { case TRN_STAGE_PREPARE: - continueExec = mndTransPerformPrepareStage(pMnode, pTrans); + continueExec = mndTransPerformPrepareStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_REDO_ACTION: - continueExec = mndTransPerformRedoActionStage(pMnode, pTrans); + continueExec = mndTransPerformRedoActionStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_COMMIT: - if (topHalf) { - continueExec = mndTransPerformCommitStage(pMnode, pTrans); - } else { - mInfo("trans:%d, can not commit since not leader", pTrans->id); - continueExec = false; - } + continueExec = mndTransPerformCommitStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_COMMIT_ACTION: - continueExec = mndTransPerformCommitActionStage(pMnode, pTrans); + continueExec = mndTransPerformCommitActionStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_ROLLBACK: - if (topHalf) { - continueExec = mndTransPerformRollbackStage(pMnode, pTrans); - } else { - mInfo("trans:%d, can not rollback since not leader", pTrans->id); - continueExec = false; - } + continueExec = mndTransPerformRollbackStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_UNDO_ACTION: - continueExec = mndTransPerformUndoActionStage(pMnode, pTrans); + continueExec = mndTransPerformUndoActionStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_PRE_FINISH: - if (topHalf) { - continueExec = mndTransPerformPreFinishStage(pMnode, pTrans); - } else { - mInfo("trans:%d, can not pre-finish since not leader", pTrans->id); - continueExec = false; - } + continueExec = mndTransPerformPreFinishStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_FINISH: - continueExec = mndTransPerformFinishStage(pMnode, pTrans); + continueExec = mndTransPerformFinishStage(pMnode, pTrans, topHalf); break; default: continueExec = false; diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 976ee616f9..ac2486eda1 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -36,7 +36,6 @@ static int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME); - static void metaGetEntryInfo(const SMetaEntry *pEntry, SMetaInfo *pInfo) { pInfo->uid = pEntry->uid; pInfo->version = pEntry->version; @@ -562,6 +561,7 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn); metaULock(pMeta); metaDestroyTagIdxKey(pTagIdxKey); + pTagIdxKey = NULL; } nStbEntry.version = version; @@ -692,6 +692,7 @@ int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq) tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, pMeta->txn); metaULock(pMeta); metaDestroyTagIdxKey(pTagIdxKey); + pTagIdxKey = NULL; } // clear idx flag @@ -1076,7 +1077,7 @@ static int metaDeleteTtl(SMeta *pMeta, const SMetaEntry *pME) { return ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx); } -static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *pSuid, int8_t* pSysTbl) { +static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *pSuid, int8_t *pSysTbl) { void *pData = NULL; int nData = 0; int rc = 0; @@ -1146,6 +1147,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *p tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, pMeta->txn); } metaDestroyTagIdxKey(pTagIdxKey); + pTagIdxKey = NULL; } } tDecoderClear(&tdc); @@ -1865,6 +1867,7 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb } tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn); metaDestroyTagIdxKey(pTagIdxKey); + pTagIdxKey = NULL; } tdbTbcClose(pCtbIdxc); return 0; @@ -2122,7 +2125,7 @@ int metaUpdateChangeTimeWithLock(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeM if (!tsTtlChangeOnWrite) return 0; metaWLock(pMeta); - int ret = metaUpdateChangeTime(pMeta, uid, changeTimeMs); + int ret = metaUpdateChangeTime(pMeta, uid, changeTimeMs); metaULock(pMeta); return ret; } @@ -2228,15 +2231,14 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { nTagData = tDataTypes[pTagColumn->type].bytes; } - if (pTagData != NULL) { - if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type, - pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) { - ret = -1; - goto end; - } - tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn); + if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type, + pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) { + ret = -1; + goto end; } + tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn); metaDestroyTagIdxKey(pTagIdxKey); + pTagIdxKey = NULL; } } end: diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 7b68b65c17..c9490e2c55 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1623,6 +1623,7 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num pDummy[i].functionId = pCtx[i].functionId; pDummy[i].isNotNullFunc = pCtx[i].isNotNullFunc; pDummy[i].isPseudoFunc = pCtx[i].isPseudoFunc; + pDummy[i].fpSet.init = pCtx[i].fpSet.init; } } @@ -2774,6 +2775,9 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) { for (int32_t i = 0; i < num; i++) { SResultWindowInfo winInfo = {0}; getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &winInfo); + if (!IS_VALID_SESSION_WIN(winInfo)) { + continue; + } compactSessionSemiWindow(pOperator, &winInfo); saveSessionOutputBuf(pAggSup, &winInfo); } diff --git a/source/libs/parser/CMakeLists.txt b/source/libs/parser/CMakeLists.txt index 4e4a4def1d..c5ee1a00c4 100644 --- a/source/libs/parser/CMakeLists.txt +++ b/source/libs/parser/CMakeLists.txt @@ -4,9 +4,6 @@ IF (TD_ENTERPRISE) LIST(APPEND PARSER_SRC ${TD_ENTERPRISE_DIR}/src/plugins/view/src/parserView.c) ENDIF () -IF (TD_BI_SUPPORT) - LIST(APPEND PARSER_SRC ${TD_ENTERPRISE_DIR}/src/plugins/bi/src/biRewriteQuery.c) -ENDIF () add_library(parser STATIC ${PARSER_SRC}) target_include_directories( parser diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index cf6db9f549..cec123dc3d 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1104,11 +1104,192 @@ static EDealRes translateColumnUseAlias(STranslateContext* pCxt, SColumnNode** p return DEAL_RES_CONTINUE; } -#ifndef TD_ENTERPRISE +static void biMakeAliasNameInMD5(char* pExprStr, int32_t len, char* pAlias) { + T_MD5_CTX ctx; + tMD5Init(&ctx); + tMD5Update(&ctx, pExprStr, len); + tMD5Final(&ctx); + char* p = pAlias; + for (uint8_t i = 0; i < tListLen(ctx.digest); ++i) { + sprintf(p, "%02x", ctx.digest[i]); + p += 2; + } +} + +static SNode* biMakeTbnameProjectAstNode(char* funcName, char* tableAlias) { + SValueNode* valNode = NULL; + if (tableAlias != NULL) { + SValueNode* n = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); + n->literal = tstrdup(tableAlias); + n->node.resType.type = TSDB_DATA_TYPE_BINARY; + n->node.resType.bytes = strlen(n->literal); + n->isDuration = false; + n->translate = false; + valNode = n; + } + + SFunctionNode* tbNameFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); + tstrncpy(tbNameFunc->functionName, "tbname", TSDB_FUNC_NAME_LEN); + if (valNode != NULL) { + nodesListMakeAppend(&tbNameFunc->pParameterList, (SNode*)valNode); + } + snprintf(tbNameFunc->node.userAlias, sizeof(tbNameFunc->node.userAlias), + (tableAlias)? "%s.tbname" : "%stbname", + (tableAlias)? tableAlias : ""); + strncpy(tbNameFunc->node.aliasName, tbNameFunc->functionName, TSDB_COL_NAME_LEN); + + if (funcName == NULL) { + return (SNode*)tbNameFunc; + } else { + SFunctionNode* multiResFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); + tstrncpy(multiResFunc->functionName, funcName, TSDB_FUNC_NAME_LEN); + nodesListMakeAppend(&multiResFunc->pParameterList, (SNode*)tbNameFunc); + + if (tsKeepColumnName) { + snprintf(multiResFunc->node.userAlias, sizeof(tbNameFunc->node.userAlias), + (tableAlias)? "%s.tbname" : "%stbname", + (tableAlias)? tableAlias : ""); + strcpy(multiResFunc->node.aliasName, tbNameFunc->functionName); + } else { + snprintf(multiResFunc->node.userAlias, sizeof(multiResFunc->node.userAlias), + tableAlias? "%s(%s.tbname)" : "%s(%stbname)", funcName, + tableAlias? tableAlias: ""); + biMakeAliasNameInMD5(multiResFunc->node.userAlias, strlen(multiResFunc->node.userAlias), multiResFunc->node.aliasName); + } + + return (SNode*)multiResFunc; + } +} + +static int32_t biRewriteSelectFuncParamStar(STranslateContext* pCxt, SSelectStmt* pSelect, SNode* pNode, SListCell* pSelectListCell) { + SNodeList* pTbnameNodeList = nodesMakeList(); + + SFunctionNode* pFunc = (SFunctionNode*)pNode; + if (strcasecmp(pFunc->functionName, "last") == 0 || + strcasecmp(pFunc->functionName, "last_row") == 0 || + strcasecmp(pFunc->functionName, "first") == 0) { + SNodeList* pParams = pFunc->pParameterList; + SNode* pPara = NULL; + FOREACH(pPara, pParams) { + if (nodesIsStar(pPara)) { + SArray* pTables = taosArrayGetP(pCxt->pNsLevel, pCxt->currLevel); + size_t n = taosArrayGetSize(pTables); + for (int32_t i = 0; i < n; ++i) { + STableNode* pTable = taosArrayGetP(pTables, i); + if (nodeType(pTable) == QUERY_NODE_REAL_TABLE && ((SRealTableNode*)pTable)->pMeta != NULL && + ((SRealTableNode*)pTable)->pMeta->tableType == TSDB_SUPER_TABLE) { + SNode* pTbnameNode = biMakeTbnameProjectAstNode(pFunc->functionName, NULL); + nodesListAppend(pTbnameNodeList, pTbnameNode); + } + } + if (LIST_LENGTH(pTbnameNodeList) > 0) { + nodesListInsertListAfterPos(pSelect->pProjectionList, pSelectListCell, pTbnameNodeList); + } + } else if (nodesIsTableStar(pPara)) { + char* pTableAlias = ((SColumnNode*)pPara)->tableAlias; + STableNode* pTable = NULL; + int32_t code = findTable(pCxt, pTableAlias, &pTable); + if (TSDB_CODE_SUCCESS == code && nodeType(pTable) == QUERY_NODE_REAL_TABLE && + ((SRealTableNode*)pTable)->pMeta != NULL && + ((SRealTableNode*)pTable)->pMeta->tableType == TSDB_SUPER_TABLE) { + SNode* pTbnameNode = biMakeTbnameProjectAstNode(pFunc->functionName, pTableAlias); + nodesListAppend(pTbnameNodeList, pTbnameNode); + } + if (LIST_LENGTH(pTbnameNodeList) > 0) { + nodesListInsertListAfterPos(pSelect->pProjectionList, pSelectListCell, pTbnameNodeList); + } + } + } + } + return TSDB_CODE_SUCCESS; +} + +// after translate from +// before translate select list +int32_t biRewriteSelectStar(STranslateContext* pCxt, SSelectStmt* pSelect) { + SNode* pNode = NULL; + SNodeList* pTbnameNodeList = nodesMakeList(); + WHERE_EACH(pNode, pSelect->pProjectionList) { + if (nodesIsStar(pNode)) { + SArray* pTables = taosArrayGetP(pCxt->pNsLevel, pCxt->currLevel); + size_t n = taosArrayGetSize(pTables); + for (int32_t i = 0; i < n; ++i) { + STableNode* pTable = taosArrayGetP(pTables, i); + if (nodeType(pTable) == QUERY_NODE_REAL_TABLE && + ((SRealTableNode*)pTable)->pMeta != NULL && + ((SRealTableNode*)pTable)->pMeta->tableType == TSDB_SUPER_TABLE) { + SNode* pTbnameNode = biMakeTbnameProjectAstNode(NULL, NULL); + nodesListAppend(pTbnameNodeList, pTbnameNode); + } + } + if (LIST_LENGTH(pTbnameNodeList) > 0) { + nodesListInsertListAfterPos(pSelect->pProjectionList, cell, pTbnameNodeList); + } + } else if (nodesIsTableStar(pNode)) { + char* pTableAlias = ((SColumnNode*)pNode)->tableAlias; + STableNode* pTable = NULL; + int32_t code = findTable(pCxt, pTableAlias, &pTable); + if (TSDB_CODE_SUCCESS == code && + nodeType(pTable) == QUERY_NODE_REAL_TABLE && + ((SRealTableNode*)pTable)->pMeta != NULL && + ((SRealTableNode*)pTable)->pMeta->tableType == TSDB_SUPER_TABLE) { + SNode* pTbnameNode = biMakeTbnameProjectAstNode(NULL, pTableAlias); + nodesListAppend(pTbnameNodeList, pTbnameNode); + } + if (LIST_LENGTH(pTbnameNodeList) > 0) { + nodesListInsertListAfterPos(pSelect->pProjectionList, cell, pTbnameNodeList); + } + } else if (nodeType(pNode) == QUERY_NODE_FUNCTION) { + biRewriteSelectFuncParamStar(pCxt, pSelect, pNode, cell); + } + WHERE_NEXT; + } + + return TSDB_CODE_SUCCESS; +} + bool biRewriteToTbnameFunc(STranslateContext* pCxt, SNode** ppNode) { + SColumnNode* pCol = (SColumnNode*)(*ppNode); + if ((strcasecmp(pCol->colName, "tbname") == 0) && + ((SSelectStmt*)pCxt->pCurrStmt)->pFromTable && + QUERY_NODE_REAL_TABLE == nodeType(((SSelectStmt*)pCxt->pCurrStmt)->pFromTable)) { + SFunctionNode* tbnameFuncNode = NULL; + tbnameFuncNode = (SFunctionNode*)biMakeTbnameProjectAstNode(NULL, (pCol->tableAlias[0]!='\0') ? pCol->tableAlias : NULL); + tbnameFuncNode->node.resType = pCol->node.resType; + strcpy(tbnameFuncNode->node.aliasName, pCol->node.aliasName); + strcpy(tbnameFuncNode->node.userAlias, pCol->node.userAlias); + + nodesDestroyNode(*ppNode); + *ppNode = (SNode*)tbnameFuncNode; + return true; + } + return false; } -#endif + +int32_t biCheckCreateTableTbnameCol(STranslateContext* pCxt, SCreateTableStmt* pStmt) { + if (pStmt->pTags) { + SNode* pNode = NULL; + FOREACH(pNode, pStmt->pTags) { + SColumnDefNode* pTag = (SColumnDefNode*)pNode; + if (strcasecmp(pTag->colName, "tbname") == 0) { + int32_t code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAG_NAME, "tbname can not used for tags in BI mode"); + return code; + } + } + } + if (pStmt->pCols) { + SNode* pNode = NULL; + FOREACH(pNode, pStmt->pCols) { + SColumnDefNode* pCol = (SColumnDefNode*)pNode; + if (strcasecmp(pCol->colName, "tbname") == 0) { + int32_t code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, "tbname can not used for columns in BI mode"); + return code; + } + } + } + return TSDB_CODE_SUCCESS; +} static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode** pCol) { if (NULL == pCxt->pCurrStmt || @@ -3178,10 +3359,6 @@ static int32_t createTags(STranslateContext* pCxt, SNodeList** pOutput) { return TSDB_CODE_SUCCESS; } -#ifndef TD_ENTERPRISE -int32_t biRewriteSelectStar(STranslateContext* pCxt, SSelectStmt* pSelect) { return TSDB_CODE_SUCCESS; } -#endif - static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect) { SNode* pNode = NULL; WHERE_EACH(pNode, pSelect->pProjectionList) { @@ -5743,11 +5920,6 @@ static int32_t checkTableDeleteMarkOption(STranslateContext* pCxt, STableOptions return code; } -#ifndef TD_ENTERPRISE -int32_t biCheckCreateTableTbnameCol(STranslateContext* pCxt, SCreateTableStmt* pStmt) { - return TSDB_CODE_SUCCESS; -} -#endif static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt, bool createStable) { if (NULL != strchr(pStmt->tableName, '.')) { @@ -8769,7 +8941,7 @@ static int32_t extractShowVariablesResultSchema(int32_t* numOfCols, SSchema** pS (*pSchema)[0].type = TSDB_DATA_TYPE_BINARY; (*pSchema)[0].bytes = TSDB_CONFIG_OPTION_LEN; - strcpy((*pSchema)[0].name, "name"); + strcpy((*pSchema)[0].name, "result"); (*pSchema)[1].type = TSDB_DATA_TYPE_BINARY; (*pSchema)[1].bytes = TSDB_CONFIG_VALUE_LEN; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b8740a2858..6f3b3fdf98 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1162,9 +1162,12 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) { ASSERTS(pSyncNode->pLogStore != NULL, "log store not created"); ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created"); + taosThreadMutexLock(&pSyncNode->pLogBuf->mutex); SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore); SyncIndex endIndex = pSyncNode->pLogBuf->endIndex; + taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex); + if (lastVer != -1 && endIndex != lastVer + 1) { terrno = TSDB_CODE_WAL_LOG_INCOMPLETE; sError("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64 "", diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index f060e9da13..2277b70c8f 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -1087,7 +1087,10 @@ static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSe // sender static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { + int32_t code = -1; SSnapshot snapshot = {0}; + + taosThreadMutexLock(&pSender->pSndBuf->mutex); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); // prepare @@ -1103,20 +1106,24 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend // start reader if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) { if (syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg) != 0) { - return -1; + goto _out; } } - int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader); + code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader); if (code != 0) { sSError(pSender, "prepare snapshot failed since %s", terrstr()); - return -1; + goto _out; } // update next index syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1); - return snapshotSend(pSender); + code = snapshotSend(pSender); + +_out: + taosThreadMutexUnlock(&pSender->pSndBuf->mutex); + return code; } static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 6a32fed147..c294270e4a 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -286,6 +286,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CLOG_IS_NULL, "Transaction commitlog TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL, "Unable to establish connection While execute transaction and will continue in the background") TAOS_DEFINE_ERROR(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED, "Last Transaction not finished") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_SYNC_TIMEOUT, "Sync timeout While execute transaction and will continue in the background") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CTX_SWITCH, "Transaction context switch") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error") // mnode-mq diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 184e18fc67..e113a95fcc 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -337,14 +337,10 @@ static int32_t taosOpenNewLogFile() { } void taosResetLog() { - char lastName[LOG_FILE_NAME_LEN + 20]; - sprintf(lastName, "%s.%d", tsLogObj.logName, tsLogObj.flag); - // force create a new log file tsLogObj.lines = tsNumOfLogLines + 10; taosOpenNewLogFile(); - (void)taosRemoveFile(lastName); uInfo("=================================="); uInfo(" reset log file "); diff --git a/tests/army/enterprise/multi-level/mlevel_basic.json b/tests/army/enterprise/multi-level/mlevel_basic.json new file mode 100644 index 0000000000..1c2b9274d2 --- /dev/null +++ b/tests/army/enterprise/multi-level/mlevel_basic.json @@ -0,0 +1,58 @@ +{ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "connection_pool_size": 8, + "num_of_records_per_req": 2000, + "thread_count": 2, + "create_table_thread_count": 1, + "confirm_parameter_prompt": "no", + "databases": [ + { + "dbinfo": { + "name": "db", + "drop": "yes", + "vgroups": 2, + "replica": 1, + "duration":"1d", + "keep": "3d,6d,30d" + }, + "super_tables": [ + { + "name": "stb", + "child_table_exists": "no", + "childtable_count": 4, + "insert_rows": 1000000, + "childtable_prefix": "d", + "insert_mode": "taosc", + "timestamp_step": 1000, + "start_timestamp":"now-12d", + "columns": [ + { "type": "bool", "name": "bc"}, + { "type": "float", "name": "fc" }, + { "type": "double", "name": "dc"}, + { "type": "tinyint", "name": "ti", "values":["1"]}, + { "type": "smallint", "name": "si" }, + { "type": "int", "name": "ic" }, + { "type": "bigint", "name": "bi" }, + { "type": "utinyint", "name": "uti"}, + { "type": "usmallint", "name": "usi"}, + { "type": "uint", "name": "ui" }, + { "type": "ubigint", "name": "ubi"}, + { "type": "binary", "name": "bin", "len": 32}, + { "type": "nchar", "name": "nch", "len": 64} + ], + "tags": [ + {"type": "tinyint", "name": "groupid","max": 10,"min": 1}, + {"name": "location","type": "binary", "len": 16, "values": + ["San Francisco", "Los Angles", "San Diego", "San Jose", "Palo Alto", "Campbell", "Mountain View","Sunnyvale", "Santa Clara", "Cupertino"] + } + ] + } + ] + } + ] +} diff --git a/tests/army/enterprise/multi-level/mlevel_basic.py b/tests/army/enterprise/multi-level/mlevel_basic.py index e3a3f57c74..3bec2bfb72 100644 --- a/tests/army/enterprise/multi-level/mlevel_basic.py +++ b/tests/army/enterprise/multi-level/mlevel_basic.py @@ -15,26 +15,62 @@ import sys import time import taos +import frame +import frame.etool + from frame.log import * from frame.cases import * from frame.sql import * +from frame.caseBase import * +from frame import * -class TDTestCase: - # init - def init(self, conn, logSql, replicaVar=1): - self.replicaVar = int(replicaVar) - tdLog.debug(f"start to excute {__file__}") - tdSql.init(conn.cursor(), True) + +class TDTestCase(TBase): + + + def insertData(self): + tdLog.info(f"insert data.") + # taosBenchmark run + json = etool.curFile(__file__, "mlevel_basic.json") + etool.runBenchmark(json=json) + + tdSql.execute(f"use {self.db}") + # set insert data information + self.childtable_count = 4 + self.insert_rows = 1000000 + self.timestamp_step = 1000 + + def doAction(self): + tdLog.info(f"do action.") + self.flushDb() + self.trimDb() + self.compactDb() # run def run(self): - # check two db query result same - tdLog.info(f"hello world.") + tdLog.debug(f"start to excute {__file__}") + + # insert data + self.insertData() + + # check insert data correct + self.checkInsertCorrect() + + # save + self.snapshotAgg() + + # do action + self.doAction() + + # check save agg result correct + self.checkAggCorrect() + + # check insert correct again + self.checkInsertCorrect() - # stop - def stop(self): - tdSql.close() tdLog.success(f"{__file__} successfully executed") + + tdCases.addLinux(__file__, TDTestCase()) -tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/army/frame/caseBase.py b/tests/army/frame/caseBase.py new file mode 100644 index 0000000000..441196f050 --- /dev/null +++ b/tests/army/frame/caseBase.py @@ -0,0 +1,106 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import os +import time +import datetime + +from frame.log import * +from frame.sql import * + +# test case base +class TBase: + +# +# frame call +# + + # init + def init(self, conn, logSql, replicaVar=1): + # save param + self.replicaVar = int(replicaVar) + tdSql.init(conn.cursor(), True) + + # record server information + self.dnodeNum = 0 + self.mnodeNum = 0 + self.mLevel = 0 + self.mLevelDisk = 0 + + # test case information + self.db = "db" + self.stb = "stb" + + # variant in taosBenchmark json + self.childtable_count = 2 + self.insert_rows = 1000000 + self.timestamp_step = 1000 + + # sql + self.sqlSum = f"select sum(ic) from {self.stb}" + self.sqlMax = f"select max(ic) from {self.stb}" + self.sqlMin = f"select min(ic) from {self.stb}" + self.sqlAvg = f"select avg(ic) from {self.stb}" + + + # stop + def stop(self): + tdSql.close() + + +# +# db action +# + + def trimDb(self): + tdSql.execute(f"trim database {self.db}") + + def compactDb(self): + tdSql.execute(f"compact database {self.db}") + + def flushDb(self): + tdSql.execute(f"flush database {self.db}") + +# +# check db correct +# + + # basic + def checkInsertCorrect(self): + # check count + sql = f"select count(*) from {self.stb}" + tdSql.checkAgg(sql, self.childtable_count * self.insert_rows) + + # check child table count + sql = f" select count(*) from (select count(*) as cnt , tbname from {self.stb} group by tbname) where cnt = {self.insert_rows} " + tdSql.checkAgg(sql, self.childtable_count) + + # check step + sql = f"select count(*) from (select diff(ts) as dif from {self.stb} partition by tbname) where dif != {self.timestamp_step}" + tdSql.checkAgg(sql, 0) + + # save agg result + def snapshotAgg(self): + + self.sum = tdSql.getFirstValue(self.sqlSum) + self.avg = tdSql.getFirstValue(self.sqlAvg) + self.min = tdSql.getFirstValue(self.sqlMin) + self.max = tdSql.getFirstValue(self.sqlMax) + + # check agg + def checkAggCorrect(self): + tdSql.checkAgg(self.sqlSum, self.sum) + tdSql.checkAgg(self.sqlAvg, self.avg) + tdSql.checkAgg(self.sqlMin, self.min) + tdSql.checkAgg(self.sqlMax, self.max) diff --git a/tests/army/frame/common.py b/tests/army/frame/common.py index 93059ff078..5cdb3f9f46 100644 --- a/tests/army/frame/common.py +++ b/tests/army/frame/common.py @@ -23,7 +23,7 @@ import taos from frame.log import * from frame.sql import * from frame.cases import * -from frame.dnodes import * +from frame.server.dnodes import * from frame.common import * from frame.constant import * from dataclasses import dataclass,field diff --git a/tests/army/frame/dnodes-default.py b/tests/army/frame/dnodes-default.py deleted file mode 100644 index a1d1698b00..0000000000 --- a/tests/army/frame/dnodes-default.py +++ /dev/null @@ -1,502 +0,0 @@ -################################################################### -# Copyright (c) 2016 by TAOS Technologies, Inc. -# All rights reserved. -# -# This file is proprietary and confidential to TAOS Technologies. -# No part of this file may be reproduced, stored, transmitted, -# disclosed or used in any form or by any means other than as -# expressly provided by the written permission from Jianhui Tao -# -################################################################### - -# -*- coding: utf-8 -*- - -import sys -import os -import os.path -import subprocess -from frame.log import * - - -class TDSimClient: - def __init__(self): - self.testCluster = False - - self.cfgDict = { - "numOfLogLines": "100000000", - "numOfThreadsPerCore": "2.0", - "locale": "en_US.UTF-8", - "charset": "UTF-8", - "asyncLog": "0", - "minTablesPerVnode": "4", - "maxTablesPerVnode": "1000", - "tableIncStepPerVnode": "10000", - "maxVgroupsPerDb": "1000", - "sdbDebugFlag": "143", - "rpcDebugFlag": "135", - "tmrDebugFlag": "131", - "cDebugFlag": "135", - "udebugFlag": "135", - "jnidebugFlag": "135", - "qdebugFlag": "135", - "telemetryReporting": "0", - } - def init(self, path): - self.__init__() - self.path = path - - def getLogDir(self): - self.logDir = os.path.join(self.path,"sim","psim","log") - return self.logDir - - def getCfgDir(self): - self.cfgDir = os.path.join(self.path,"sim","psim","cfg") - return self.cfgDir - - def setTestCluster(self, value): - self.testCluster = value - - def addExtraCfg(self, option, value): - self.cfgDict.update({option: value}) - - def cfg(self, option, value): - cmd = "echo %s %s >> %s" % (option, value, self.cfgPath) - if os.system(cmd) != 0: - tdLog.exit(cmd) - - def deploy(self): - self.logDir = os.path.join(self.path,"sim","psim","log") - self.cfgDir = os.path.join(self.path,"sim","psim","cfg") - self.cfgPath = os.path.join(self.path,"sim","psim","cfg","taos.cfg") - - cmd = "rm -rf " + self.logDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "mkdir -p " + self.logDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "rm -rf " + self.cfgDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "mkdir -p " + self.cfgDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "touch " + self.cfgPath - if os.system(cmd) != 0: - tdLog.exit(cmd) - - if self.testCluster: - self.cfg("masterIp", "192.168.0.1") - self.cfg("secondIp", "192.168.0.2") - self.cfg("logDir", self.logDir) - - for key, value in self.cfgDict.items(): - self.cfg(key, value) - - tdLog.debug("psim is deployed and configured by %s" % (self.cfgPath)) - - -class TDDnode: - def __init__(self, index): - self.index = index - self.running = 0 - self.deployed = 0 - self.testCluster = False - self.valgrind = 0 - - def init(self, path): - self.path = path - - def setTestCluster(self, value): - self.testCluster = value - - def setValgrind(self, value): - self.valgrind = value - - def getDataSize(self): - totalSize = 0 - - if (self.deployed == 1): - for dirpath, dirnames, filenames in os.walk(self.dataDir): - for f in filenames: - fp = os.path.join(dirpath, f) - - if not os.path.islink(fp): - totalSize = totalSize + os.path.getsize(fp) - - return totalSize - - def deploy(self): - self.logDir = os.path.join(self.path,"sim","dnode%d" % self.index, "log") - self.dataDir = os.path.join(self.path,"sim","dnode%d" % self.index, "data") - self.cfgDir = os.path.join(self.path,"sim","dnode%d" % self.index, "cfg") - self.cfgPath = os.path.join(self.path,"sim","dnode%d" % self.index, "cfg","taos.cfg") - - cmd = "rm -rf " + self.dataDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "rm -rf " + self.logDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "rm -rf " + self.cfgDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "mkdir -p " + self.dataDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "mkdir -p " + self.logDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "mkdir -p " + self.cfgDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "touch " + self.cfgPath - if os.system(cmd) != 0: - tdLog.exit(cmd) - - if self.testCluster: - self.startIP() - - if self.testCluster: - self.cfg("masterIp", "192.168.0.1") - self.cfg("secondIp", "192.168.0.2") - self.cfg("publicIp", "192.168.0.%d" % (self.index)) - self.cfg("internalIp", "192.168.0.%d" % (self.index)) - self.cfg("privateIp", "192.168.0.%d" % (self.index)) - self.cfg("dataDir", self.dataDir) - self.cfg("logDir", self.logDir) - self.cfg("numOfLogLines", "100000000") - self.cfg("mnodeEqualVnodeNum", "0") - self.cfg("walLevel", "2") - self.cfg("fsync", "1000") - self.cfg("statusInterval", "1") - self.cfg("numOfMnodes", "3") - self.cfg("numOfThreadsPerCore", "2.0") - self.cfg("monitor", "0") - self.cfg("maxVnodeConnections", "30000") - self.cfg("maxMgmtConnections", "30000") - self.cfg("maxMeterConnections", "30000") - self.cfg("maxShellConns", "30000") - self.cfg("locale", "en_US.UTF-8") - self.cfg("charset", "UTF-8") - self.cfg("asyncLog", "0") - self.cfg("anyIp", "0") - self.cfg("dDebugFlag", "135") - self.cfg("mDebugFlag", "135") - self.cfg("sdbDebugFlag", "135") - self.cfg("rpcDebugFlag", "135") - self.cfg("tmrDebugFlag", "131") - self.cfg("cDebugFlag", "135") - self.cfg("httpDebugFlag", "135") - self.cfg("monitorDebugFlag", "135") - self.cfg("udebugFlag", "135") - self.cfg("jnidebugFlag", "135") - self.cfg("qdebugFlag", "135") - self.deployed = 1 - tdLog.debug( - "dnode:%d is deployed and configured by %s" % - (self.index, self.cfgPath)) - - def getBuildPath(self): - selfPath = os.path.dirname(os.path.realpath(__file__)) - - if ("community" in selfPath): - projPath = selfPath[:selfPath.find("community")] - else: - projPath = selfPath[:selfPath.find("tests")] - - for root, dirs, files in os.walk(projPath): - if ("taosd" in files): - rootRealPath = os.path.dirname(os.path.realpath(root)) - if ("packaging" not in rootRealPath): - buildPath = root[:len(root)-len("/build/bin")] - break - return buildPath - - def start(self): - buildPath = self.getBuildPath() - - if (buildPath == ""): - tdLog.exit("taosd not found!") - else: - tdLog.info("taosd found in %s" % buildPath) - - binPath = buildPath + "/build/bin/taosd" - - if self.deployed == 0: - tdLog.exit("dnode:%d is not deployed" % (self.index)) - - if self.valgrind == 0: - cmd = "nohup %s -c %s > /dev/null 2>&1 & " % ( - binPath, self.cfgDir) - else: - valgrindCmdline = "valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes" - - cmd = "nohup %s %s -c %s 2>&1 & " % ( - valgrindCmdline, binPath, self.cfgDir) - - print(cmd) - - if os.system(cmd) != 0: - tdLog.exit(cmd) - self.running = 1 - tdLog.debug("dnode:%d is running with %s " % (self.index, cmd)) - - tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index)) - time.sleep(5) - - def stop(self): - if self.valgrind == 0: - toBeKilled = "taosd" - else: - toBeKilled = "valgrind.bin" - - if self.running != 0: - psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - - while(processID): - killCmd = "kill -INT %s > /dev/null 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - for port in range(6030, 6041): - fuserCmd = "fuser -k -n tcp %d" % port - os.system(fuserCmd) - if self.valgrind: - time.sleep(2) - - self.running = 0 - tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index)) - - def forcestop(self): - if self.valgrind == 0: - toBeKilled = "taosd" - else: - toBeKilled = "valgrind.bin" - - if self.running != 0: - psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - - while(processID): - killCmd = "kill -KILL %s > /dev/null 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - for port in range(6030, 6041): - fuserCmd = "fuser -k -n tcp %d" % port - os.system(fuserCmd) - if self.valgrind: - time.sleep(2) - - self.running = 0 - tdLog.debug("dnode:%d is stopped by kill -KILL" % (self.index)) - - def startIP(self): - cmd = "sudo ifconfig lo:%d 192.168.0.%d up" % (self.index, self.index) - if os.system(cmd) != 0: - tdLog.exit(cmd) - - def stopIP(self): - cmd = "sudo ifconfig lo:%d 192.168.0.%d down" % ( - self.index, self.index) - if os.system(cmd) != 0: - tdLog.exit(cmd) - - def cfg(self, option, value): - cmd = "echo %s %s >> %s" % (option, value, self.cfgPath) - if os.system(cmd) != 0: - tdLog.exit(cmd) - - def getDnodeRootDir(self, index): - dnodeRootDir = os.path.join(self.path,"sim","psim","dnode%d" % index) - return dnodeRootDir - - def getDnodesRootDir(self): - dnodesRootDir = os.path.join(self.path,"sim","psim") - return dnodesRootDir - - -class TDDnodes: - def __init__(self): - self.dnodes = [] - self.dnodes.append(TDDnode(1)) - self.dnodes.append(TDDnode(2)) - self.dnodes.append(TDDnode(3)) - self.dnodes.append(TDDnode(4)) - self.dnodes.append(TDDnode(5)) - self.dnodes.append(TDDnode(6)) - self.dnodes.append(TDDnode(7)) - self.dnodes.append(TDDnode(8)) - self.dnodes.append(TDDnode(9)) - self.dnodes.append(TDDnode(10)) - self.simDeployed = False - - def init(self, path): - psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") - while(processID): - killCmd = "kill -TERM %s > /dev/null 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - - psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") - while(processID): - killCmd = "kill -TERM %s > /dev/null 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - - binPath = os.path.dirname(os.path.realpath(__file__)) - binPath = binPath + "/../../../debug/" - tdLog.debug("binPath %s" % (binPath)) - binPath = os.path.realpath(binPath) - tdLog.debug("binPath real path %s" % (binPath)) - - # cmd = "sudo cp %s/build/lib/libtaos.so /usr/local/lib/taos/" % (binPath) - # tdLog.debug(cmd) - # os.system(cmd) - - # cmd = "sudo cp %s/build/bin/taos /usr/local/bin/taos/" % (binPath) - # if os.system(cmd) != 0 : - # tdLog.exit(cmd) - # tdLog.debug("execute %s" % (cmd)) - - # cmd = "sudo cp %s/build/bin/taosd /usr/local/bin/taos/" % (binPath) - # if os.system(cmd) != 0 : - # tdLog.exit(cmd) - # tdLog.debug("execute %s" % (cmd)) - - if path == "": - # self.path = os.path.expanduser('~') - self.path = os.path.abspath(binPath + "../../") - else: - self.path = os.path.realpath(path) - - for i in range(len(self.dnodes)): - self.dnodes[i].init(self.path) - - self.sim = TDSimClient() - self.sim.init(self.path) - - def setTestCluster(self, value): - self.testCluster = value - - def setValgrind(self, value): - self.valgrind = value - - def deploy(self, index): - self.sim.setTestCluster(self.testCluster) - - if (self.simDeployed == False): - self.sim.deploy() - self.simDeployed = True - - self.check(index) - self.dnodes[index - 1].setTestCluster(self.testCluster) - self.dnodes[index - 1].setValgrind(self.valgrind) - self.dnodes[index - 1].deploy() - - def cfg(self, index, option, value): - self.check(index) - self.dnodes[index - 1].cfg(option, value) - - def start(self, index): - self.check(index) - self.dnodes[index - 1].start() - - def stop(self, index): - self.check(index) - self.dnodes[index - 1].stop() - - def getDataSize(self, index): - self.check(index) - return self.dnodes[index - 1].getDataSize() - - def forcestop(self, index): - self.check(index) - self.dnodes[index - 1].forcestop() - - def startIP(self, index): - self.check(index) - - if self.testCluster: - self.dnodes[index - 1].startIP() - - def stopIP(self, index): - self.check(index) - - if self.dnodes[index - 1].testCluster: - self.dnodes[index - 1].stopIP() - - def check(self, index): - if index < 1 or index > 10: - tdLog.exit("index:%d should on a scale of [1, 10]" % (index)) - - def stopAll(self): - tdLog.info("stop all dnodes") - for i in range(len(self.dnodes)): - self.dnodes[i].stop() - - psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") - if processID: - cmd = "sudo systemctl stop taosd" - os.system(cmd) - # if os.system(cmd) != 0 : - # tdLog.exit(cmd) - psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") - while(processID): - killCmd = "kill -TERM %s > /dev/null 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - - psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") - while(processID): - killCmd = "kill -TERM %s > /dev/null 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - - # if os.system(cmd) != 0 : - # tdLog.exit(cmd) - - def getDnodesRootDir(self): - dnodesRootDir = "%s/sim" % (self.path) - return dnodesRootDir - - def getSimCfgPath(self): - return self.sim.getCfgDir() - - def getSimLogPath(self): - return self.sim.getLogDir() - - def addSimExtraCfg(self, option, value): - self.sim.addExtraCfg(option, value) - - -tdDnodes = TDDnodes() diff --git a/tests/army/frame/dnodes-no-random-fail.py b/tests/army/frame/dnodes-no-random-fail.py deleted file mode 100644 index 3b3083396b..0000000000 --- a/tests/army/frame/dnodes-no-random-fail.py +++ /dev/null @@ -1,500 +0,0 @@ -################################################################### -# Copyright (c) 2016 by TAOS Technologies, Inc. -# All rights reserved. -# -# This file is proprietary and confidential to TAOS Technologies. -# No part of this file may be reproduced, stored, transmitted, -# disclosed or used in any form or by any means other than as -# expressly provided by the written permission from Jianhui Tao -# -################################################################### - -# -*- coding: utf-8 -*- - -import sys -import os -import os.path -import subprocess -from frame.log import * - - -class TDSimClient: - def __init__(self): - self.testCluster = False - - self.cfgDict = { - "numOfLogLines": "100000000", - "numOfThreadsPerCore": "2.0", - "locale": "en_US.UTF-8", - "charset": "UTF-8", - "asyncLog": "0", - "anyIp": "0", - "sdbDebugFlag": "135", - "rpcDebugFlag": "135", - "tmrDebugFlag": "131", - "cDebugFlag": "135", - "udebugFlag": "135", - "jnidebugFlag": "135", - "qdebugFlag": "135", - "telemetryReporting": "0", - } - - def init(self, path): - self.__init__() - self.path = path - - def getLogDir(self): - self.logDir = os.path.join(self.path,"sim","psim","log") - return self.logDir - - def getCfgDir(self): - self.cfgDir = os.path.join(self.path,"sim","psim","cfg") - return self.cfgDir - - def setTestCluster(self, value): - self.testCluster = value - - def addExtraCfg(self, option, value): - self.cfgDict.update({option: value}) - - def cfg(self, option, value): - cmd = "echo %s %s >> %s" % (option, value, self.cfgPath) - if os.system(cmd) != 0: - tdLog.exit(cmd) - - def deploy(self): - self.logDir = os.path.join(self.path,"sim","psim","log") - self.cfgDir = os.path.join(self.path,"sim","psim","cfg") - self.cfgPath = os.path.join(self.path,"sim","psim","cfg","taos.cfg") - - cmd = "rm -rf " + self.logDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "mkdir -p " + self.logDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "rm -rf " + self.cfgDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "mkdir -p " + self.cfgDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "touch " + self.cfgPath - if os.system(cmd) != 0: - tdLog.exit(cmd) - - if self.testCluster: - self.cfg("masterIp", "192.168.0.1") - self.cfg("secondIp", "192.168.0.2") - self.cfg("logDir", self.logDir) - - for key, value in self.cfgDict.items(): - self.cfg(key, value) - - tdLog.debug("psim is deployed and configured by %s" % (self.cfgPath)) - - -class TDDnode: - def __init__(self, index): - self.index = index - self.running = 0 - self.deployed = 0 - self.testCluster = False - self.valgrind = 0 - - def init(self, path): - self.path = path - - def setTestCluster(self, value): - self.testCluster = value - - def setValgrind(self, value): - self.valgrind = value - - def getDataSize(self): - totalSize = 0 - - if (self.deployed == 1): - for dirpath, dirnames, filenames in os.walk(self.dataDir): - for f in filenames: - fp = os.path.join(dirpath, f) - - if not os.path.islink(fp): - totalSize = totalSize + os.path.getsize(fp) - - return totalSize - - def deploy(self): - self.logDir = os.path.join(self.path,"sim","dnode%d" % self.index, "log") - self.dataDir = os.path.join(self.path,"sim","dnode%d" % self.index, "data") - self.cfgDir = os.path.join(self.path,"sim","dnode%d" % self.index, "cfg") - self.cfgPath = os.path.join(self.path,"sim","dnode%d" % self.index, "cfg","taos.cfg") - - cmd = "rm -rf " + self.dataDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "rm -rf " + self.logDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "rm -rf " + self.cfgDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "mkdir -p " + self.dataDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "mkdir -p " + self.logDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "mkdir -p " + self.cfgDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "touch " + self.cfgPath - if os.system(cmd) != 0: - tdLog.exit(cmd) - - if self.testCluster: - self.startIP() - - if self.testCluster: - self.cfg("masterIp", "192.168.0.1") - self.cfg("secondIp", "192.168.0.2") - self.cfg("publicIp", "192.168.0.%d" % (self.index)) - self.cfg("internalIp", "192.168.0.%d" % (self.index)) - self.cfg("privateIp", "192.168.0.%d" % (self.index)) - self.cfg("dataDir", self.dataDir) - self.cfg("logDir", self.logDir) - self.cfg("numOfLogLines", "100000000") - self.cfg("mnodeEqualVnodeNum", "0") - self.cfg("walLevel", "2") - self.cfg("fsync", "1000") - self.cfg("statusInterval", "1") - self.cfg("numOfMnodes", "3") - self.cfg("numOfThreadsPerCore", "2.0") - self.cfg("monitor", "0") - self.cfg("maxVnodeConnections", "30000") - self.cfg("maxMgmtConnections", "30000") - self.cfg("maxMeterConnections", "30000") - self.cfg("maxShellConns", "30000") - self.cfg("locale", "en_US.UTF-8") - self.cfg("charset", "UTF-8") - self.cfg("asyncLog", "0") - self.cfg("anyIp", "0") - self.cfg("dDebugFlag", "135") - self.cfg("mDebugFlag", "135") - self.cfg("sdbDebugFlag", "135") - self.cfg("rpcDebugFlag", "135") - self.cfg("tmrDebugFlag", "131") - self.cfg("cDebugFlag", "135") - self.cfg("httpDebugFlag", "135") - self.cfg("monitorDebugFlag", "135") - self.cfg("udebugFlag", "135") - self.cfg("jnidebugFlag", "135") - self.cfg("qdebugFlag", "135") - self.deployed = 1 - tdLog.debug( - "dnode:%d is deployed and configured by %s" % - (self.index, self.cfgPath)) - - def getBuildPath(self): - selfPath = os.path.dirname(os.path.realpath(__file__)) - - if ("community" in selfPath): - projPath = selfPath[:selfPath.find("community")] - else: - projPath = selfPath[:selfPath.find("tests")] - - for root, dirs, files in os.walk(projPath): - if ("taosd" in files): - rootRealPath = os.path.dirname(os.path.realpath(root)) - if ("packaging" not in rootRealPath): - buildPath = root[:len(root)-len("/build/bin")] - break - return buildPath - - def start(self): - buildPath = self.getBuildPath() - - if (buildPath == ""): - tdLog.exit("taosd not found!") - else: - tdLog.info("taosd found in %s" % buildPath) - - binPath = buildPath + "/build/bin/taosd" - - if self.deployed == 0: - tdLog.exit("dnode:%d is not deployed" % (self.index)) - - if self.valgrind == 0: - cmd = "nohup %s -c %s --random-file-fail-factor 0 > /dev/null 2>&1 & " % ( - binPath, self.cfgDir) - else: - valgrindCmdline = "valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes" - - cmd = "nohup %s %s -c %s 2>&1 & " % ( - valgrindCmdline, binPath, self.cfgDir) - - print(cmd) - - if os.system(cmd) != 0: - tdLog.exit(cmd) - self.running = 1 - tdLog.debug("dnode:%d is running with %s " % (self.index, cmd)) - - tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index)) - time.sleep(5) - - def stop(self): - if self.valgrind == 0: - toBeKilled = "taosd" - else: - toBeKilled = "valgrind.bin" - - if self.running != 0: - psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - - while(processID): - killCmd = "kill -INT %s > /dev/null 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - for port in range(6030, 6041): - fuserCmd = "fuser -k -n tcp %d" % port - os.system(fuserCmd) - if self.valgrind: - time.sleep(2) - - self.running = 0 - tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index)) - - def forcestop(self): - if self.valgrind == 0: - toBeKilled = "taosd" - else: - toBeKilled = "valgrind.bin" - - if self.running != 0: - psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - - while(processID): - killCmd = "kill -KILL %s > /dev/null 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - for port in range(6030, 6041): - fuserCmd = "fuser -k -n tcp %d" % port - os.system(fuserCmd) - if self.valgrind: - time.sleep(2) - - self.running = 0 - tdLog.debug("dnode:%d is stopped by kill -KILL" % (self.index)) - - def startIP(self): - cmd = "sudo ifconfig lo:%d 192.168.0.%d up" % (self.index, self.index) - if os.system(cmd) != 0: - tdLog.exit(cmd) - - def stopIP(self): - cmd = "sudo ifconfig lo:%d 192.168.0.%d down" % ( - self.index, self.index) - if os.system(cmd) != 0: - tdLog.exit(cmd) - - def cfg(self, option, value): - cmd = "echo %s %s >> %s" % (option, value, self.cfgPath) - if os.system(cmd) != 0: - tdLog.exit(cmd) - - def getDnodeRootDir(self, index): - dnodeRootDir = os.path.join(self.path,"sim","psim","dnode%d" % index) - return dnodeRootDir - - def getDnodesRootDir(self): - dnodesRootDir = os.path.join(self.path,"sim","psim") - return dnodesRootDir - - -class TDDnodes: - def __init__(self): - self.dnodes = [] - self.dnodes.append(TDDnode(1)) - self.dnodes.append(TDDnode(2)) - self.dnodes.append(TDDnode(3)) - self.dnodes.append(TDDnode(4)) - self.dnodes.append(TDDnode(5)) - self.dnodes.append(TDDnode(6)) - self.dnodes.append(TDDnode(7)) - self.dnodes.append(TDDnode(8)) - self.dnodes.append(TDDnode(9)) - self.dnodes.append(TDDnode(10)) - self.simDeployed = False - - def init(self, path): - psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") - while(processID): - killCmd = "kill -TERM %s > /dev/null 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - - psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") - while(processID): - killCmd = "kill -TERM %s > /dev/null 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - - binPath = os.path.dirname(os.path.realpath(__file__)) - binPath = binPath + "/../../../debug/" - tdLog.debug("binPath %s" % (binPath)) - binPath = os.path.realpath(binPath) - tdLog.debug("binPath real path %s" % (binPath)) - - # cmd = "sudo cp %s/build/lib/libtaos.so /usr/local/lib/taos/" % (binPath) - # tdLog.debug(cmd) - # os.system(cmd) - - # cmd = "sudo cp %s/build/bin/taos /usr/local/bin/taos/" % (binPath) - # if os.system(cmd) != 0 : - # tdLog.exit(cmd) - # tdLog.debug("execute %s" % (cmd)) - - # cmd = "sudo cp %s/build/bin/taosd /usr/local/bin/taos/" % (binPath) - # if os.system(cmd) != 0 : - # tdLog.exit(cmd) - # tdLog.debug("execute %s" % (cmd)) - - if path == "": - # self.path = os.path.expanduser('~') - self.path = os.path.abspath(binPath + "../../") - else: - self.path = os.path.realpath(path) - - for i in range(len(self.dnodes)): - self.dnodes[i].init(self.path) - - self.sim = TDSimClient() - self.sim.init(self.path) - - def setTestCluster(self, value): - self.testCluster = value - - def setValgrind(self, value): - self.valgrind = value - - def deploy(self, index): - self.sim.setTestCluster(self.testCluster) - - if (self.simDeployed == False): - self.sim.deploy() - self.simDeployed = True - - self.check(index) - self.dnodes[index - 1].setTestCluster(self.testCluster) - self.dnodes[index - 1].setValgrind(self.valgrind) - self.dnodes[index - 1].deploy() - - def cfg(self, index, option, value): - self.check(index) - self.dnodes[index - 1].cfg(option, value) - - def start(self, index): - self.check(index) - self.dnodes[index - 1].start() - - def stop(self, index): - self.check(index) - self.dnodes[index - 1].stop() - - def getDataSize(self, index): - self.check(index) - return self.dnodes[index - 1].getDataSize() - - def forcestop(self, index): - self.check(index) - self.dnodes[index - 1].forcestop() - - def startIP(self, index): - self.check(index) - - if self.testCluster: - self.dnodes[index - 1].startIP() - - def stopIP(self, index): - self.check(index) - - if self.dnodes[index - 1].testCluster: - self.dnodes[index - 1].stopIP() - - def check(self, index): - if index < 1 or index > 10: - tdLog.exit("index:%d should on a scale of [1, 10]" % (index)) - - def stopAll(self): - tdLog.info("stop all dnodes") - for i in range(len(self.dnodes)): - self.dnodes[i].stop() - - psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") - if processID: - cmd = "sudo systemctl stop taosd" - os.system(cmd) - # if os.system(cmd) != 0 : - # tdLog.exit(cmd) - psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") - while(processID): - killCmd = "kill -TERM %s > /dev/null 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - - psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") - while(processID): - killCmd = "kill -TERM %s > /dev/null 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - - # if os.system(cmd) != 0 : - # tdLog.exit(cmd) - - def getDnodesRootDir(self): - dnodesRootDir = "%s/sim" % (self.path) - return dnodesRootDir - - def getSimCfgPath(self): - return self.sim.getCfgDir() - - def getSimLogPath(self): - return self.sim.getLogDir() - - def addSimExtraCfg(self, option, value): - self.sim.addExtraCfg(option, value) - - -tdDnodes = TDDnodes() diff --git a/tests/army/frame/dnodes-random-fail.py b/tests/army/frame/dnodes-random-fail.py deleted file mode 100644 index 794adfd7ed..0000000000 --- a/tests/army/frame/dnodes-random-fail.py +++ /dev/null @@ -1,497 +0,0 @@ -################################################################### -# Copyright (c) 2016 by TAOS Technologies, Inc. -# All rights reserved. -# -# This file is proprietary and confidential to TAOS Technologies. -# No part of this file may be reproduced, stored, transmitted, -# disclosed or used in any form or by any means other than as -# expressly provided by the written permission from Jianhui Tao -# -################################################################### - -# -*- coding: utf-8 -*- - -import sys -import os -import os.path -import subprocess -from frame.log import * - - -class TDSimClient: - def __init__(self): - self.testCluster = False - - self.cfgDict = { - "numOfLogLines": "100000000", - "locale": "en_US.UTF-8", - "charset": "UTF-8", - "asyncLog": "0", - "rpcDebugFlag": "135", - "tmrDebugFlag": "131", - "cDebugFlag": "135", - "udebugFlag": "135", - "jnidebugFlag": "135", - "qdebugFlag": "135", - "telemetryReporting": "0", - } - - def init(self, path): - self.__init__() - self.path = path - - def getLogDir(self): - self.logDir = os.path.join(self.path,"sim","psim","log") - return self.logDir - - def getCfgDir(self): - self.cfgDir = os.path.join(self.path,"sim","psim","cfg") - return self.cfgDir - - def setTestCluster(self, value): - self.testCluster = value - - def addExtraCfg(self, option, value): - self.cfgDict.update({option: value}) - - def cfg(self, option, value): - cmd = "echo %s %s >> %s" % (option, value, self.cfgPath) - if os.system(cmd) != 0: - tdLog.exit(cmd) - - def deploy(self): - self.logDir = os.path.join(self.path,"sim","psim","log") - self.cfgDir = os.path.join(self.path,"sim","psim","cfg") - self.cfgPath = os.path.join(self.path,"sim","psim","cfg","taos.cfg") - - cmd = "rm -rf " + self.logDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "mkdir -p " + self.logDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "rm -rf " + self.cfgDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "mkdir -p " + self.cfgDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "touch " + self.cfgPath - if os.system(cmd) != 0: - tdLog.exit(cmd) - - if self.testCluster: - self.cfg("masterIp", "192.168.0.1") - self.cfg("secondIp", "192.168.0.2") - self.cfg("logDir", self.logDir) - - for key, value in self.cfgDict.items(): - self.cfg(key, value) - - tdLog.debug("psim is deployed and configured by %s" % (self.cfgPath)) - - -class TDDnode: - def __init__(self, index): - self.index = index - self.running = 0 - self.deployed = 0 - self.testCluster = False - self.valgrind = 0 - - def init(self, path): - self.path = path - - def setTestCluster(self, value): - self.testCluster = value - - def setValgrind(self, value): - self.valgrind = value - - def getDataSize(self): - totalSize = 0 - - if (self.deployed == 1): - for dirpath, dirnames, filenames in os.walk(self.dataDir): - for f in filenames: - fp = os.path.join(dirpath, f) - - if not os.path.islink(fp): - totalSize = totalSize + os.path.getsize(fp) - - return totalSize - - def deploy(self): - self.logDir = os.path.join(self.path,"sim","dnode%d" % self.index, "log") - self.dataDir = os.path.join(self.path,"sim","dnode%d" % self.index, "data") - self.cfgDir = os.path.join(self.path,"sim","dnode%d" % self.index, "cfg") - self.cfgPath = os.path.join(self.path,"sim","dnode%d" % self.index, "cfg","taos.cfg") - - cmd = "rm -rf " + self.dataDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "rm -rf " + self.logDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "rm -rf " + self.cfgDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "mkdir -p " + self.dataDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "mkdir -p " + self.logDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "mkdir -p " + self.cfgDir - if os.system(cmd) != 0: - tdLog.exit(cmd) - - cmd = "touch " + self.cfgPath - if os.system(cmd) != 0: - tdLog.exit(cmd) - - if self.testCluster: - self.startIP() - - if self.testCluster: - self.cfg("masterIp", "192.168.0.1") - self.cfg("secondIp", "192.168.0.2") - self.cfg("publicIp", "192.168.0.%d" % (self.index)) - self.cfg("internalIp", "192.168.0.%d" % (self.index)) - self.cfg("privateIp", "192.168.0.%d" % (self.index)) - self.cfg("dataDir", self.dataDir) - self.cfg("logDir", self.logDir) - self.cfg("numOfLogLines", "100000000") - self.cfg("mnodeEqualVnodeNum", "0") - self.cfg("walLevel", "2") - self.cfg("fsync", "1000") - self.cfg("statusInterval", "1") - self.cfg("numOfMnodes", "3") - self.cfg("numOfThreadsPerCore", "2.0") - self.cfg("monitor", "0") - self.cfg("maxVnodeConnections", "30000") - self.cfg("maxMgmtConnections", "30000") - self.cfg("maxMeterConnections", "30000") - self.cfg("maxShellConns", "30000") - self.cfg("locale", "en_US.UTF-8") - self.cfg("charset", "UTF-8") - self.cfg("asyncLog", "0") - self.cfg("anyIp", "0") - self.cfg("dDebugFlag", "135") - self.cfg("mDebugFlag", "135") - self.cfg("sdbDebugFlag", "135") - self.cfg("rpcDebugFlag", "135") - self.cfg("tmrDebugFlag", "131") - self.cfg("cDebugFlag", "135") - self.cfg("httpDebugFlag", "135") - self.cfg("monitorDebugFlag", "135") - self.cfg("udebugFlag", "135") - self.cfg("jnidebugFlag", "135") - self.cfg("qdebugFlag", "135") - self.deployed = 1 - tdLog.debug( - "dnode:%d is deployed and configured by %s" % - (self.index, self.cfgPath)) - - def getBuildPath(self): - selfPath = os.path.dirname(os.path.realpath(__file__)) - - if ("community" in selfPath): - projPath = selfPath[:selfPath.find("community")] - else: - projPath = selfPath[:selfPath.find("tests")] - - for root, dirs, files in os.walk(projPath): - if ("taosd" in files): - rootRealPath = os.path.dirname(os.path.realpath(root)) - if ("packaging" not in rootRealPath): - buildPath = root[:len(root)-len("/build/bin")] - break - return buildPath - - def start(self): - buildPath = self.getBuildPath() - - if (buildPath == ""): - tdLog.exit("taosd not found!") - else: - tdLog.info("taosd found in %s" % buildPath) - - binPath = buildPath + "/build/bin/taosd" - - if self.deployed == 0: - tdLog.exit("dnode:%d is not deployed" % (self.index)) - - if self.valgrind == 0: - cmd = "nohup %s -c %s --alloc-random-fail --random-file-fail-factor 5 > /dev/null 2>&1 & " % ( - binPath, self.cfgDir) - else: - valgrindCmdline = "valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes" - - cmd = "nohup %s %s -c %s 2>&1 & " % ( - valgrindCmdline, binPath, self.cfgDir) - - print(cmd) - - if os.system(cmd) != 0: - tdLog.exit(cmd) - self.running = 1 - tdLog.debug("dnode:%d is running with %s " % (self.index, cmd)) - - tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index)) - time.sleep(5) - - def stop(self): - if self.valgrind == 0: - toBeKilled = "taosd" - else: - toBeKilled = "valgrind.bin" - - if self.running != 0: - psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - - while(processID): - killCmd = "kill -INT %s > /dev/null 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - for port in range(6030, 6041): - fuserCmd = "fuser -k -n tcp %d" % port - os.system(fuserCmd) - if self.valgrind: - time.sleep(2) - - self.running = 0 - tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index)) - - def forcestop(self): - if self.valgrind == 0: - toBeKilled = "taosd" - else: - toBeKilled = "valgrind.bin" - - if self.running != 0: - psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - - while(processID): - killCmd = "kill -KILL %s > /dev/null 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - for port in range(6030, 6041): - fuserCmd = "fuser -k -n tcp %d" % port - os.system(fuserCmd) - if self.valgrind: - time.sleep(2) - - self.running = 0 - tdLog.debug("dnode:%d is stopped by kill -KILL" % (self.index)) - - def startIP(self): - cmd = "sudo ifconfig lo:%d 192.168.0.%d up" % (self.index, self.index) - if os.system(cmd) != 0: - tdLog.exit(cmd) - - def stopIP(self): - cmd = "sudo ifconfig lo:%d 192.168.0.%d down" % ( - self.index, self.index) - if os.system(cmd) != 0: - tdLog.exit(cmd) - - def cfg(self, option, value): - cmd = "echo %s %s >> %s" % (option, value, self.cfgPath) - if os.system(cmd) != 0: - tdLog.exit(cmd) - - def getDnodeRootDir(self, index): - dnodeRootDir = os.path.join(self.path,"sim","psim","dnode%d" % index) - return dnodeRootDir - - def getDnodesRootDir(self): - dnodesRootDir = os.path.join(self.path,"sim","psim") - return dnodesRootDir - - -class TDDnodes: - def __init__(self): - self.dnodes = [] - self.dnodes.append(TDDnode(1)) - self.dnodes.append(TDDnode(2)) - self.dnodes.append(TDDnode(3)) - self.dnodes.append(TDDnode(4)) - self.dnodes.append(TDDnode(5)) - self.dnodes.append(TDDnode(6)) - self.dnodes.append(TDDnode(7)) - self.dnodes.append(TDDnode(8)) - self.dnodes.append(TDDnode(9)) - self.dnodes.append(TDDnode(10)) - self.simDeployed = False - - def init(self, path): - psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") - while(processID): - killCmd = "kill -TERM %s > /dev/null 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - - psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") - while(processID): - killCmd = "kill -TERM %s > /dev/null 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - - binPath = os.path.dirname(os.path.realpath(__file__)) - binPath = binPath + "/../../../debug/" - tdLog.debug("binPath %s" % (binPath)) - binPath = os.path.realpath(binPath) - tdLog.debug("binPath real path %s" % (binPath)) - - # cmd = "sudo cp %s/build/lib/libtaos.so /usr/local/lib/taos/" % (binPath) - # tdLog.debug(cmd) - # os.system(cmd) - - # cmd = "sudo cp %s/build/bin/taos /usr/local/bin/taos/" % (binPath) - # if os.system(cmd) != 0 : - # tdLog.exit(cmd) - # tdLog.debug("execute %s" % (cmd)) - - # cmd = "sudo cp %s/build/bin/taosd /usr/local/bin/taos/" % (binPath) - # if os.system(cmd) != 0 : - # tdLog.exit(cmd) - # tdLog.debug("execute %s" % (cmd)) - - if path == "": - # self.path = os.path.expanduser('~') - self.path = os.path.abspath(binPath + "../../") - else: - self.path = os.path.realpath(path) - - for i in range(len(self.dnodes)): - self.dnodes[i].init(self.path) - - self.sim = TDSimClient() - self.sim.init(self.path) - - def setTestCluster(self, value): - self.testCluster = value - - def setValgrind(self, value): - self.valgrind = value - - def deploy(self, index): - self.sim.setTestCluster(self.testCluster) - - if (self.simDeployed == False): - self.sim.deploy() - self.simDeployed = True - - self.check(index) - self.dnodes[index - 1].setTestCluster(self.testCluster) - self.dnodes[index - 1].setValgrind(self.valgrind) - self.dnodes[index - 1].deploy() - - def cfg(self, index, option, value): - self.check(index) - self.dnodes[index - 1].cfg(option, value) - - def start(self, index): - self.check(index) - self.dnodes[index - 1].start() - - def stop(self, index): - self.check(index) - self.dnodes[index - 1].stop() - - def getDataSize(self, index): - self.check(index) - return self.dnodes[index - 1].getDataSize() - - def forcestop(self, index): - self.check(index) - self.dnodes[index - 1].forcestop() - - def startIP(self, index): - self.check(index) - - if self.testCluster: - self.dnodes[index - 1].startIP() - - def stopIP(self, index): - self.check(index) - - if self.dnodes[index - 1].testCluster: - self.dnodes[index - 1].stopIP() - - def check(self, index): - if index < 1 or index > 10: - tdLog.exit("index:%d should on a scale of [1, 10]" % (index)) - - def stopAll(self): - tdLog.info("stop all dnodes") - for i in range(len(self.dnodes)): - self.dnodes[i].stop() - - psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") - if processID: - cmd = "sudo systemctl stop taosd" - os.system(cmd) - # if os.system(cmd) != 0 : - # tdLog.exit(cmd) - psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") - while(processID): - killCmd = "kill -TERM %s > /dev/null 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - - psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") - while(processID): - killCmd = "kill -TERM %s > /dev/null 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8") - - # if os.system(cmd) != 0 : - # tdLog.exit(cmd) - - def getDnodesRootDir(self): - dnodesRootDir = "%s/sim" % (self.path) - return dnodesRootDir - - def getSimCfgPath(self): - return self.sim.getCfgDir() - - def getSimLogPath(self): - return self.sim.getLogDir() - - def addSimExtraCfg(self, option, value): - self.sim.addExtraCfg(option, value) - - -tdDnodes = TDDnodes() diff --git a/tests/army/frame/eos.py b/tests/army/frame/eos.py new file mode 100644 index 0000000000..dcb63ff3aa --- /dev/null +++ b/tests/army/frame/eos.py @@ -0,0 +1,36 @@ +################################################################### +# Copyright (c) 2023 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +# +# about system funciton extension +# + +import sys +import os +import time +import datetime +import platform + +# if windows platform return True +def isWin(): + return platform.system().lower() == 'windows' + +# wait util execute file finished +def exe(file): + return os.system(file) + +# execute file and return immediately +def exeNoWait(file): + print("exe no wait") + + diff --git a/tests/army/frame/epath.py b/tests/army/frame/epath.py new file mode 100644 index 0000000000..edff9c78a4 --- /dev/null +++ b/tests/army/frame/epath.py @@ -0,0 +1,53 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +# +# about path function extension +# + +import os +from frame.log import * + +# build/bin path +binDir = "" + +def binPath(): + global binDir + + if binDir != "": + return binDir + + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community/tests" in selfPath): + projPath = selfPath[:selfPath.find("community/tests")] + else: + projPath = selfPath[:selfPath.find("TDengine/tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root)-len("/build/bin")] + break + # check + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info(f"taosd found in {buildPath}") + # return + binDir = buildPath + "/build/bin/" + return binDir + +def binFile(filename): + return binPath() + filename diff --git a/tests/army/frame/etime.py b/tests/army/frame/etime.py new file mode 100644 index 0000000000..2ee223e122 --- /dev/null +++ b/tests/army/frame/etime.py @@ -0,0 +1,23 @@ +################################################################### +# Copyright (c) 2023 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +# +# about tools funciton extension +# + +import sys +import os +import time +import datetime + + diff --git a/tests/army/frame/etool.py b/tests/army/frame/etool.py new file mode 100644 index 0000000000..35c390dc1a --- /dev/null +++ b/tests/army/frame/etool.py @@ -0,0 +1,46 @@ +################################################################### +# Copyright (c) 2023 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +# +# about system funciton extension +# + +import sys +import os +import time +import datetime +import frame.epath +import frame.eos +from frame.log import * + +# run taosBenchmark with command or json file mode +def runBenchmark(command = "", json = "") : + # get taosBenchmark path + bmFile = frame.epath.binFile("taosBenchmark") + if frame.eos.isWin(): + bmFile += ".exe" + + # run + if command != "": + frame.eos.exe(bmFile + " " + command) + if json != "": + cmd = f"{bmFile} -f {json}" + print(cmd) + status = frame.eos.exe(cmd) + if status !=0: + tdLog.exit(f"run failed {cmd} status={status}") + + +# get current directory file name +def curFile(fullPath, filename): + return os.path.dirname(fullPath) + "/" + filename diff --git a/tests/army/frame/pathFinding.py b/tests/army/frame/pathFinding.py deleted file mode 100644 index df03f0ed68..0000000000 --- a/tests/army/frame/pathFinding.py +++ /dev/null @@ -1,83 +0,0 @@ -################################################################### -# Copyright (c) 2016 by TAOS Technologies, Inc. -# All rights reserved. -# -# This file is proprietary and confidential to TAOS Technologies. -# No part of this file may be reproduced, stored, transmitted, -# disclosed or used in any form or by any means other than as -# expressly provided by the written permission from Jianhui Tao -# -################################################################### - -# -*- coding: utf-8 -*- - - -import os -from frame.log import * - - - -class TDFindPath: - """This class is for finding path within TDengine - """ - def __init__(self): - self.file = "" - - - def init(self, file): - """[summary] - - Args: - file (str): the file location you want to start the query. Generally using __file__ - """ - self.file = file - - def getTaosdemoPath(self): - """for finding the path of directory containing taosdemo - - Returns: - str: the path to directory containing taosdemo - """ - selfPath = os.path.dirname(os.path.realpath(self.file)) - - if ("community" in selfPath): - projPath = selfPath[:selfPath.find("community")] - else: - projPath = selfPath[:selfPath.find("tests")] - - for root, dirs, files in os.walk(projPath): - if ("taosd" in files): - rootRealPath = os.path.dirname(os.path.realpath(root)) - if ("packaging" not in rootRealPath): - buildPath = root[:len(root)-len("/build/bin")] - break - if (buildPath == ""): - tdLog.exit("taosd not found!") - else: - tdLog.info(f"taosd found in {buildPath}") - return buildPath + "/build/bin/" - - def getTDenginePath(self): - """for finding the root path of TDengine - - Returns: - str: the root path of TDengine - """ - selfPath = os.path.dirname(os.path.realpath(self.file)) - - if ("community" in selfPath): - projPath = selfPath[:selfPath.find("community")] - else: - projPath = selfPath[:selfPath.find("tests")] - print(projPath) - for root, dirs, files in os.walk(projPath): - if ("sim" in dirs): - print(root) - rootRealPath = os.path.realpath(root) - if (rootRealPath == ""): - tdLog.exit("TDengine not found!") - else: - tdLog.info(f"TDengine found in {rootRealPath}") - return rootRealPath - -tdFindPath = TDFindPath() \ No newline at end of file diff --git a/tests/army/frame/cluster.py b/tests/army/frame/server/cluster.py similarity index 91% rename from tests/army/frame/cluster.py rename to tests/army/frame/server/cluster.py index 4da53840c0..ade8ac39a2 100644 --- a/tests/army/frame/cluster.py +++ b/tests/army/frame/server/cluster.py @@ -8,7 +8,7 @@ import socket from frame.log import * from frame.sql import * from frame.cases import * -from frame.dnodes import * +from frame.server.dnodes import * from frame.common import * class ClusterDnodes(TDDnodes): @@ -35,17 +35,21 @@ class ConfigureyCluster: self.startPort = 6030 self.portStep = 100 self.mnodeNums = 0 + self.level = 0 + self.disk = 0 - def configure_cluster(self ,dnodeNums=5,mnodeNums=0,independentMnode=True,startPort=6030,portStep=100,hostname="%s"%hostname): + def configure_cluster(self ,dnodeNums=5, mnodeNums=0, independentMnode=True, startPort=6030, portStep=100, hostname="%s"%hostname, level=1, disk=1): self.startPort=int(startPort) self.portStep=int(portStep) self.hostname=hostname self.dnodeNums = int(dnodeNums) self.mnodeNums = int(mnodeNums) + self.level = int(level) + self.disk = int(disk) self.dnodes = [] startPort_sec = int(startPort+portStep) for num in range(1, (self.dnodeNums+1)): - dnode = TDDnode(num) + dnode = TDDnode(num, self.level, self.disk) dnode.addExtraCfg("firstEp", f"{hostname}:{self.startPort}") dnode.addExtraCfg("fqdn", f"{hostname}") dnode.addExtraCfg("serverPort", f"{self.startPort + (num-1)*self.portStep}") diff --git a/tests/army/frame/dnodes.py b/tests/army/frame/server/dnodes.py similarity index 96% rename from tests/army/frame/dnodes.py rename to tests/army/frame/server/dnodes.py index 5581f29a57..0d40b665dd 100644 --- a/tests/army/frame/dnodes.py +++ b/tests/army/frame/server/dnodes.py @@ -115,8 +115,11 @@ class TDSimClient: class TDDnode: - def __init__(self, index): + def __init__(self, index=1, level=1, disk=1): self.index = index + self.level = level + self.disk = disk + self.dataDir = [] self.running = 0 self.deployed = 0 self.testCluster = False @@ -209,14 +212,30 @@ class TDDnode: self.remote_conn.run("python3 ./test.py %s -d %s -e %s"%(valgrindStr,remoteCfgDictStr,execCmdStr)) def deploy(self, *updatecfgDict): + # logDir self.logDir = os.path.join(self.path,"sim","dnode%d" % self.index, "log") - self.dataDir = os.path.join(self.path,"sim","dnode%d" % self.index, "data") + # dataDir + simPath = os.path.join(self.path, "sim", "dnode%d" % self.index) + primary = 1 + if self.level == 1 and self.disk == 1: + eDir = os.path.join(simPath, "data") + self.dataDir.append(eDir) + else: + for i in range(self.level): + for j in range(self.disk): + eDir = os.path.join(simPath, f"data{i}{j}") + self.dataDir.append(f"{eDir} {i} {primary}") + if primary == 1: + primary = 0 + + # taos.cfg self.cfgDir = os.path.join(self.path,"sim","dnode%d" % self.index, "cfg") self.cfgPath = os.path.join(self.path,"sim","dnode%d" % self.index, "cfg","taos.cfg") - - cmd = "rm -rf " + self.dataDir - if os.system(cmd) != 0: - tdLog.exit(cmd) + + for eDir in self.dataDir: + cmd = "rm -rf " + eDir + if os.system(cmd) != 0: + tdLog.exit(cmd) cmd = "rm -rf " + self.logDir if os.system(cmd) != 0: @@ -229,7 +248,8 @@ class TDDnode: # cmd = "mkdir -p " + self.dataDir # if os.system(cmd) != 0: # tdLog.exit(cmd) - os.makedirs(self.dataDir) + for eDir in self.dataDir: + os.makedirs(eDir.split(' ')[0]) # cmd = "mkdir -p " + self.logDir # if os.system(cmd) != 0: @@ -275,7 +295,11 @@ class TDDnode: self.addExtraCfg(key, value) if (self.remoteIP == ""): for key, value in self.cfgDict.items(): - self.cfg(key, value) + if type(value) == list: + for v in value: + self.cfg(key, v) + else: + self.cfg(key, value) else: self.remoteExec(self.cfgDict, "tdDnodes.deploy(%d,updateCfgDict)"%self.index) @@ -887,4 +911,10 @@ class TDDnodes: def getAsan(self): return self.asan -tdDnodes = TDDnodes() \ No newline at end of file + def setLevelDisk(self, level, disk): + for i in range(len(self.dnodes)): + self.dnodes[i].level = int(level) + self.dnodes[i].disk = int(disk) + + +tdDnodes = TDDnodes() diff --git a/tests/army/frame/sql.py b/tests/army/frame/sql.py index d7dce2bc3e..2e14f0c2f0 100644 --- a/tests/army/frame/sql.py +++ b/tests/army/frame/sql.py @@ -535,6 +535,16 @@ class TDSql: tdLog.info("%s(%d) failed: sql:%s, elm:%s == expect_elm:%s" % args) raise Exception + # check like select count(*) ... sql + def checkAgg(self, sql, expectCnt): + self.query(sql) + self.checkData(0, 0, expectCnt) + + # get first value + def getFirstValue(self, sql) : + self.query(sql) + return self.getData(0, 0) + def get_times(self, time_str, precision="ms"): caller = inspect.getframeinfo(inspect.stack()[1][0]) if time_str[-1] not in TAOS_TIME_INIT: @@ -602,6 +612,7 @@ class TDSql: if self.cursor.istype(col, "BIGINT UNSIGNED"): return "BIGINT UNSIGNED" + ''' def taosdStatus(self, state): tdLog.sleep(5) pstate = 0 @@ -630,6 +641,7 @@ class TDSql: tdLog.exit("taosd state is %d != expect:%d" %args) pass + def haveFile(self, dir, state): if os.path.exists(dir) and os.path.isdir(dir): if not os.listdir(dir): @@ -644,6 +656,7 @@ class TDSql: tdLog.exit("dir: %s is not empty, expect: empty" %dir) else: tdLog.exit("dir: %s doesn't exist" %dir) + def createDir(self, dir): if os.path.exists(dir): shrmtree(dir) @@ -651,5 +664,6 @@ class TDSql: os.makedirs( dir, 755 ) tdLog.info("dir: %s is created" %dir) pass +''' tdSql = TDSql() diff --git a/tests/army/frame/srvCtl.py b/tests/army/frame/srvCtl.py new file mode 100644 index 0000000000..e7ef08cde9 --- /dev/null +++ b/tests/army/frame/srvCtl.py @@ -0,0 +1,27 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import os +import time +import datetime + +class srvCtl: + def __init__(self): + # record server information + self.dnodeNum = 0 + self.mnodeNum = 0 + self.mLevel = 0 + self.mLevelDisk = 0 + +sc = srvCtl() \ No newline at end of file diff --git a/tests/army/test.py b/tests/army/test.py index cc114e0d16..a3e28b772d 100644 --- a/tests/army/test.py +++ b/tests/army/test.py @@ -28,9 +28,9 @@ import importlib import toml from frame.log import * -from frame.dnodes import * +from frame.server.dnodes import * +from frame.server.cluster import * from frame.cases import * -from frame.cluster import * from frame.taosadapter import * import taos @@ -111,8 +111,12 @@ if __name__ == "__main__": asan = False independentMnode = False previousCluster = False - opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RWD:n:i:aP', [ - 'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums','queryPolicy','createDnodeNums','restful','websocket','adaptercfgupdate','replicaVar','independentMnode','previous']) + level = 1 + disk = 1 + + opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RWU:n:i:aP:L:D:', [ + 'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums', + 'queryPolicy','createDnodeNums','restful','websocket','adaptercfgupdate','replicaVar','independentMnode',"asan",'previous','level','disk']) for key, value in opts: if key in ['-h', '--help']: tdLog.printNoPrefix( @@ -134,11 +138,13 @@ if __name__ == "__main__": tdLog.printNoPrefix('-C create Dnode Numbers in one cluster') tdLog.printNoPrefix('-R restful realization form') tdLog.printNoPrefix('-W websocket connection') - tdLog.printNoPrefix('-D taosadapter update cfg dict ') + tdLog.printNoPrefix('-U taosadapter update cfg dict ') tdLog.printNoPrefix('-n the number of replicas') tdLog.printNoPrefix('-i independentMnode Mnode') tdLog.printNoPrefix('-a address sanitizer mode') tdLog.printNoPrefix('-P run case with [P]revious cluster, do not create new cluster to run case.') + tdLog.printNoPrefix('-L set multiple level number. range 1 ~ 3') + tdLog.printNoPrefix('-D set disk number on each level. range 1 ~ 10') sys.exit(0) @@ -213,7 +219,7 @@ if __name__ == "__main__": if key in ['-a', '--asan']: asan = True - if key in ['-D', '--adaptercfgupdate']: + if key in ['-U', '--adaptercfgupdate']: try: adaptercfgupdate = eval(base64.b64decode(value.encode()).decode()) except: @@ -226,6 +232,12 @@ if __name__ == "__main__": if key in ['-P', '--previous']: previousCluster = True + if key in ['-L', '--level']: + level = value + + if key in ['-D', '--disk']: + disk = value + # # do exeCmd command # @@ -361,6 +373,7 @@ if __name__ == "__main__": tAdapter.stop(force_kill=True) if dnodeNums == 1 : + tdDnodes.setLevelDisk(level, disk) tdDnodes.deploy(1,updateCfgDict) tdDnodes.start(1) tdCases.logSql(logSql) @@ -391,7 +404,7 @@ if __name__ == "__main__": tdLog.exit(f"alter queryPolicy to {queryPolicy} failed") else : tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode"%(dnodeNums,mnodeNums)) - dnodeslist = cluster.configure_cluster(dnodeNums=dnodeNums, mnodeNums=mnodeNums, independentMnode=independentMnode) + dnodeslist = cluster.configure_cluster(dnodeNums=dnodeNums, mnodeNums=mnodeNums, independentMnode=independentMnode, level=level, disk=disk) tdDnodes = ClusterDnodes(dnodeslist) tdDnodes.init(deployPath, masterIp) tdDnodes.setTestCluster(testCluster) @@ -498,6 +511,7 @@ if __name__ == "__main__": else: tdLog.info("not need to query") else: + # except windows tdDnodes.setKillValgrind(killValgrind) tdDnodes.init(deployPath, masterIp) tdDnodes.setTestCluster(testCluster) @@ -529,6 +543,7 @@ if __name__ == "__main__": if dnodeNums == 1 : # dnode is one + tdDnodes.setLevelDisk(level, disk) tdDnodes.deploy(1,updateCfgDict) tdDnodes.start(1) tdCases.logSql(logSql) @@ -574,7 +589,8 @@ if __name__ == "__main__": # dnode > 1 cluster tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode"%(dnodeNums,mnodeNums)) print(independentMnode,"independentMnode valuse") - dnodeslist = cluster.configure_cluster(dnodeNums=dnodeNums, mnodeNums=mnodeNums, independentMnode=independentMnode) + # create dnode list + dnodeslist = cluster.configure_cluster(dnodeNums=dnodeNums, mnodeNums=mnodeNums, independentMnode=independentMnode, level=level, disk=disk) tdDnodes = ClusterDnodes(dnodeslist) tdDnodes.init(deployPath, masterIp) tdDnodes.setTestCluster(testCluster) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index bcdd143cfc..19a44767a7 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -6,7 +6,7 @@ ,,y,unit-test,bash test.sh #army-test -,,y,army,./pytest.sh python3 ./test.py -f empty.py +,,y,army,./pytest.sh python3 ./test.py -f enterprise/multi-level/mlevel_basic.py -N 3 -L 3 -D 2 #system test ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/stream_basic.py diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 99d166ee33..f8a49983e1 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -78,6 +78,26 @@ class TDSql: self.cursor.execute(s) time.sleep(2) + def execute(self, sql, queryTimes=30, show=False): + self.sql = sql + if show: + tdLog.info(sql) + i=1 + while i <= queryTimes: + try: + self.affectedRows = self.cursor.execute(sql) + return self.affectedRows + except Exception as e: + tdLog.notice("Try to execute sql again, query times: %d "%i) + if i == queryTimes: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + args = (caller.filename, caller.lineno, sql, repr(e)) + tdLog.notice("%s(%d) failed: sql:%s, %s" % args) + raise Exception(repr(e)) + i+=1 + time.sleep(1) + pass + def error(self, sql, expectedErrno = None, expectErrInfo = None, fullMatched = True): caller = inspect.getframeinfo(inspect.stack()[1][0]) expectErrNotOccured = True @@ -108,7 +128,7 @@ class TDSql: if expectErrInfo == self.error_info: tdLog.info("sql:%s, expected expectErrInfo '%s' occured" % (sql, expectErrInfo)) else: - tdLog.exit("%s(%d) failed: sql:%s, expectErrInfo '%s' occured, but not expected errno '%s'" % (caller.filename, caller.lineno, sql, self.error_info, expectErrInfo)) + tdLog.exit("%s(%d) failed: sql:%s, expectErrInfo '%s' occured, but not expected expectErrInfo '%s'" % (caller.filename, caller.lineno, sql, self.error_info, expectErrInfo)) else: if expectedErrno != None: if expectedErrno in self.errno: @@ -120,7 +140,7 @@ class TDSql: if expectErrInfo in self.error_info: tdLog.info("sql:%s, expected expectErrInfo '%s' occured" % (sql, expectErrInfo)) else: - tdLog.exit("%s(%d) failed: sql:%s, expectErrInfo %s occured, but not expected errno '%s'" % (caller.filename, caller.lineno, sql, self.error_info, expectErrInfo)) + tdLog.exit("%s(%d) failed: sql:%s, expectErrInfo %s occured, but not expected expectErrInfo '%s'" % (caller.filename, caller.lineno, sql, self.error_info, expectErrInfo)) return self.error_info @@ -158,6 +178,63 @@ class TDSql: time.sleep(1) pass + def query_success_failed(self, sql, row_tag=None, queryTimes=10, count_expected_res=None, expectErrInfo = None, fullMatched = True): + self.sql = sql + i=1 + while i <= queryTimes: + try: + self.cursor.execute(sql) + self.queryResult = self.cursor.fetchall() + self.queryRows = len(self.queryResult) + self.queryCols = len(self.cursor.description) + + if count_expected_res is not None: + counter = 0 + while count_expected_res != self.queryResult[0][0]: + self.cursor.execute(sql) + self.queryResult = self.cursor.fetchall() + if counter < queryTimes: + counter += 0.5 + time.sleep(0.5) + else: + return False + + tdLog.info("query is success") + time.sleep(1) + continue + except Exception as e: + tdLog.notice("Try to query again, query times: %d "%i) + caller = inspect.getframeinfo(inspect.stack()[1][0]) + if i < queryTimes: + error_info = repr(e) + print(error_info) + self.error_info = ','.join(error_info[error_info.index('(')+1:-1].split(",")[:-1]).replace("'","") + self.queryRows = 0 + self.queryCols = 0 + self.queryResult = None + + if fullMatched: + if expectErrInfo != None: + if expectErrInfo == self.error_info: + tdLog.info("sql:%s, expected expectErrInfo '%s' occured" % (sql, expectErrInfo)) + else: + tdLog.exit("%s(%d) failed: sql:%s, expectErrInfo '%s' occured, but not expected expectErrInfo '%s'" % (caller.filename, caller.lineno, sql, self.error_info, expectErrInfo)) + else: + if expectErrInfo != None: + if expectErrInfo in self.error_info: + tdLog.info("sql:%s, expected expectErrInfo '%s' occured" % (sql, expectErrInfo)) + else: + tdLog.exit("%s(%d) failed: sql:%s, expectErrInfo %s occured, but not expected expectErrInfo '%s'" % (caller.filename, caller.lineno, sql, self.error_info, expectErrInfo)) + + return self.error_info + elif i == queryTimes: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + args = (caller.filename, caller.lineno, sql, repr(e)) + tdLog.notice("%s(%d) failed: sql:%s, %s" % args) + raise Exception(repr(e)) + i+=1 + time.sleep(1) + pass def is_err_sql(self, sql): err_flag = True @@ -471,25 +548,7 @@ class TDSql: time.sleep(1) continue - def execute(self, sql, queryTimes=30, show=False): - self.sql = sql - if show: - tdLog.info(sql) - i=1 - while i <= queryTimes: - try: - self.affectedRows = self.cursor.execute(sql) - return self.affectedRows - except Exception as e: - tdLog.notice("Try to execute sql again, query times: %d "%i) - if i == queryTimes: - caller = inspect.getframeinfo(inspect.stack()[1][0]) - args = (caller.filename, caller.lineno, sql, repr(e)) - tdLog.notice("%s(%d) failed: sql:%s, %s" % args) - raise Exception(repr(e)) - i+=1 - time.sleep(1) - pass + def checkAffectedRows(self, expectAffectedRows): if self.affectedRows != expectAffectedRows: diff --git a/tests/script/tsim/stream/session1.sim b/tests/script/tsim/stream/session1.sim index cf42159d84..aae17053b2 100644 --- a/tests/script/tsim/stream/session1.sim +++ b/tests/script/tsim/stream/session1.sim @@ -327,4 +327,40 @@ if $rows != 1 then goto loop17 endi +sql create database test2 vgroups 4; +sql use test2; +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams4 trigger at_once ignore update 0 ignore expired 0 into streamt4 as select _wstart, count(*) c1, count(a) c2 from st session(ts, 2s) ; + +sql insert into t1 values(1648791255100,1,2,3); +sql insert into t1 values(1648791255300,1,2,3); + +sleep 1000 + +sql insert into t1 values(1648791253000,1,2,3) (1648791254000,1,2,3); + +$loop_count = 0 +loop18: +sleep 1000 +sql select * from streamt4; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows + goto loop18 +endi + +if $data01 != 4 then + print =====data01=$data01 + goto loop18 +endi + +print =====over + system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/0-others/test_hot_refresh_configurations.py b/tests/system-test/0-others/test_hot_refresh_configurations.py index cbde8c060e..59222281ae 100644 --- a/tests/system-test/0-others/test_hot_refresh_configurations.py +++ b/tests/system-test/0-others/test_hot_refresh_configurations.py @@ -215,6 +215,12 @@ class TDTestCase: self.svr_check(item["name"], item["alias"], item["except_values"], True) else: raise Exception(f"unknown key: {key}") + # reset log + path = os.sep.join([tdDnodes.getDnodesRootDir(), "dnode1", "log", "taosdlog.*"]) + tdSql.execute("alter all dnodes 'resetlog';") + r = subprocess.Popen("cat {} | grep 'reset log file'".format(path), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = r.communicate() + assert('reset log file' in stdout.decode()) def stop(self): tdSql.close() diff --git a/tests/system-test/1-insert/ts-4272.py b/tests/system-test/1-insert/ts-4272.py index bb81305eb3..f2bacd0c2b 100644 --- a/tests/system-test/1-insert/ts-4272.py +++ b/tests/system-test/1-insert/ts-4272.py @@ -26,12 +26,10 @@ class TDTestCase: self.file1 = f"{self.testcasePath}/b.csv" self.file2 = f"{self.testcasePath}/c.csv" - #os.system("rm -rf %s/b.csv" %self.testcasePath) tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), logSql) def check_count(self, rows, records): - tdSql.execute(f"use {self.db};") tdSql.query(f"select tbname,count(*) from {self.stable0} group by tbname order by tbname;") tdSql.checkRows(rows) for i in range(rows): @@ -39,13 +37,6 @@ class TDTestCase: def reset_tb(self): # create database and tables - # os.system("taos -s 'drop database if exists d1;'") - # os.system("taos -s 'create database d1;use d1;create stable meters (ts timestamp, current float, voltage int, phase float) tags (location binary(64), groupId int);'") - # os.system(f"taos -s 'use d1;create table d2001 using meters(groupId) tags(5);'") - # res = os.system(f"taos -s 'use d1;create table d2002 using meters(groupId) tags(6);'") - # if (0 != res): - # tdLog.exit(f"create tb error") - tdSql.execute(f"drop database if exists {self.db};") tdSql.execute(f"create database {self.db};") tdSql.execute(f"use {self.db};") @@ -54,13 +45,14 @@ class TDTestCase: tdSql.execute(f"create table {self.tb2} {self.tag2};") tdSql.execute(f"create stable {self.stable1} (ts timestamp , q_int int , q_bigint bigint , q_smallint smallint , q_tinyint tinyint , q_float float , q_double double , q_bool bool , q_binary binary(100) , q_nchar nchar(100) , q_ts timestamp , q_int_null int , q_bigint_null bigint , q_smallint_null smallint , q_tinyint_null tinyint, q_float_null float , q_double_null double , q_bool_null bool , q_binary_null binary(20) , q_nchar_null nchar(20) , q_ts_null timestamp) tags(loc nchar(100) , t_int int , t_bigint bigint , t_smallint smallint , t_tinyint tinyint, t_bool bool , t_binary binary(100) , t_nchar nchar(100) ,t_float float , t_double double , t_ts timestamp);") tdSql.execute(f"create stable {self.stable2} (ts timestamp , q_int int , q_bigint bigint , q_smallint smallint , q_tinyint tinyint , q_float float , q_double double , q_bool bool , q_binary binary(100) , q_nchar nchar(100) , q_ts timestamp , q_int_null int , q_bigint_null bigint , q_smallint_null smallint , q_tinyint_null tinyint, q_float_null float , q_double_null double , q_bool_null bool , q_binary_null binary(20) , q_nchar_null nchar(20) , q_ts_null timestamp) tags(loc nchar(100) , t_int int , t_bigint bigint , t_smallint smallint , t_tinyint tinyint, t_bool bool , t_binary binary(100) , t_nchar nchar(100) ,t_float float , t_double double , t_ts timestamp);") + tdSql.execute(f"create table {self.stable1}_1 using {self.stable1}(t_int) tags(1);") + tdSql.execute(f"create table {self.stable2}_1 using {self.stable2}(t_int) tags(2);") def test(self, sql): - sql = f"use {self.db};" + sql - res = os.system(f'taos -s "{sql}"') - # if (0 != res): - # tdLog.exit(f"taos sql error") - + # sql = f"use {self.db};" + sql + # os.system(f'taos -s "{sql}"') + print(f'{sql}\n') + tdSql.execute(sql, 1) def check(self): # same table, auto create + create @@ -95,13 +87,8 @@ class TDTestCase: sql = f"insert into {self.tb1} file '{self.file1}' {self.tb2} file '{self.file2}';" self.test(sql) - # bigNum = 1010000 - # self.check_count(5, [2100, 2100, bigNum, bigNum, bigNum]) + self.check_count(2, [2010000, 1000000]) - result = os.popen("taos -s 'select count(*) from %s.%s'" %(self.db, self.tb1)) - res = result.read() - if (f"OK" in res): - tdLog.info(f"check count success") def make_csv(self, filepath, once, qtime, startts): f = open(filepath, 'w') @@ -118,10 +105,8 @@ class TDTestCase: def test_mix(self): #forbid use both value and file in one insert - result = os.popen(f"insert into {self.tb1} file '{self.file2}' {self.tb2} values('2021-07-13 14:06:34.630', 10.2, 219, 0.32);") - res = result.read() - if (f"error" in res): - tdLog.info(f"forbid success") + self.make_csv(self.file2, 10, 10, self.ts) + tdSql.error(f"insert into {self.tb1} file '{self.file2}' {self.tb2} values('2021-07-13 14:06:34.630', 10.2, 219, 0.32);") def test_bigcsv(self): # prepare csv @@ -144,7 +129,6 @@ class TDTestCase: self.test(sql) print("end insert to table") - #tdSql.execute(f"use d1;") tdSql.query(f"select tbname,count(*) from {self.stable0} group by tbname order by tbname;") tdSql.checkRows(2) tdSql.checkData(0, 1, rowNum1) @@ -160,7 +144,7 @@ class TDTestCase: ts = startts + offset rows = [] for i in range(once): - rows.append([table_name, ts + i, offset + i, 'NULL']) + rows.append([f"\'{table_name}\'", ts + i, offset + i, 'NULL']) writer.writerows(rows) f.close() print(datetime.now(), filepath, " ready!") @@ -171,22 +155,22 @@ class TDTestCase: once = 10000 qtime1 = 101 qtime2 = 100 - # rowNum1 = qtime1 * once - # rowNum2 = qtime2 * once child_1 = f"{self.stable1}_1" child_2 = f"{self.stable2}_1" self.make_stable_csv(self.file1, once, qtime1, self.ts - 86400000, child_1) self.make_stable_csv(self.file2, once, qtime2, self.ts, child_2) print("end stable_csv data prepare") - - # insert create child table of stable + sql = f"insert into {self.db}.{self.stable1}(tbname,ts,q_int,q_binary) file '{self.file1}' {self.db}.{self.stable2}(tbname,ts,q_int,q_binary) file '{self.file2}';" self.test(sql) print("end insert to stable") - #tdSql.execute(f"insert into {self.db}.{child_1}(ts, q_int) values(now, 1);") - tdSql.query(f"select tbname,count(*) from {self.stable1} group by tbname order by tbname;") - tdSql.checkRows(0) + tdSql.query(f"select tbname,count(*) from {self.stable1} group by tbname;") + tdSql.checkRows(1) + tdSql.checkData(0, 1, qtime1 * once) + tdSql.query(f"select tbname,count(*) from {self.stable2} group by tbname;") + tdSql.checkRows(1) + tdSql.checkData(0, 1, qtime2 * once) print("check stable success") def run(self): @@ -194,8 +178,10 @@ class TDTestCase: self.reset_tb() self.test_stable_csv() self.test_bigcsv() - self.test_mix() self.check() + self.test_mix() + os.system(f"rm -rf {self.file1}") + os.system(f"rm -rf {self.file2}") tdSql.close() def stop(self): diff --git a/tests/system-test/2-query/last_cache_scan.py b/tests/system-test/2-query/last_cache_scan.py index 2ef2e50a88..39271318ba 100644 --- a/tests/system-test/2-query/last_cache_scan.py +++ b/tests/system-test/2-query/last_cache_scan.py @@ -28,7 +28,7 @@ class TDTestCase: def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") - tdSql.init(conn.cursor(), False) + tdSql.init(conn.cursor(), True) def create_database(self,tsql, dbName,dropFlag=1,vgroups=2,replica=1, duration:str='1d'): if dropFlag == 1: @@ -341,44 +341,12 @@ class TDTestCase: tdSql.checkData(0, 0, '999') p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c12 int"]) p.check_returncode() - tdSql.query("select last(c1) from meters", queryTimes=1) - tdSql.checkData(0, 0, None) - tdSql.query('select last(*) from meters', queryTimes=1) - print(str(tdSql.queryResult)) - tdSql.checkData(0, 1, None) - tdSql.query('select last(c1), c1, ts from meters', queryTimes=1) - tdSql.checkRows(1) - tdSql.checkData(0, 0, None) - tdSql.checkData(0, 1, None) - tdSql.checkData(0, 2, None) - - try: - tdSql.query('select ts, last(c1), c1, ts, c1 from meters', queryTimes=1) - except Exception as e: - if str(e).count('Invalid column name') == 1: - print('column has been dropped, the cache has been updated: %s' % (str(e))) - return - else: - raise - tdSql.checkRows(1) - tdSql.checkCols(5) - tdSql.checkData(0, 0, None) - tdSql.checkData(0, 1, None) - tdSql.checkData(0, 2, None) - tdSql.checkData(0, 3, None) - tdSql.checkData(0, 4, None) - - try: - tdSql.query('select last(c1), last(c2), last(c3) from meters', queryTimes=1) - except Exception as e: - if str(e).count('Invalid column name') == 1: - print('column has been dropped, the cache has been updated: %s' % (str(e))) - return - else: - raise + tdSql.query_success_failed("select ts, last(c1), c1, ts, c1 from meters", queryTimes=10, expectErrInfo="Invalid column name: c1") + tdSql.query('select last(c12), c12, ts from meters', queryTimes=1) tdSql.checkRows(1) tdSql.checkCols(3) tdSql.checkData(0, 0, None) + tdSql.checkData(0, 1, None) def test_cache_scan_with_drop_column(self): tdSql.query('select last(*) from meters') @@ -403,49 +371,48 @@ class TDTestCase: tdSql.checkData(0, 10, None) def test_cache_scan_last_row_with_drop_column2(self): - tdSql.query('select last_row(c1) from meters') + tdSql.query('select last_row(c2) from meters') print(str(tdSql.queryResult)) tdSql.checkCols(1) - p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c11 int"]) + p = subprocess.run(["taos", '-s', "alter table test.meters drop column c2; alter table test.meters add column c1 int"]) p.check_returncode() - tdSql.query('select last_row(c1) from meters', queryTimes=1) - print(str(tdSql.queryResult)) - tdSql.checkCols(1) + tdSql.query_success_failed("select ts, last_row(c2), c12, ts, c12 from meters", queryTimes=10, expectErrInfo="Invalid column name: c2") + tdSql.query('select last(c1), c1, ts from meters', queryTimes=1) + tdSql.checkRows(1) + tdSql.checkCols(3) tdSql.checkData(0, 0, None) + tdSql.checkData(0, 1, None) def test_cache_scan_last_row_with_partition_by(self): tdSql.query('select last(c1) from meters partition by t1') print(str(tdSql.queryResult)) tdSql.checkCols(1) - tdSql.checkRows(5) - p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c11 int"]) + tdSql.checkRows(2) + p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c2 int"]) p.check_returncode() - tdSql.query('select last_row(c1) from meters partition by t1', queryTimes=1) + tdSql.query_success_failed('select last(c1) from meters partition by t1', queryTimes=10, expectErrInfo="Invalid column name: c1") + tdSql.query('select last(c2), c2, ts from meters', queryTimes=1) print(str(tdSql.queryResult)) - tdSql.checkCols(1) - tdSql.checkRows(5) + tdSql.checkRows(1) + tdSql.checkCols(3) tdSql.checkData(0, 0, None) - tdSql.checkData(1, 0, None) - tdSql.checkData(2, 0, None) - tdSql.checkData(3, 0, None) - tdSql.checkData(4, 0, None) + tdSql.checkData(0, 1, None) + def test_cache_scan_last_row_with_partition_by_tbname(self): - tdSql.query('select last(c1) from meters partition by tbname', queryTimes=1) + tdSql.query('select last(c2) from meters partition by tbname') print(str(tdSql.queryResult)) tdSql.checkCols(1) tdSql.checkRows(10) - p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c11 int"]) + p = subprocess.run(["taos", '-s', "alter table test.meters drop column c2; alter table test.meters add column c1 int"]) p.check_returncode() - tdSql.query('select last_row(c1) from meters partition by tbname', queryTimes=1) + tdSql.query_success_failed('select last_row(c2) from meters partition by tbname', queryTimes=10, expectErrInfo="Invalid column name: c2") + tdSql.query('select last(c1), c1, ts from meters', queryTimes=1) print(str(tdSql.queryResult)) - tdSql.checkCols(1) - tdSql.checkRows(10) + tdSql.checkRows(1) + tdSql.checkCols(3) tdSql.checkData(0, 0, None) - tdSql.checkData(1, 0, None) - tdSql.checkData(2, 0, None) - tdSql.checkData(3, 0, None) - tdSql.checkData(4, 0, None) + tdSql.checkData(0, 1, None) @@ -457,9 +424,9 @@ class TDTestCase: self.test_cache_scan_with_drop_and_add_column2() #self.test_cache_scan_with_drop_column() #self.test_cache_scan_last_row_with_drop_column() - #self.test_cache_scan_last_row_with_drop_column2() - #self.test_cache_scan_last_row_with_partition_by() - #self.test_cache_scan_last_row_with_partition_by_tbname() + self.test_cache_scan_last_row_with_drop_column2() + self.test_cache_scan_last_row_with_partition_by() + self.test_cache_scan_last_row_with_partition_by_tbname() def stop(self): tdSql.close()