diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 8cddc24bc7..a59326fa7c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -996,7 +996,7 @@ typedef struct { char encrypt; char secret[TSDB_PASSWORD_LEN]; char ckey[TSDB_PASSWORD_LEN]; -} SAuthMsg, SAuthRsp; +} SAuthReq, SAuthRsp; typedef struct { int8_t finished; diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index e947b590ba..5f2d90123c 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -305,18 +305,18 @@ static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) { memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); } -static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeMsg *pMsg) { +static int32_t dndBuildMnodeOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeMsg *pReq) { dndInitMnodeOption(pDnode, pOption); pOption->dnodeId = dndGetDnodeId(pDnode); pOption->clusterId = dndGetClusterId(pDnode); - pOption->replica = pMsg->replica; + pOption->replica = pReq->replica; pOption->selfIndex = -1; - for (int32_t i = 0; i < pMsg->replica; ++i) { + for (int32_t i = 0; i < pReq->replica; ++i) { SReplica *pReplica = &pOption->replicas[i]; - pReplica->id = pMsg->replicas[i].id; - pReplica->port = pMsg->replicas[i].port; - memcpy(pReplica->fqdn, pMsg->replicas[i].fqdn, TSDB_FQDN_LEN); + pReplica->id = pReq->replicas[i].id; + pReplica->port = pReq->replicas[i].port; + memcpy(pReplica->fqdn, pReq->replicas[i].fqdn, TSDB_FQDN_LEN); if (pReplica->id == pOption->dnodeId) { pOption->selfIndex = i; } @@ -423,26 +423,26 @@ static int32_t dndDropMnode(SDnode *pDnode) { return 0; } -static SDCreateMnodeMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) { - SDCreateMnodeMsg *pMsg = pRpcMsg->pCont; - pMsg->dnodeId = htonl(pMsg->dnodeId); - for (int32_t i = 0; i < pMsg->replica; ++i) { - pMsg->replicas[i].id = htonl(pMsg->replicas[i].id); - pMsg->replicas[i].port = htons(pMsg->replicas[i].port); +static SDCreateMnodeMsg *dndParseCreateMnodeReq(SRpcMsg *pReq) { + SDCreateMnodeMsg *pCreate = pReq->pCont; + pCreate->dnodeId = htonl(pCreate->dnodeId); + for (int32_t i = 0; i < pCreate->replica; ++i) { + pCreate->replicas[i].id = htonl(pCreate->replicas[i].id); + pCreate->replicas[i].port = htons(pCreate->replicas[i].port); } - return pMsg; + return pCreate; } -int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { - SDCreateMnodeMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); +int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SDCreateMnodeMsg *pCreate = dndParseCreateMnodeReq(pReq); - if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { + if (pCreate->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_MNODE_ID_INVALID; return -1; } else { SMnodeOpt option = {0}; - if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) { + if (dndBuildMnodeOptionFromReq(pDnode, &option, pCreate) != 0) { return -1; } @@ -450,16 +450,16 @@ int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { } } -int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { - SDAlterMnodeMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); +int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SDAlterMnodeMsg *pAlter = dndParseCreateMnodeReq(pReq); - if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { + if (pAlter->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_MNODE_ID_INVALID; return -1; } SMnodeOpt option = {0}; - if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) { + if (dndBuildMnodeOptionFromReq(pDnode, &option, pAlter) != 0) { return -1; } @@ -470,11 +470,11 @@ int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { return dndWriteMnodeFile(pDnode); } -int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { - SDDropMnodeMsg *pMsg = pRpcMsg->pCont; - pMsg->dnodeId = htonl(pMsg->dnodeId); +int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SDDropMnodeMsg *pDrop = pReq->pCont; + pDrop->dnodeId = htonl(pDrop->dnodeId); - if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { + if (pDrop->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_MNODE_ID_INVALID; return -1; } else { diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 49b9e8d6e1..509e8f4cab 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -143,26 +143,26 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg; } -static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { +static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { SDnode *pDnode = parent; STransMgmt *pMgmt = &pDnode->tmgmt; - tmsg_t msgType = pMsg->msgType; + tmsg_t msgType = pRsp->msgType; if (dndGetStat(pDnode) == DND_STAT_STOPPED) { - if (pMsg == NULL || pMsg->pCont == NULL) return; - dTrace("RPC %p, rsp:%s is ignored since dnode is stopping", pMsg->handle, TMSG_INFO(msgType)); - rpcFreeCont(pMsg->pCont); + if (pRsp == NULL || pRsp->pCont == NULL) return; + dTrace("RPC %p, rsp:%s is ignored since dnode is stopping", pRsp->handle, TMSG_INFO(msgType)); + rpcFreeCont(pRsp->pCont); return; } DndMsgFp fp = pMgmt->msgFp[TMSG_INDEX(msgType)]; if (fp != NULL) { - dTrace("RPC %p, rsp:%s will be processed, code:0x%x", pMsg->handle, TMSG_INFO(msgType), pMsg->code & 0XFFFF); - (*fp)(pDnode, pMsg, pEpSet); + dTrace("RPC %p, rsp:%s will be processed, code:0x%x", pRsp->handle, TMSG_INFO(msgType), pRsp->code & 0XFFFF); + (*fp)(pDnode, pRsp, pEpSet); } else { - dError("RPC %p, rsp:%s not processed", pMsg->handle, TMSG_INFO(msgType)); - rpcFreeCont(pMsg->pCont); + dError("RPC %p, rsp:%s not processed", pRsp->handle, TMSG_INFO(msgType)); + rpcFreeCont(pRsp->pCont); } } @@ -201,48 +201,48 @@ static void dndCleanupClient(SDnode *pDnode) { } } -static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { +static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { SDnode *pDnode = param; STransMgmt *pMgmt = &pDnode->tmgmt; - tmsg_t msgType = pMsg->msgType; + tmsg_t msgType = pReq->msgType; if (msgType == TDMT_DND_NETWORK_TEST) { - dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pMsg->handle, pMsg->ahandle, pMsg->code); - dndProcessStartupReq(pDnode, pMsg); + dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pReq->handle, pReq->ahandle, pReq->code); + dndProcessStartupReq(pDnode, pReq); return; } if (dndGetStat(pDnode) == DND_STAT_STOPPED) { - dError("RPC %p, req:%s app:%p is ignored since dnode exiting", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle); - SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_OFFLINE}; + dError("RPC %p, req:%s app:%p is ignored since dnode exiting", pReq->handle, TMSG_INFO(msgType), pReq->ahandle); + SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_DND_OFFLINE}; rpcSendResponse(&rspMsg); - rpcFreeCont(pMsg->pCont); + rpcFreeCont(pReq->pCont); return; } else if (dndGetStat(pDnode) != DND_STAT_RUNNING) { - dError("RPC %p, req:%s app:%p is ignored since dnode not running", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle); - SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY}; + dError("RPC %p, req:%s app:%p is ignored since dnode not running", pReq->handle, TMSG_INFO(msgType), pReq->ahandle); + SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_APP_NOT_READY}; rpcSendResponse(&rspMsg); - rpcFreeCont(pMsg->pCont); + rpcFreeCont(pReq->pCont); return; } - if (pMsg->pCont == NULL) { - dTrace("RPC %p, req:%s app:%p not processed since content is null", pMsg->handle, TMSG_INFO(msgType), - pMsg->ahandle); - SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN}; + if (pReq->pCont == NULL) { + dTrace("RPC %p, req:%s app:%p not processed since content is null", pReq->handle, TMSG_INFO(msgType), + pReq->ahandle); + SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN}; rpcSendResponse(&rspMsg); return; } DndMsgFp fp = pMgmt->msgFp[TMSG_INDEX(msgType)]; if (fp != NULL) { - dTrace("RPC %p, req:%s app:%p will be processed", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle); - (*fp)(pDnode, pMsg, pEpSet); + dTrace("RPC %p, req:%s app:%p will be processed", pReq->handle, TMSG_INFO(msgType), pReq->ahandle); + (*fp)(pDnode, pReq, pEpSet); } else { - dError("RPC %p, req:%s app:%p is not processed since no handle", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle); - SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED}; + dError("RPC %p, req:%s app:%p is not processed since no handle", pReq->handle, TMSG_INFO(msgType), pReq->ahandle); + SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED}; rpcSendResponse(&rspMsg); - rpcFreeCont(pMsg->pCont); + rpcFreeCont(pReq->pCont); } } @@ -254,7 +254,7 @@ static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRp rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp); } -static int32_t dndAuthInternalMsg(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { +static int32_t dndAuthInternalReq(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { if (strcmp(user, INTERNAL_USER) == 0) { // A simple temporary implementation char pass[TSDB_PASSWORD_LEN] = {0}; @@ -281,7 +281,7 @@ static int32_t dndAuthInternalMsg(SDnode *pDnode, char *user, char *spi, char *e static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey) { SDnode *pDnode = parent; - if (dndAuthInternalMsg(parent, user, spi, encrypt, secret, ckey) == 0) { + if (dndAuthInternalReq(parent, user, spi, encrypt, secret, ckey) == 0) { // dTrace("get internal auth success"); return 0; } @@ -298,10 +298,10 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char // dDebug("user:%s, send auth msg to other mnodes", user); - SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg)); - tstrncpy(pMsg->user, user, TSDB_USER_LEN); + SAuthReq *pReq = rpcMallocCont(sizeof(SAuthReq)); + tstrncpy(pReq->user, user, TSDB_USER_LEN); - SRpcMsg rpcMsg = {.pCont = pMsg, .contLen = sizeof(SAuthMsg), .msgType = TDMT_MND_AUTH}; + SRpcMsg rpcMsg = {.pCont = pReq, .contLen = sizeof(SAuthReq), .msgType = TDMT_MND_AUTH}; SRpcMsg rpcRsp = {0}; dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp); @@ -381,19 +381,19 @@ void dndCleanupTrans(SDnode *pDnode) { dInfo("dnode-transport is cleaned up"); } -int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pMsg) { +int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq) { STransMgmt *pMgmt = &pDnode->tmgmt; if (pMgmt->clientRpc == NULL) { terrno = TSDB_CODE_DND_OFFLINE; return -1; } - rpcSendRequest(pMgmt->clientRpc, pEpSet, pMsg, NULL); + rpcSendRequest(pMgmt->clientRpc, pEpSet, pReq, NULL); return 0; } -int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pMsg) { +int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pReq) { SEpSet epSet = {0}; dndGetMnodeEpSet(pDnode, &epSet); - return dndSendReqToDnode(pDnode, &epSet, pMsg); + return dndSendReqToDnode(pDnode, &epSet, pReq); } diff --git a/source/dnode/mgmt/impl/test/mnode/qmnode.cpp b/source/dnode/mgmt/impl/test/mnode/qmnode.cpp index 1bf692d892..00098a856a 100644 --- a/source/dnode/mgmt/impl/test/mnode/qmnode.cpp +++ b/source/dnode/mgmt/impl/test/mnode/qmnode.cpp @@ -24,3 +24,160 @@ class DndTestMnode : public ::testing::Test { }; Testbase DndTestMnode::test; + +#if 0 +TEST_F(DndTestMnode, 01_Create_Mnode) { + { + int32_t contLen = sizeof(SDCreateMnodeReq); + + SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID); + } + + { + int32_t contLen = sizeof(SDCreateMnodeReq); + + SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, 0); + } + + { + int32_t contLen = sizeof(SDCreateMnodeReq); + + SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); + } + + test.Restart(); + + { + int32_t contLen = sizeof(SDCreateMnodeReq); + + SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); + } +} + +TEST_F(DndTestMnode, 02_Alter_Mnode) { + { + int32_t contLen = sizeof(SDCreateMnodeReq); + + SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID); + } + + { + int32_t contLen = sizeof(SDCreateMnodeReq); + + SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, 0); + } + + { + int32_t contLen = sizeof(SDCreateMnodeReq); + + SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); + } + + test.Restart(); + + { + int32_t contLen = sizeof(SDCreateMnodeReq); + + SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); + } +} + +TEST_F(DndTestMnode, 03_Drop_Mnode) { + { + int32_t contLen = sizeof(SDDropMnodeReq); + + SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID); + } + + { + int32_t contLen = sizeof(SDDropMnodeReq); + + SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, 0); + } + + { + int32_t contLen = sizeof(SDDropMnodeReq); + + SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED); + } + + test.Restart(); + + { + int32_t contLen = sizeof(SDDropMnodeReq); + + SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED); + } + + { + int32_t contLen = sizeof(SDCreateMnodeReq); + + SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, 0); + } +} +#endif \ No newline at end of file diff --git a/tests/test/c/create_table.c b/tests/test/c/create_table.c index c46812c9a5..f2db9d0a0c 100644 --- a/tests/test/c/create_table.c +++ b/tests/test/c/create_table.c @@ -25,20 +25,21 @@ char dbName[32] = "db"; char stbName[64] = "st"; int32_t numOfThreads = 1; -int32_t numOfTables = 10000; +int64_t numOfTables = 200000; int32_t createTable = 1; int32_t insertData = 0; -int32_t batchNum = 10; +int32_t batchNum = 100; int32_t numOfVgroups = 2; typedef struct { - int32_t tableBeginIndex; - int32_t tableEndIndex; + int64_t tableBeginIndex; + int64_t tableEndIndex; int32_t threadIndex; char dbName[32]; char stbName[64]; float createTableSpeed; float insertDataSpeed; + int64_t startMs; pthread_t thread; } SThreadInfo; @@ -57,7 +58,7 @@ int32_t main(int32_t argc, char *argv[]) { pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); SThreadInfo *pInfo = (SThreadInfo *)calloc(numOfThreads, sizeof(SThreadInfo)); - int32_t numOfTablesPerThread = numOfTables / numOfThreads; + int64_t numOfTablesPerThread = numOfTables / numOfThreads; numOfTables = numOfTablesPerThread * numOfThreads; for (int32_t i = 0; i < numOfThreads; ++i) { pInfo[i].tableBeginIndex = i * numOfTablesPerThread; @@ -83,8 +84,10 @@ int32_t main(int32_t argc, char *argv[]) { insertDataSpeed += pInfo[i].insertDataSpeed; } - pPrint("%s total %.1f tables/second, threads:%d %s", GREEN, createTableSpeed, numOfThreads, NC); - pPrint("%s total %.1f rows/second, threads:%d %s", GREEN, insertDataSpeed, numOfThreads, NC); + pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d %s", GREEN, numOfTables, createTableSpeed, + numOfThreads, NC); + pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed, + numOfThreads, NC); pthread_attr_destroy(&thattr); free(pInfo); @@ -130,6 +133,26 @@ void createDbAndStb() { taos_close(con); } +void printCreateProgress(SThreadInfo *pInfo, int64_t t) { + int64_t endMs = taosGetTimestampMs(); + int64_t totalTables = t - pInfo->tableBeginIndex; + float seconds = (endMs - pInfo->startMs) / 1000.0; + float speed = totalTables / seconds; + pInfo->createTableSpeed = speed; + pPrint("thread:%d, %" PRId64 " tables created, time:%.2f sec, speed:%.1f tables/second, ", pInfo->threadIndex, + totalTables, seconds, speed); +} + +void printInsertProgress(SThreadInfo *pInfo, int64_t t) { + int64_t endMs = taosGetTimestampMs(); + int64_t totalTables = t - pInfo->tableBeginIndex; + float seconds = (endMs - pInfo->startMs) / 1000.0; + float speed = totalTables / seconds; + pInfo->insertDataSpeed = speed; + pPrint("thread:%d, %" PRId64 " rows inserted, time:%.2f sec, speed:%.1f rows/second, ", pInfo->threadIndex, + totalTables, seconds, speed); +} + void *threadFunc(void *param) { SThreadInfo *pInfo = (SThreadInfo *)param; char *qstr = malloc(2000 * 1000); @@ -146,47 +169,55 @@ void *threadFunc(void *param) { taos_free_result(pSql); if (createTable) { - int64_t startMs = taosGetTimestampMs(); - for (int32_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { - int32_t batch = (pInfo->tableEndIndex - t); + pInfo->startMs = taosGetTimestampMs(); + for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { + int64_t batch = (pInfo->tableEndIndex - t); batch = MIN(batch, batchNum); int32_t len = sprintf(qstr, "create table"); for (int32_t i = 0; i < batch; ++i) { - len += sprintf(qstr + len, " t%d using %s tags(%d)", t + i, stbName, t + i); + len += sprintf(qstr + len, " t%" PRId64 " using %s tags(%" PRId64 ")", t + i, stbName, t + i); } + TAOS_RES *pSql = taos_query(con, qstr); code = taos_errno(pSql); if (code != 0) { - pError("failed to create table t%d, reason:%s", t, tstrerror(code)); + pError("failed to create table t%" PRId64 ", reason:%s", t, tstrerror(code)); } taos_free_result(pSql); + + if (t % 100000 == 0) { + printCreateProgress(pInfo, t); + } + t += (batch - 1); } - int64_t endMs = taosGetTimestampMs(); - int32_t totalTables = pInfo->tableEndIndex - pInfo->tableBeginIndex; - float seconds = (endMs - startMs) / 1000.0; - float speed = totalTables / seconds; - pInfo->createTableSpeed = speed; - pPrint("thread:%d, time:%.2f sec, speed:%.1f tables/second, ", pInfo->threadIndex, seconds, speed); + printCreateProgress(pInfo, pInfo->tableEndIndex); } if (insertData) { - int64_t startMs = taosGetTimestampMs(); - for (int32_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { - sprintf(qstr, "insert into %s%d values(now, 1)", stbName, t); + pInfo->startMs = taosGetTimestampMs(); + for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { + int64_t batch = (pInfo->tableEndIndex - t); + batch = MIN(batch, batchNum); + + int32_t len = sprintf(qstr, "insert into"); + for (int32_t i = 0; i < batch; ++i) { + len += sprintf(qstr + len, " t%" PRId64 " values(now, %" PRId64 ")", t + i, t + i); + } + TAOS_RES *pSql = taos_query(con, qstr); code = taos_errno(pSql); if (code != 0) { - pError("failed to create table %s%d, reason:%s", stbName, t, tstrerror(code)); + pError("failed to insert table t%" PRId64 ", reason:%s", t, tstrerror(code)); } taos_free_result(pSql); + + if (t % 100000 == 0) { + printInsertProgress(pInfo, t); + } + t += (batch - 1); } - int64_t endMs = taosGetTimestampMs(); - int32_t totalTables = pInfo->tableEndIndex - pInfo->tableBeginIndex; - float seconds = (endMs - startMs) / 1000.0; - float speed = totalTables / seconds; - pInfo->insertDataSpeed = speed; - pPrint("thread:%d, time:%.2f sec, speed:%.1f rows/second, ", pInfo->threadIndex, seconds, speed); + printInsertProgress(pInfo, pInfo->tableEndIndex); } taos_close(con); @@ -207,7 +238,7 @@ void printHelp() { printf("%s%s\n", indent, "-t"); printf("%s%s%s%d\n", indent, indent, "numOfThreads, default is ", numOfThreads); printf("%s%s\n", indent, "-n"); - printf("%s%s%s%d\n", indent, indent, "numOfTables, default is ", numOfTables); + printf("%s%s%s%" PRId64 "\n", indent, indent, "numOfTables, default is ", numOfTables); printf("%s%s\n", indent, "-v"); printf("%s%s%s%d\n", indent, indent, "numOfVgroups, default is ", numOfVgroups); printf("%s%s\n", indent, "-a"); @@ -234,7 +265,7 @@ void parseArgument(int32_t argc, char *argv[]) { } else if (strcmp(argv[i], "-t") == 0) { numOfThreads = atoi(argv[++i]); } else if (strcmp(argv[i], "-n") == 0) { - numOfTables = atoi(argv[++i]); + numOfTables = atoll(argv[++i]); } else if (strcmp(argv[i], "-n") == 0) { numOfVgroups = atoi(argv[++i]); } else if (strcmp(argv[i], "-a") == 0) { @@ -250,7 +281,7 @@ void parseArgument(int32_t argc, char *argv[]) { pPrint("%s dbName:%s %s", GREEN, dbName, NC); pPrint("%s stbName:%s %s", GREEN, stbName, NC); pPrint("%s configDir:%s %s", GREEN, configDir, NC); - pPrint("%s numOfTables:%d %s", GREEN, numOfTables, NC); + pPrint("%s numOfTables:%" PRId64 " %s", GREEN, numOfTables, NC); pPrint("%s numOfThreads:%d %s", GREEN, numOfThreads, NC); pPrint("%s numOfVgroups:%d %s", GREEN, numOfVgroups, NC); pPrint("%s createTable:%d %s", GREEN, createTable, NC);