Merge pull request #17941 from taosdata/fix/bench

fix: avoid cpu load imbalance
This commit is contained in:
Shengliang Guan 2022-11-07 23:34:55 +08:00 committed by GitHub
commit e21c20e229
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 94 additions and 17 deletions

View File

@ -1181,7 +1181,7 @@ static SCliThrd* createThrdObj(void* trans) {
pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
uv_loop_init(pThrd->loop); uv_loop_init(pThrd->loop);
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, cliAsyncCb); pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb);
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
uv_prepare_init(pThrd->loop, pThrd->prepare); uv_prepare_init(pThrd->loop, pThrd->prepare);
@ -1253,11 +1253,14 @@ void cliWalkCb(uv_handle_t* handle, void* arg) {
} }
FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst) { FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst) {
int8_t index = pTransInst->index; int32_t index = pTransInst->index;
if (pTransInst->numOfThreads == 0) { if (pTransInst->numOfThreads == 0) {
return -1; return -1;
} }
if (pTransInst->index++ >= pTransInst->numOfThreads) { /*
* no lock, and to avoid CPU load imbalance, set limit pTransInst->numOfThreads * 2000;
*/
if (pTransInst->index++ >= pTransInst->numOfThreads * 2000) {
pTransInst->index = 0; pTransInst->index = 0;
} }
return index % pTransInst->numOfThreads; return index % pTransInst->numOfThreads;
@ -1271,7 +1274,7 @@ static FORCE_INLINE void doDelayTask(void* param) {
static void doCloseIdleConn(void* param) { static void doCloseIdleConn(void* param) {
STaskArg* arg = param; STaskArg* arg = param;
SCliConn* conn = arg->param1; SCliConn* conn = arg->param1;
tTrace("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn); tDebug("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
conn->task = NULL; conn->task = NULL;
cliDestroyConn(conn, true); cliDestroyConn(conn, true);
taosMemoryFree(arg); taosMemoryFree(arg);

View File

@ -252,7 +252,7 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
int idx = pool->index % pool->nAsync; int idx = pool->index % pool->nAsync;
// no need mutex here // no need mutex here
if (pool->index++ > pool->nAsync) { if (pool->index++ > pool->nAsync * 2000) {
pool->index = 0; pool->index = 0;
} }
uv_async_t* async = &(pool->asyncs[idx]); uv_async_t* async = &(pool->asyncs[idx]);

View File

@ -812,7 +812,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
// conn set // conn set
QUEUE_INIT(&pThrd->conn); QUEUE_INIT(&pThrd->conn);
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, uvWorkerAsyncCb); pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, uvWorkerAsyncCb);
#if defined(WINDOWS) || defined(DARWIN) #if defined(WINDOWS) || defined(DARWIN)
uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
#else #else

View File

@ -32,6 +32,22 @@ typedef struct {
void *pRpc; void *pRpc;
} SInfo; } SInfo;
void initLogEnv() {
const char *logDir = "/tmp/trans_cli";
const char* defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10000;
tsAsyncLog = 0;
//idxDebugFlag = 143;
strcpy(tsLogDir, (char *)logDir);
taosRemoveDir(tsLogDir);
taosMkDir(tsLogDir);
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
}
}
static void processResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { static void processResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SInfo *pInfo = (SInfo *)pMsg->info.ahandle; SInfo *pInfo = (SInfo *)pMsg->info.ahandle;
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
@ -98,7 +114,7 @@ int main(int argc, char *argv[]) {
rpcInit.user = "michael"; rpcInit.user = "michael";
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcDebugFlag = 131; rpcDebugFlag = 135;
for (int i = 1; i < argc; ++i) { for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
} else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) { } else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
@ -132,7 +148,9 @@ int main(int argc, char *argv[]) {
exit(0); exit(0);
} }
} }
taosInitLog("client.log", 100000);
initLogEnv();
void *pRpc = rpcOpen(&rpcInit); void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) { if (pRpc == NULL) {

View File

@ -26,7 +26,40 @@ TdFilePtr pDataFile = NULL;
STaosQueue *qhandle = NULL; STaosQueue *qhandle = NULL;
STaosQset *qset = NULL; STaosQset *qset = NULL;
void processShellMsg() { int32_t balance = 0;
typedef struct {
int32_t numOfThread;
STaosQueue **qhandle;
STaosQset **qset;
} MultiThreadQhandle;
typedef struct TThread {
TdThread thread;
int idx;
} TThread;
MultiThreadQhandle *multiQ = NULL;
void initLogEnv() {
const char *logDir = "/tmp/trans_svr";
const char *defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10000;
tsAsyncLog = 0;
// idxDebugFlag = 143;
strcpy(tsLogDir, logDir);
taosRemoveDir(tsLogDir);
taosMkDir(tsLogDir);
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
}
}
void *processShellMsg(void *arg) {
TThread *thread = (TThread *)arg;
int32_t idx = thread->idx;
static int num = 0; static int num = 0;
STaosQall *qall; STaosQall *qall;
SRpcMsg *pRpcMsg, rpcMsg; SRpcMsg *pRpcMsg, rpcMsg;
@ -36,7 +69,7 @@ void processShellMsg() {
qall = taosAllocateQall(); qall = taosAllocateQall();
while (1) { while (1) {
int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &qinfo); int numOfMsgs = taosReadAllQitemsFromQset(multiQ->qset[idx], qall, &qinfo);
tDebug("%d shell msgs are received", numOfMsgs); tDebug("%d shell msgs are received", numOfMsgs);
if (numOfMsgs <= 0) break; if (numOfMsgs <= 0) break;
@ -89,6 +122,7 @@ void processShellMsg() {
} }
taosFreeQall(qall); taosFreeQall(qall);
return NULL;
} }
void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
@ -97,8 +131,11 @@ void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
memcpy(pTemp, pMsg, sizeof(SRpcMsg)); memcpy(pTemp, pMsg, sizeof(SRpcMsg));
int32_t idx = balance % multiQ->numOfThread;
tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
taosWriteQitem(qhandle, pTemp); taosWriteQitem(multiQ->qhandle[idx], pTemp);
balance++;
if (balance >= multiQ->numOfThread) balance = 0;
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
@ -147,9 +184,9 @@ int main(int argc, char *argv[]) {
} }
} }
tsAsyncLog = 0;
rpcInit.connType = TAOS_CONN_SERVER; rpcInit.connType = TAOS_CONN_SERVER;
taosInitLog("server.log", 100000);
initLogEnv();
void *pRpc = rpcOpen(&rpcInit); void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) { if (pRpc == NULL) {
@ -164,16 +201,35 @@ int main(int argc, char *argv[]) {
pDataFile = taosOpenFile(dataName, TD_FILE_APPEND | TD_FILE_CREATE | TD_FILE_WRITE); pDataFile = taosOpenFile(dataName, TD_FILE_APPEND | TD_FILE_CREATE | TD_FILE_WRITE);
if (pDataFile == NULL) tInfo("failed to open data file, reason:%s", strerror(errno)); if (pDataFile == NULL) tInfo("failed to open data file, reason:%s", strerror(errno));
} }
qhandle = taosOpenQueue();
qset = taosOpenQset();
taosAddIntoQset(qset, qhandle, NULL);
processShellMsg(); int32_t numOfAthread = 5;
multiQ = taosMemoryMalloc(sizeof(numOfAthread));
multiQ->numOfThread = numOfAthread;
multiQ->qhandle = (STaosQueue **)taosMemoryMalloc(sizeof(STaosQueue *) * numOfAthread);
multiQ->qset = (STaosQset **)taosMemoryMalloc(sizeof(STaosQset *) * numOfAthread);
for (int i = 0; i < numOfAthread; i++) {
multiQ->qhandle[i] = taosOpenQueue();
multiQ->qset[i] = taosOpenQset();
taosAddIntoQset(multiQ->qset[i], multiQ->qhandle[i], NULL);
}
TThread *threads = taosMemoryMalloc(sizeof(TThread) * numOfAthread);
for (int i = 0; i < numOfAthread; i++) {
threads[i].idx = i;
taosThreadCreate(&(threads[i].thread), NULL, processShellMsg, (void *)&threads[i]);
}
// qhandle = taosOpenQueue();
// qset = taosOpenQset();
// taosAddIntoQset(qset, qhandle, NULL);
// processShellMsg();
if (pDataFile != NULL) { if (pDataFile != NULL) {
taosCloseFile(&pDataFile); taosCloseFile(&pDataFile);
taosRemoveFile(dataName); taosRemoveFile(dataName);
} }
int ch = getchar();
UNUSED(ch);
return 0; return 0;
} }