Merge pull request #11923 from taosdata/fix/TD-15127

fix(rpc): fix duplicat port error
This commit is contained in:
Yihao Deng 2022-04-27 18:49:30 +08:00 committed by GitHub
commit 795f2a18d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 18 additions and 9 deletions

View File

@ -62,6 +62,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0014) #define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0014)
#define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015) #define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015)
#define TSDB_CODE_RPC_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0016) #define TSDB_CODE_RPC_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0016)
#define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017)
//common & util //common & util
#define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0100) #define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0100)

View File

@ -49,6 +49,10 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->connType = pInit->connType; pRpc->connType = pInit->connType;
pRpc->idleTime = pInit->idleTime; pRpc->idleTime = pInit->idleTime;
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
if (pRpc->tcphandle == NULL) {
taosMemoryFree(pRpc);
return NULL;
}
pRpc->parent = pInit->parent; pRpc->parent = pInit->parent;
if (pInit->user) { if (pInit->user) {
memcpy(pRpc->user, pInit->user, strlen(pInit->user)); memcpy(pRpc->user, pInit->user, strlen(pInit->user));

View File

@ -912,7 +912,6 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
cliDestroy((uv_handle_t*)pConn->stream); cliDestroy((uv_handle_t*)pConn->stream);
return -1; return -1;
} }
} else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) { } else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) {
if (pResp->contLen == 0) { if (pResp->contLen == 0) {
pEpSet->inUse = (pEpSet->inUse++) % pEpSet->numOfEps; pEpSet->inUse = (pEpSet->inUse++) % pEpSet->numOfEps;

View File

@ -93,6 +93,8 @@ typedef struct SServerObj {
uint32_t ip; uint32_t ip;
uint32_t port; uint32_t port;
uv_async_t* pAcceptAsync; // just to quit from from accept thread uv_async_t* pAcceptAsync; // just to quit from from accept thread
bool inited;
} SServerObj; } SServerObj;
// handle // handle
@ -143,7 +145,7 @@ static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleR
static int32_t exHandlesMgt; static int32_t exHandlesMgt;
void uvInitExHandleMgt(); void uvInitEnv();
void uvOpenExHandleMgt(int size); void uvOpenExHandleMgt(int size);
void uvCloseExHandleMgt(); void uvCloseExHandleMgt();
int64_t uvAddExHandle(void* p); int64_t uvAddExHandle(void* p);
@ -716,6 +718,7 @@ static bool addHandleToAcceptloop(void* arg) {
} }
if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) { if ((err = uv_listen((uv_stream_t*)&srv->server, 512, uvOnAcceptCb)) != 0) {
tError("failed to listen: %s", uv_err_name(err)); tError("failed to listen: %s", uv_err_name(err));
terrno = TSDB_CODE_RPC_PORT_EADDRINUSE;
return false; return false;
} }
return true; return true;
@ -800,7 +803,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv->port = port; srv->port = port;
uv_loop_init(srv->loop); uv_loop_init(srv->loop);
taosThreadOnce(&transModuleInit, uvInitExHandleMgt); taosThreadOnce(&transModuleInit, uvInitEnv);
transSrvInst++; transSrvInst++;
for (int i = 0; i < srv->numOfThreads; i++) { for (int i = 0; i < srv->numOfThreads; i++) {
@ -844,15 +847,15 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
goto End; goto End;
// clear all resource later // clear all resource later
} }
srv->inited = true;
return srv; return srv;
End: End:
transCloseServer(srv); transCloseServer(srv);
return NULL; return NULL;
} }
void uvInitExHandleMgt() { void uvInitEnv() {
// init exhandle mgt uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
uvOpenExHandleMgt(10000); uvOpenExHandleMgt(10000);
} }
void uvOpenExHandleMgt(int size) { void uvOpenExHandleMgt(int size) {
@ -958,9 +961,10 @@ void transCloseServer(void* arg) {
SServerObj* srv = arg; SServerObj* srv = arg;
tDebug("send quit msg to accept thread"); tDebug("send quit msg to accept thread");
uv_async_send(srv->pAcceptAsync); if (srv->inited) {
taosThreadJoin(srv->thread, NULL); uv_async_send(srv->pAcceptAsync);
taosThreadJoin(srv->thread, NULL);
}
SRV_RELEASE_UV(srv->loop); SRV_RELEASE_UV(srv->loop);
for (int i = 0; i < srv->numOfThreads; i++) { for (int i = 0; i < srv->numOfThreads; i++) {

View File

@ -68,6 +68,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, "Client and server's t
TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready") TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "port already in use")
//common & util //common & util
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_MEMORY, "Out of Memory") TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_MEMORY, "Out of Memory")