fix(rpc): fix duplicat port error

This commit is contained in:
yihaoDeng 2022-04-27 12:45:28 +08:00
parent 3abf351ae0
commit 5c025c0006
4 changed files with 18 additions and 8 deletions

View File

@ -62,6 +62,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0014) #define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0014)
#define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015) #define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015)
#define TSDB_CODE_RPC_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0016) #define TSDB_CODE_RPC_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0016)
#define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017)
//common & util //common & util
#define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0100) #define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0100)

View File

@ -49,6 +49,10 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->connType = pInit->connType; pRpc->connType = pInit->connType;
pRpc->idleTime = pInit->idleTime; pRpc->idleTime = pInit->idleTime;
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
if (pRpc->tcphandle == NULL) {
taosMemoryFree(pRpc);
return NULL;
}
pRpc->parent = pInit->parent; pRpc->parent = pInit->parent;
if (pInit->user) { if (pInit->user) {
memcpy(pRpc->user, pInit->user, strlen(pInit->user)); memcpy(pRpc->user, pInit->user, strlen(pInit->user));

View File

@ -93,6 +93,8 @@ typedef struct SServerObj {
uint32_t ip; uint32_t ip;
uint32_t port; uint32_t port;
uv_async_t* pAcceptAsync; // just to quit from from accept thread uv_async_t* pAcceptAsync; // just to quit from from accept thread
bool inited;
} SServerObj; } SServerObj;
// handle // handle
@ -143,7 +145,7 @@ static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleR
static int32_t exHandlesMgt; static int32_t exHandlesMgt;
void uvInitExHandleMgt(); void uvInitEnv();
void uvOpenExHandleMgt(int size); void uvOpenExHandleMgt(int size);
void uvCloseExHandleMgt(); void uvCloseExHandleMgt();
int64_t uvAddExHandle(void* p); int64_t uvAddExHandle(void* p);
@ -716,6 +718,7 @@ static bool addHandleToAcceptloop(void* arg) {
} }
if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) { if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
tError("failed to listen: %s", uv_err_name(err)); tError("failed to listen: %s", uv_err_name(err));
terrno = TSDB_CODE_RPC_PORT_EADDRINUSE;
return false; return false;
} }
return true; return true;
@ -800,7 +803,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv->port = port; srv->port = port;
uv_loop_init(srv->loop); uv_loop_init(srv->loop);
taosThreadOnce(&transModuleInit, uvInitExHandleMgt); taosThreadOnce(&transModuleInit, uvInitEnv);
transSrvInst++; transSrvInst++;
for (int i = 0; i < srv->numOfThreads; i++) { for (int i = 0; i < srv->numOfThreads; i++) {
@ -844,15 +847,15 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
goto End; goto End;
// clear all resource later // clear all resource later
} }
srv->inited = true;
return srv; return srv;
End: End:
transCloseServer(srv); transCloseServer(srv);
return NULL; return NULL;
} }
void uvInitExHandleMgt() { void uvInitEnv() {
// init exhandle mgt uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
uvOpenExHandleMgt(10000); uvOpenExHandleMgt(10000);
} }
void uvOpenExHandleMgt(int size) { void uvOpenExHandleMgt(int size) {
@ -958,9 +961,10 @@ void transCloseServer(void* arg) {
SServerObj* srv = arg; SServerObj* srv = arg;
tDebug("send quit msg to accept thread"); tDebug("send quit msg to accept thread");
if (srv->inited) {
uv_async_send(srv->pAcceptAsync); uv_async_send(srv->pAcceptAsync);
taosThreadJoin(srv->thread, NULL); taosThreadJoin(srv->thread, NULL);
}
SRV_RELEASE_UV(srv->loop); SRV_RELEASE_UV(srv->loop);
for (int i = 0; i < srv->numOfThreads; i++) { for (int i = 0; i < srv->numOfThreads; i++) {

View File

@ -68,6 +68,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, "Client and server's t
TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready") TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "port already in use")
//common & util //common & util
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_MEMORY, "Out of Memory") TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_MEMORY, "Out of Memory")