diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 2212a8c29a..4332fc8e58 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -31,8 +31,12 @@ extern "C" { /* ------------------------ TYPES EXPOSED ------------------------ */ typedef struct SVnode SVnode; +typedef struct SDnode SDnode; +typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); + typedef struct SVnodeCfg { int32_t vgId; + SDnode *pDnode; /** vnode buffer pool options */ struct { @@ -66,15 +70,23 @@ typedef struct SVnodeCfg { SWalCfg walCfg; } SVnodeCfg; +typedef struct { + int32_t sver; + char *timezone; + char *locale; + char *charset; + uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) + PutReqToVQueryQFp putReqToVQueryQFp; +} SVnodeOpt; + /* ------------------------ SVnode ------------------------ */ /** * @brief Initialize the vnode module * - * @param nthreads number of commit threads. 0 for no threads and - * a schedule queue should be given (TODO) + * @param pOption Option of the vnode mnodule * @return int 0 for success and -1 for failure */ -int vnodeInit(uint16_t nthreads); +int vnodeInit(const SVnodeOpt *pOption); /** * @brief clear a vnode @@ -89,7 +101,7 @@ void vnodeClear(); * @param pVnodeCfg options of the vnode * @return SVnode* The vnode object */ -SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid); +SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg); /** * @brief Close a VNODE diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index afdd678213..e637b38815 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -94,6 +94,7 @@ typedef struct { pthread_t *threadId; SRWLatch latch; SDnodeWorker mgmtWorker; + SDnodeWorker statusWorker; } SDnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/impl/inc/dndVnodes.h b/source/dnode/mgmt/impl/inc/dndVnodes.h index b5fae62959..a78db602fe 100644 --- a/source/dnode/mgmt/impl/inc/dndVnodes.h +++ b/source/dnode/mgmt/impl/inc/dndVnodes.h @@ -36,6 +36,8 @@ int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq); int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq); int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pReq); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/impl/src/dndMgmt.c b/source/dnode/mgmt/impl/src/dndMgmt.c index f252bffbbf..d9edf39b73 100644 --- a/source/dnode/mgmt/impl/src/dndMgmt.c +++ b/source/dnode/mgmt/impl/src/dndMgmt.c @@ -536,6 +536,11 @@ int32_t dndInitMgmt(SDnode *pDnode) { return -1; } + if (dndInitWorker(pDnode, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dndProcessMgmtQueue) != 0) { + dError("failed to start dnode mgmt worker since %s", terrstr()); + return -1; + } + pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode); if (pMgmt->threadId == NULL) { dError("failed to init dnode thread"); @@ -550,6 +555,7 @@ int32_t dndInitMgmt(SDnode *pDnode) { void dndStopMgmt(SDnode *pDnode) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; dndCleanupWorker(&pMgmt->mgmtWorker); + dndCleanupWorker(&pMgmt->statusWorker); if (pMgmt->threadId != NULL) { taosDestoryThread(pMgmt->threadId); @@ -587,7 +593,12 @@ void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { dndUpdateMnodeEpSet(pDnode, pEpSet); } - if (dndWriteMsgToWorker(&pMgmt->mgmtWorker, pMsg, sizeof(SRpcMsg)) != 0) { + SDnodeWorker *pWorker = &pMgmt->mgmtWorker; + if (pMsg->msgType == TDMT_MND_STATUS_RSP) { + pWorker = &pMgmt->statusWorker; + } + + if (dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)) != 0) { if (pMsg->msgType & 1u) { SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY}; rpcSendResponse(&rsp); diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index bf27a542ae..2d7999fe5a 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -381,7 +381,8 @@ static void *dnodeOpenVnodeFunc(void *param) { pMgmt->openVnodes, pMgmt->totalVnodes); dndReportStartup(pDnode, "open-vnodes", stepDesc); - SVnode *pImpl = vnodeOpen(pCfg->path, NULL, pCfg->vgId); + SVnodeCfg cfg = {.pDnode = pDnode, .vgId = pCfg->vgId}; + SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->failed++; @@ -581,7 +582,8 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { return -1; } - SVnode *pImpl = vnodeOpen(wrapperCfg.path, NULL /*pCfg*/, pCreate->vgId); + vnodeCfg.pDnode = pDnode; + SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); if (pImpl == NULL) { dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr()); return -1; @@ -800,7 +802,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t } } -static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) { +static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg, bool sendRsp) { int32_t code = 0; if (pQueue == NULL) { @@ -817,13 +819,15 @@ static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) } } - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS && sendRsp) { if (pRpcMsg->msgType & 1u) { SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; rpcSendResponse(&rsp); } rpcFreeCont(pRpcMsg->pCont); } + + return code; } static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { @@ -846,7 +850,7 @@ static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteRpcMsgToVnodeQueue(pVnode->pWriteQ, pMsg); + (void)dndWriteRpcMsgToVnodeQueue(pVnode->pWriteQ, pMsg, true); dndReleaseVnode(pDnode, pVnode); } } @@ -854,7 +858,7 @@ void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteRpcMsgToVnodeQueue(pVnode->pSyncQ, pMsg); + (void)dndWriteRpcMsgToVnodeQueue(pVnode->pSyncQ, pMsg, true); dndReleaseVnode(pDnode, pVnode); } } @@ -862,7 +866,7 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg); + (void)dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg, true); dndReleaseVnode(pDnode, pVnode); } } @@ -870,11 +874,23 @@ void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg); + (void)dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg, true); dndReleaseVnode(pDnode, pVnode); } } +int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pMsg) { + SMsgHead *pHead = pMsg->pCont; + // pHead->vgId = htonl(pHead->vgId); + + SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId); + if (pVnode == NULL) return -1; + + int32_t code = dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg, false); + dndReleaseVnode(pDnode, pVnode); + return code; +} + static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) { SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); if (pVnode == NULL) return -1; diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index cd27781df3..f3d5e09564 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -22,8 +22,8 @@ #include "dndTransport.h" #include "dndVnodes.h" #include "sync.h" -#include "wal.h" #include "tfs.h" +#include "wal.h" EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; } @@ -196,7 +196,15 @@ SDnode *dndInit(SDnodeOpt *pOption) { return NULL; } - if (vnodeInit(pDnode->opt.numOfCommitThreads) != 0) { + SVnodeOpt vnodeOpt = { + .sver = pDnode->opt.sver, + .timezone = pDnode->opt.timezone, + .locale = pDnode->opt.locale, + .charset = pDnode->opt.charset, + .nthreads = pDnode->opt.numOfCommitThreads, + .putReqToVQueryQFp = dndPutReqToVQueryQ, + }; + if (vnodeInit(&vnodeOpt) != 0) { dError("failed to init vnode env"); dndCleanup(pDnode); return NULL; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 85b7fbbb42..bc4d890257 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -740,7 +740,7 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj * if (pReq == NULL) return -1; action.pCont = pReq; - action.contLen = sizeof(SCreateVnodeReq); + action.contLen = sizeof(SDropVnodeReq); action.msgType = TDMT_DND_DROP_VNODE; action.acceptableCode = TSDB_CODE_DND_VNODE_NOT_DEPLOYED; if (mndTransAppendRedoAction(pTrans, &action) != 0) { diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index e0d6d3dd42..93d6d104ff 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -319,6 +319,14 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pAr taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes); + int32_t size = taosArrayGetSize(pArray); + if (size < pVgroup->replica) { + mError("db:%s, vgId:%d, no enough online dnodes:%d to alloc %d replica", pVgroup->dbName, pVgroup->vgId, size, + pVgroup->replica); + terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; + return -1; + } + for (int32_t v = 0; v < pVgroup->replica; ++v) { SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; SDnodeObj *pDnode = taosArrayGet(pArray, v); diff --git a/source/dnode/mnode/impl/test/profile/profile.cpp b/source/dnode/mnode/impl/test/profile/profile.cpp index bdffb6c72a..bf047517d3 100644 --- a/source/dnode/mnode/impl/test/profile/profile.cpp +++ b/source/dnode/mnode/impl/test/profile/profile.cpp @@ -13,7 +13,7 @@ class MndTestProfile : public ::testing::Test { protected: - static void SetUpTestSuite() { test.Init("/tmp/mnode_test_profile", 9022); } + static void SetUpTestSuite() { test.Init("/tmp/mnode_test_profile", 9031); } static void TearDownTestSuite() { test.Cleanup(); } static Testbase test; @@ -53,7 +53,7 @@ TEST_F(MndTestProfile, 01_ConnectMsg) { EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(pRsp->epSet.numOfEps, 1); - EXPECT_EQ(pRsp->epSet.port[0], 9022); + EXPECT_EQ(pRsp->epSet.port[0], 9031); EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); connId = pRsp->connId; @@ -127,7 +127,7 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) { EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(pRsp->epSet.numOfEps, 1); - EXPECT_EQ(pRsp->epSet.port[0], 9022); + EXPECT_EQ(pRsp->epSet.port[0], 9031); EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); } @@ -185,7 +185,7 @@ TEST_F(MndTestProfile, 05_KillConnMsg) { EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(pRsp->epSet.numOfEps, 1); - EXPECT_EQ(pRsp->epSet.port[0], 9022); + EXPECT_EQ(pRsp->epSet.port[0], 9031); EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); connId = pRsp->connId; @@ -249,7 +249,7 @@ TEST_F(MndTestProfile, 07_KillQueryMsg) { EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(pRsp->epSet.numOfEps, 1); - EXPECT_EQ(pRsp->epSet.port[0], 9022); + EXPECT_EQ(pRsp->epSet.port[0], 9031); EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); } } diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h index e70e891794..f9172dd351 100644 --- a/source/dnode/vnode/impl/inc/vnodeDef.h +++ b/source/dnode/vnode/impl/inc/vnodeDef.h @@ -57,6 +57,8 @@ typedef struct SVnodeMgr { pthread_cond_t hasTask; TD_DLIST(SVnodeTask) queue; // For vnode Mgmt + SDnode* pDnode; + PutReqToVQueryQFp putReqToVQueryQFp; } SVnodeMgr; extern SVnodeMgr vnodeMgr; @@ -75,10 +77,13 @@ struct SVnode { SVnodeFS* pFs; tsem_t canCommit; SQHandle* pQuery; + SDnode* pDnode; }; int vnodeScheduleTask(SVnodeTask* task); +int32_t vnodePutReqToVQueryQ(SVnode *pVnode, struct SRpcMsg *pReq); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/impl/src/vnodeMain.c b/source/dnode/vnode/impl/src/vnodeMain.c index eb4b45bc20..85ccc9879e 100644 --- a/source/dnode/vnode/impl/src/vnodeMain.c +++ b/source/dnode/vnode/impl/src/vnodeMain.c @@ -15,27 +15,29 @@ #include "vnodeDef.h" -static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid); +static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg); static void vnodeFree(SVnode *pVnode); static int vnodeOpenImpl(SVnode *pVnode); static void vnodeCloseImpl(SVnode *pVnode); -SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) { +SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { SVnode *pVnode = NULL; // Set default options - //if (pVnodeCfg == NULL) { - pVnodeCfg = &defaultVnodeOptions; - //} + SVnodeCfg cfg = defaultVnodeOptions; + if (pVnodeCfg != NULL) { + cfg.vgId = pVnodeCfg->vgId; + cfg.pDnode = pVnodeCfg->pDnode; + } // Validate options - if (vnodeValidateOptions(pVnodeCfg) < 0) { + if (vnodeValidateOptions(&cfg) < 0) { // TODO return NULL; } // Create the handle - pVnode = vnodeNew(path, pVnodeCfg, vid); + pVnode = vnodeNew(path, &cfg); if (pVnode == NULL) { // TODO: handle error return NULL; @@ -62,7 +64,7 @@ void vnodeClose(SVnode *pVnode) { void vnodeDestroy(const char *path) { taosRemoveDir(path); } /* ------------------------ STATIC METHODS ------------------------ */ -static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) { +static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { SVnode *pVnode = NULL; pVnode = (SVnode *)calloc(1, sizeof(*pVnode)); @@ -71,7 +73,8 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vi return NULL; } - pVnode->vgId = vid; + pVnode->vgId = pVnodeCfg->vgId; + pVnode->pDnode = pVnodeCfg->pDnode; pVnode->path = strdup(path); vnodeOptionsCopy(&(pVnode->config), pVnodeCfg); diff --git a/source/dnode/vnode/impl/src/vnodeMgr.c b/source/dnode/vnode/impl/src/vnodeMgr.c index 784d1abb60..cc369a0d15 100644 --- a/source/dnode/vnode/impl/src/vnodeMgr.c +++ b/source/dnode/vnode/impl/src/vnodeMgr.c @@ -19,17 +19,18 @@ SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED}; static void* loop(void* arg); -int vnodeInit(uint16_t nthreads) { +int vnodeInit(const SVnodeOpt *pOption) { if (TD_CHECK_AND_SET_MODE_INIT(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_INITIALIZED) { return 0; } vnodeMgr.stop = false; + vnodeMgr.putReqToVQueryQFp = pOption->putReqToVQueryQFp; // Start commit handers - if (nthreads > 0) { - vnodeMgr.nthreads = nthreads; - vnodeMgr.threads = (pthread_t*)calloc(nthreads, sizeof(pthread_t)); + if (pOption->nthreads > 0) { + vnodeMgr.nthreads = pOption->nthreads; + vnodeMgr.threads = (pthread_t*)calloc(pOption->nthreads, sizeof(pthread_t)); if (vnodeMgr.threads == NULL) { return -1; } @@ -38,7 +39,7 @@ int vnodeInit(uint16_t nthreads) { pthread_cond_init(&(vnodeMgr.hasTask), NULL); TD_DLIST_INIT(&(vnodeMgr.queue)); - for (uint16_t i = 0; i < nthreads; i++) { + for (uint16_t i = 0; i < pOption->nthreads; i++) { pthread_create(&(vnodeMgr.threads[i]), NULL, loop, NULL); pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread"); } @@ -89,6 +90,14 @@ int vnodeScheduleTask(SVnodeTask* pTask) { return 0; } +int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { + if (pVnode == NULL || pVnode->pDnode == NULL || vnodeMgr.putReqToVQueryQFp == NULL) { + terrno = TSDB_CODE_VND_APP_ERROR; + return -1; + } + return (*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq); +} + /* ------------------------ STATIC METHODS ------------------------ */ static void* loop(void* arg) { SVnodeTask* pTask; diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index bc0c3a4f64..bb6569deb0 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -5,8 +5,9 @@ ./test.sh -f sim/user/basic1.sim # ---- db -./test.sh -f sim/db/basic1.sim -./test.sh -f sim/db/error1.sim +./test.sh -f sim/db/basic1.sim +./test.sh -f sim/db/basic6.sim +./test.sh -f sim/db/error1.sim # ---- table ./test.sh -f sim/table/basic1.sim diff --git a/tests/script/sh/exec.sh b/tests/script/sh/exec.sh index 2e95a740d0..05f756ebb6 100755 --- a/tests/script/sh/exec.sh +++ b/tests/script/sh/exec.sh @@ -30,7 +30,7 @@ do CLEAR_OPTION="clear" ;; v) - SHELL_OPTION="true" + VALGRIND_OPTION="true" ;; u) USERS=$OPTARG @@ -99,7 +99,7 @@ fi if [ "$EXEC_OPTON" = "start" ]; then echo "ExcuteCmd:" $EXE_DIR/taosd -c $CFG_DIR - if [ "$SHELL_OPTION" = "true" ]; then + if [ "$VALGRIND_OPTION" = "true" ]; then TT=`date +%s` mkdir ${LOG_DIR}/${TT} nohup valgrind --log-file=${LOG_DIR}/${TT}/valgrind.log --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 & diff --git a/tests/script/sim/db/basic6.sim b/tests/script/sim/db/basic6.sim index 684ce825fe..a688b4c2f3 100644 --- a/tests/script/sim/db/basic6.sim +++ b/tests/script/sim/db/basic6.sim @@ -1,29 +1,24 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 -system sh/cfg.sh -n dnode1 -c wallevel -v 0 -system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 4 -system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 1000 - system sh/exec.sh -n dnode1 -s start - -sleep 2000 sql connect + print ============================ dnode1 start $i = 0 -$dbPrefix = ob_db_db -$tbPrefix = ob_db_tb +$dbPrefix = db +$tbPrefix = tb $db = $dbPrefix . $i $tb = $tbPrefix . $i print =============== step1 -sql create database $db replica 1 days 20 keep 2000 cache 16 +sql create database $db replica 1 days 20 keep 2000 cache 16 vgroups 4 sql show databases print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 if $data00 != $db then return -1 endi -if $data02 != 0 then +if $data02 != 4 then return -1 endi if $data03 != 0 then @@ -63,9 +58,6 @@ print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 if $data00 != $db then return -1 endi -if $data02 != 0 then - return -1 -endi if $data03 != 0 then return -1 endi @@ -77,30 +69,14 @@ if $data06 != 15 then endi print =============== step6 -sql use $db -sql create table $tb (ts timestamp, speed int) -$i = 1 -while $i < 4 +$i = $i + 1 +while $i < 5 $db = $dbPrefix . $i - $tb = $tbPrefix . $i sql create database $db sql use $db - sql create table $tb (ts timestamp, speed int) $i = $i + 1 endw -sql show databases -if $rows != 4 then - return -1 -endi - -$i = 4 -$db = $dbPrefix . $i -$tb = $tbPrefix . $i -sql create database $db -sql use $db -sql create table $tb (ts timestamp, speed int) - print =============== step7 $i = 0 while $i < 5 @@ -115,7 +91,12 @@ $db = $dbPrefix . $i $tb = $tbPrefix . $i sql create database $db sql use $db -sql create table $tb (ts timestamp, speed int) +sql create table st (ts timestamp, i int) tags (j int) +sql create table $tb using st tags(1) + +return +system sh/exec.sh -n dnode1 -s stop -x SIGINT + sql show tables if $rows != 1 then return -1 @@ -133,7 +114,8 @@ if $rows != 0 then endi print =============== step11 -sql create table $tb (ts timestamp, speed int) +sql create table st (ts timestamp, i int) tags (j int) +sql create table $tb using st tags(1) sql show tables if $rows != 1 then return -1 @@ -149,16 +131,23 @@ sql show tables if $rows != 0 then return -1 endi -sql create table $tb (ts timestamp, speed int) + +sql create table st (ts timestamp, i int) tags (j int) +sql create table $tb using st tags(1) sql show tables if $rows != 1 then return -1 endi + sql insert into $tb values (now+1a, 0) sql insert into $tb values (now+2a, 1) sql insert into $tb values (now+3a, 2) sql insert into $tb values (now+4a, 3) sql insert into $tb values (now+5a, 4) + + +return + sql select * from $tb if $rows != 5 then return -1 @@ -176,7 +165,8 @@ if $rows != 0 then endi print =============== step16 -sql create table $tb (ts timestamp, speed int) +sql create table st (ts timestamp, i int) tags (j int) +sql create table $tb using st tags(1) sql show tables if $rows != 1 then return -1 diff --git a/tests/test/c/create_table.c b/tests/test/c/create_table.c index f2db9d0a0c..a5a55bbc91 100644 --- a/tests/test/c/create_table.c +++ b/tests/test/c/create_table.c @@ -40,6 +40,8 @@ typedef struct { float createTableSpeed; float insertDataSpeed; int64_t startMs; + int64_t maxDelay; + int64_t minDelay; pthread_t thread; } SThreadInfo; @@ -58,12 +60,30 @@ int32_t main(int32_t argc, char *argv[]) { pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); SThreadInfo *pInfo = (SThreadInfo *)calloc(numOfThreads, sizeof(SThreadInfo)); - int64_t numOfTablesPerThread = numOfTables / numOfThreads; - numOfTables = numOfTablesPerThread * numOfThreads; + //int64_t numOfTablesPerThread = numOfTables / numOfThreads; + //numOfTables = numOfTablesPerThread * numOfThreads; + + + if (numOfThreads < 1) { + numOfThreads = 1; + } + + int64_t a = numOfTables / numOfThreads; + if (a < 1) { + numOfThreads = numOfTables; + a = 1; + } + + int64_t b = 0; + b = numOfTables % numOfThreads; + + int64_t tableFrom = 0; for (int32_t i = 0; i < numOfThreads; ++i) { - pInfo[i].tableBeginIndex = i * numOfTablesPerThread; - pInfo[i].tableEndIndex = (i + 1) * numOfTablesPerThread; + pInfo[i].tableBeginIndex = tableFrom; + pInfo[i].tableEndIndex = i < b ? tableFrom + a : tableFrom + a - 1; + tableFrom = pInfo[i].tableEndIndex + 1; pInfo[i].threadIndex = i; + pInfo[i].minDelay = INT64_MAX; strcpy(pInfo[i].dbName, dbName); strcpy(pInfo[i].stbName, stbName); pthread_create(&(pInfo[i].thread), &thattr, threadFunc, (void *)(pInfo + i)); @@ -74,9 +94,15 @@ int32_t main(int32_t argc, char *argv[]) { pthread_join(pInfo[i].thread, NULL); } + int64_t maxDelay = 0; + int64_t minDelay = INT64_MAX; + float createTableSpeed = 0; for (int32_t i = 0; i < numOfThreads; ++i) { createTableSpeed += pInfo[i].createTableSpeed; + + if (pInfo[i].maxDelay > maxDelay) maxDelay = pInfo[i].maxDelay; + if (pInfo[i].minDelay < minDelay) minDelay = pInfo[i].minDelay; } float insertDataSpeed = 0; @@ -84,10 +110,19 @@ int32_t main(int32_t argc, char *argv[]) { insertDataSpeed += pInfo[i].insertDataSpeed; } - pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d %s", GREEN, numOfTables, createTableSpeed, - numOfThreads, NC); - pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed, + pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d, maxDelay: %" PRId64 "us, minDelay: %" PRId64 "us %s", + GREEN, + numOfTables, + createTableSpeed, + numOfThreads, + maxDelay, + minDelay, + NC); + + if (insertData) { + pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed, numOfThreads, NC); + } pthread_attr_destroy(&thattr); free(pInfo); @@ -99,36 +134,36 @@ void createDbAndStb() { TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0); if (con == NULL) { - pError("failed to connect to DB, reason:%s", taos_errstr(con)); + pError("failed to connect to DB, reason:%s", taos_errstr(NULL)); exit(1); } sprintf(qstr, "create database if not exists %s vgroups %d", dbName, numOfVgroups); - TAOS_RES *pSql = taos_query(con, qstr); - int32_t code = taos_errno(pSql); + TAOS_RES *pRes = taos_query(con, qstr); + int32_t code = taos_errno(pRes); if (code != 0) { - pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(con), taos_errstr(con)); + pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(pRes), taos_errstr(pRes)); exit(0); } - taos_free_result(pSql); + taos_free_result(pRes); sprintf(qstr, "use %s", dbName); - pSql = taos_query(con, qstr); - code = taos_errno(pSql); + pRes = taos_query(con, qstr); + code = taos_errno(pRes); if (code != 0) { - pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con)); + pError("failed to use db, code:%d reason:%s", taos_errno(pRes), taos_errstr(pRes)); exit(0); } - taos_free_result(pSql); + taos_free_result(pRes); sprintf(qstr, "create table %s (ts timestamp, i int) tags (j int)", stbName); - pSql = taos_query(con, qstr); - code = taos_errno(pSql); + pRes = taos_query(con, qstr); + code = taos_errno(pRes); if (code != 0) { - pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con)); + pError("failed to use db, code:%d reason:%s", taos_errno(pRes), taos_errstr(pRes)); exit(0); } - taos_free_result(pSql); + taos_free_result(pRes); taos_close(con); } @@ -160,16 +195,20 @@ void *threadFunc(void *param) { TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0); if (con == NULL) { - pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(con)); + pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(NULL)); exit(1); } + //printf("thread:%d, table range: %"PRId64 " - %"PRId64 "\n", pInfo->threadIndex, pInfo->tableBeginIndex, pInfo->tableEndIndex); sprintf(qstr, "use %s", pInfo->dbName); - TAOS_RES *pSql = taos_query(con, qstr); - taos_free_result(pSql); + TAOS_RES *pRes = taos_query(con, qstr); + taos_free_result(pRes); + if (createTable) { - pInfo->startMs = taosGetTimestampMs(); + int64_t curMs = 0; + int64_t beginMs = taosGetTimestampMs(); + pInfo->startMs = beginMs; for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { int64_t batch = (pInfo->tableEndIndex - t); batch = MIN(batch, batchNum); @@ -179,14 +218,22 @@ void *threadFunc(void *param) { len += sprintf(qstr + len, " t%" PRId64 " using %s tags(%" PRId64 ")", t + i, stbName, t + i); } - TAOS_RES *pSql = taos_query(con, qstr); - code = taos_errno(pSql); + int64_t startTs = taosGetTimestampUs(); + TAOS_RES *pRes = taos_query(con, qstr); + code = taos_errno(pRes); if (code != 0) { pError("failed to create table t%" PRId64 ", reason:%s", t, tstrerror(code)); } - taos_free_result(pSql); + taos_free_result(pRes); + int64_t endTs = taosGetTimestampUs(); + int64_t delay = endTs - startTs; + //printf("==== %"PRId64" - %"PRId64", %"PRId64"\n", startTs, endTs, delay); + if (delay > pInfo->maxDelay) pInfo->maxDelay = delay; + if (delay < pInfo->minDelay) pInfo->minDelay = delay; - if (t % 100000 == 0) { + curMs = taosGetTimestampMs(); + if (curMs - beginMs > 10000) { + beginMs = curMs; printCreateProgress(pInfo, t); } t += (batch - 1); @@ -195,6 +242,9 @@ void *threadFunc(void *param) { } if (insertData) { + int64_t curMs = 0; + int64_t beginMs = taosGetTimestampMs();; + pInfo->startMs = taosGetTimestampMs(); for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { int64_t batch = (pInfo->tableEndIndex - t); @@ -205,14 +255,15 @@ void *threadFunc(void *param) { len += sprintf(qstr + len, " t%" PRId64 " values(now, %" PRId64 ")", t + i, t + i); } - TAOS_RES *pSql = taos_query(con, qstr); - code = taos_errno(pSql); + TAOS_RES *pRes = taos_query(con, qstr); + code = taos_errno(pRes); if (code != 0) { pError("failed to insert table t%" PRId64 ", reason:%s", t, tstrerror(code)); } - taos_free_result(pSql); + taos_free_result(pRes); - if (t % 100000 == 0) { + curMs = taosGetTimestampMs(); + if (curMs - beginMs > 10000) { printInsertProgress(pInfo, t); } t += (batch - 1); @@ -266,7 +317,7 @@ void parseArgument(int32_t argc, char *argv[]) { numOfThreads = atoi(argv[++i]); } else if (strcmp(argv[i], "-n") == 0) { numOfTables = atoll(argv[++i]); - } else if (strcmp(argv[i], "-n") == 0) { + } else if (strcmp(argv[i], "-v") == 0) { numOfVgroups = atoi(argv[++i]); } else if (strcmp(argv[i], "-a") == 0) { createTable = atoi(argv[++i]);