diff --git a/source/libs/transport/src/transport.c b/source/libs/transport/src/transport.c index 93bbaf2820..6cc2ca8c49 100644 --- a/source/libs/transport/src/transport.c +++ b/source/libs/transport/src/transport.c @@ -101,6 +101,13 @@ typedef struct SThreadObj { void* shandle; } SThreadObj; +typedef struct SClientObj { + char label[TSDB_LABEL_LEN]; + int32_t index; + int numOfThreads; + SThreadObj** pThreadObj; +} SClientObj; + #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) #define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead))) #define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) @@ -113,7 +120,7 @@ typedef struct SServerObj { uv_tcp_t server; uv_loop_t* loop; int workerIdx; - int numOfThread; + int numOfThreads; SThreadObj** pThreadObj; uv_pipe_t** pipe; uint32_t ip; @@ -179,18 +186,37 @@ static void* acceptThread(void* arg); void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); +void* (*taosHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = {taosInitServer, taosInitClient}; + +void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { + SClientObj* cli = calloc(1, sizeof(SClientObj)); + memcpy(cli->label, label, strlen(label)); + cli->numOfThreads = numOfThreads; + cli->pThreadObj = (SThreadObj**)calloc(cli->numOfThreads, sizeof(SThreadObj*)); + + for (int i = 0; i < cli->numOfThreads; i++) { + SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj)); + + int err = pthread_create(&thrd->thread, NULL, workerThread, (void*)(thrd)); + if (err == 0) { + tDebug("sucess to create tranport-client thread %d", i); + } + } + return cli; +} + void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { SServerObj* srv = calloc(1, sizeof(SServerObj)); srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); - srv->numOfThread = numOfThreads; + srv->numOfThreads = numOfThreads; srv->workerIdx = 0; - srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThread, sizeof(SThreadObj*)); - srv->pipe = (uv_pipe_t**)calloc(srv->numOfThread, sizeof(uv_pipe_t*)); + srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThreads, sizeof(SThreadObj*)); + srv->pipe = (uv_pipe_t**)calloc(srv->numOfThreads, sizeof(uv_pipe_t*)); srv->ip = ip; srv->port = port; uv_loop_init(srv->loop); - for (int i = 0; i < srv->numOfThread; i++) { + for (int i = 0; i < srv->numOfThreads; i++) { SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj)); srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); int fds[2]; @@ -299,8 +325,7 @@ static int uvAuthMsg(SRpcConn* pConn, char* msg, int len) { if (!rpcIsReq(pHead->msgType)) { // for response, if code is auth failure, it shall bypass the auth process code = htonl(pHead->code); - if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || - code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED || + if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED || code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) { pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code); @@ -460,7 +485,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); - pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread; + pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads; tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnWriteCb); } else { @@ -587,7 +612,9 @@ void* rpcOpen(const SRpcInit* pInit) { tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); } pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; - pRpc->tcphandle = taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); + pRpc->connType = pInit->connType; + pRpc->tcphandle = (*taosHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); + // pRpc->taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); return pRpc; } void rpcClose(void* arg) { return; }