diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e0821b8ca6..98677c64d3 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1181,7 +1181,7 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); uv_loop_init(pThrd->loop); - pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, cliAsyncCb); + pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb); pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); uv_prepare_init(pThrd->loop, pThrd->prepare); @@ -1253,11 +1253,14 @@ void cliWalkCb(uv_handle_t* handle, void* arg) { } FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst) { - int8_t index = pTransInst->index; + int32_t index = pTransInst->index; if (pTransInst->numOfThreads == 0) { return -1; } - if (pTransInst->index++ >= pTransInst->numOfThreads) { + /* + * no lock, and to avoid CPU load imbalance, set limit pTransInst->numOfThreads * 2000; + */ + if (pTransInst->index++ >= pTransInst->numOfThreads * 2000) { pTransInst->index = 0; } return index % pTransInst->numOfThreads; @@ -1271,7 +1274,7 @@ static FORCE_INLINE void doDelayTask(void* param) { static void doCloseIdleConn(void* param) { STaskArg* arg = param; SCliConn* conn = arg->param1; - tTrace("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn); + tDebug("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn); conn->task = NULL; cliDestroyConn(conn, true); taosMemoryFree(arg); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 912bdb2cd0..18b812f314 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -252,7 +252,7 @@ int transAsyncSend(SAsyncPool* pool, queue* q) { int idx = pool->index % pool->nAsync; // no need mutex here - if (pool->index++ > pool->nAsync) { + if (pool->index++ > pool->nAsync * 2000) { pool->index = 0; } uv_async_t* async = &(pool->asyncs[idx]); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 944995c892..5f36d91023 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -812,7 +812,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { // conn set QUEUE_INIT(&pThrd->conn); - pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, uvWorkerAsyncCb); + pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, uvWorkerAsyncCb); #if defined(WINDOWS) || defined(DARWIN) uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); #else diff --git a/source/libs/transport/test/cliBench.c b/source/libs/transport/test/cliBench.c index a296625ace..01e88b9988 100644 --- a/source/libs/transport/test/cliBench.c +++ b/source/libs/transport/test/cliBench.c @@ -32,6 +32,22 @@ typedef struct { void *pRpc; } SInfo; + +void initLogEnv() { + const char *logDir = "/tmp/trans_cli"; + const char* defaultLogFileNamePrefix = "taoslog"; + const int32_t maxLogFileNum = 10000; + tsAsyncLog = 0; + //idxDebugFlag = 143; + strcpy(tsLogDir, (char *)logDir); + taosRemoveDir(tsLogDir); + taosMkDir(tsLogDir); + + if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { + printf("failed to open log file in directory:%s\n", tsLogDir); + } +} + static void processResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { SInfo *pInfo = (SInfo *)pMsg->info.ahandle; tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, @@ -98,7 +114,7 @@ int main(int argc, char *argv[]) { rpcInit.user = "michael"; rpcInit.connType = TAOS_CONN_CLIENT; - rpcDebugFlag = 131; + rpcDebugFlag = 135; for (int i = 1; i < argc; ++i) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { } else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) { @@ -132,7 +148,9 @@ int main(int argc, char *argv[]) { exit(0); } } - taosInitLog("client.log", 100000); + + + initLogEnv(); void *pRpc = rpcOpen(&rpcInit); if (pRpc == NULL) { diff --git a/source/libs/transport/test/svrBench.c b/source/libs/transport/test/svrBench.c index ce465d989a..464559c1e0 100644 --- a/source/libs/transport/test/svrBench.c +++ b/source/libs/transport/test/svrBench.c @@ -26,7 +26,40 @@ TdFilePtr pDataFile = NULL; STaosQueue *qhandle = NULL; STaosQset *qset = NULL; -void processShellMsg() { +int32_t balance = 0; + +typedef struct { + int32_t numOfThread; + STaosQueue **qhandle; + STaosQset **qset; + +} MultiThreadQhandle; + +typedef struct TThread { + TdThread thread; + int idx; +} TThread; + +MultiThreadQhandle *multiQ = NULL; + +void initLogEnv() { + const char *logDir = "/tmp/trans_svr"; + const char *defaultLogFileNamePrefix = "taoslog"; + const int32_t maxLogFileNum = 10000; + tsAsyncLog = 0; + // idxDebugFlag = 143; + strcpy(tsLogDir, logDir); + taosRemoveDir(tsLogDir); + taosMkDir(tsLogDir); + + if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { + printf("failed to open log file in directory:%s\n", tsLogDir); + } +} +void *processShellMsg(void *arg) { + TThread *thread = (TThread *)arg; + + int32_t idx = thread->idx; static int num = 0; STaosQall *qall; SRpcMsg *pRpcMsg, rpcMsg; @@ -36,7 +69,7 @@ void processShellMsg() { qall = taosAllocateQall(); while (1) { - int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &qinfo); + int numOfMsgs = taosReadAllQitemsFromQset(multiQ->qset[idx], qall, &qinfo); tDebug("%d shell msgs are received", numOfMsgs); if (numOfMsgs <= 0) break; @@ -89,6 +122,7 @@ void processShellMsg() { } taosFreeQall(qall); + return NULL; } void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { @@ -97,8 +131,11 @@ void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); memcpy(pTemp, pMsg, sizeof(SRpcMsg)); + int32_t idx = balance % multiQ->numOfThread; tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); - taosWriteQitem(qhandle, pTemp); + taosWriteQitem(multiQ->qhandle[idx], pTemp); + balance++; + if (balance >= multiQ->numOfThread) balance = 0; } int main(int argc, char *argv[]) { @@ -147,9 +184,9 @@ int main(int argc, char *argv[]) { } } - tsAsyncLog = 0; rpcInit.connType = TAOS_CONN_SERVER; - taosInitLog("server.log", 100000); + + initLogEnv(); void *pRpc = rpcOpen(&rpcInit); if (pRpc == NULL) { @@ -164,16 +201,35 @@ int main(int argc, char *argv[]) { pDataFile = taosOpenFile(dataName, TD_FILE_APPEND | TD_FILE_CREATE | TD_FILE_WRITE); if (pDataFile == NULL) tInfo("failed to open data file, reason:%s", strerror(errno)); } - qhandle = taosOpenQueue(); - qset = taosOpenQset(); - taosAddIntoQset(qset, qhandle, NULL); - processShellMsg(); + int32_t numOfAthread = 5; + multiQ = taosMemoryMalloc(sizeof(numOfAthread)); + multiQ->numOfThread = numOfAthread; + multiQ->qhandle = (STaosQueue **)taosMemoryMalloc(sizeof(STaosQueue *) * numOfAthread); + multiQ->qset = (STaosQset **)taosMemoryMalloc(sizeof(STaosQset *) * numOfAthread); + + for (int i = 0; i < numOfAthread; i++) { + multiQ->qhandle[i] = taosOpenQueue(); + multiQ->qset[i] = taosOpenQset(); + taosAddIntoQset(multiQ->qset[i], multiQ->qhandle[i], NULL); + } + TThread *threads = taosMemoryMalloc(sizeof(TThread) * numOfAthread); + for (int i = 0; i < numOfAthread; i++) { + threads[i].idx = i; + taosThreadCreate(&(threads[i].thread), NULL, processShellMsg, (void *)&threads[i]); + } + // qhandle = taosOpenQueue(); + // qset = taosOpenQset(); + // taosAddIntoQset(qset, qhandle, NULL); + + // processShellMsg(); if (pDataFile != NULL) { taosCloseFile(&pDataFile); taosRemoveFile(dataName); } + int ch = getchar(); + UNUSED(ch); return 0; }