stop cli/srv gracefully
This commit is contained in:
parent
d6e8a0ede1
commit
19517077b3
|
@ -526,6 +526,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd) {
|
||||||
if (pThrd == NULL) {
|
if (pThrd == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
uv_stop(pThrd->loop);
|
||||||
pthread_join(pThrd->thread, NULL);
|
pthread_join(pThrd->thread, NULL);
|
||||||
pthread_mutex_destroy(&pThrd->msgMtx);
|
pthread_mutex_destroy(&pThrd->msgMtx);
|
||||||
free(pThrd->cliAsync);
|
free(pThrd->cliAsync);
|
||||||
|
|
|
@ -70,6 +70,7 @@ typedef struct SServerObj {
|
||||||
uv_pipe_t** pipe;
|
uv_pipe_t** pipe;
|
||||||
uint32_t ip;
|
uint32_t ip;
|
||||||
uint32_t port;
|
uint32_t port;
|
||||||
|
uv_async_t* pAcceptAsync; // just to quit from from accept thread
|
||||||
} SServerObj;
|
} SServerObj;
|
||||||
|
|
||||||
static const char* notify = "a";
|
static const char* notify = "a";
|
||||||
|
@ -88,9 +89,11 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status);
|
||||||
static void uvOnAcceptCb(uv_stream_t* stream, int status);
|
static void uvOnAcceptCb(uv_stream_t* stream, int status);
|
||||||
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
|
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
|
||||||
static void uvWorkerAsyncCb(uv_async_t* handle);
|
static void uvWorkerAsyncCb(uv_async_t* handle);
|
||||||
|
static void uvAcceptAsyncCb(uv_async_t* handle);
|
||||||
|
|
||||||
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
|
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
|
||||||
static void uvStartSendResp(SSrvMsg* msg);
|
static void uvStartSendResp(SSrvMsg* msg);
|
||||||
|
|
||||||
static void destroySmsg(SSrvMsg* smsg);
|
static void destroySmsg(SSrvMsg* smsg);
|
||||||
// check whether already read complete packet
|
// check whether already read complete packet
|
||||||
static bool readComplete(SConnBuffer* buf);
|
static bool readComplete(SConnBuffer* buf);
|
||||||
|
@ -389,7 +392,13 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
|
||||||
tError("except occurred, continue");
|
tError("except occurred, continue");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if (msg->pConn == NULL) {
|
||||||
|
//
|
||||||
|
free(msg);
|
||||||
|
uv_stop(pThrd->loop);
|
||||||
|
} else {
|
||||||
uvStartSendResp(msg);
|
uvStartSendResp(msg);
|
||||||
|
}
|
||||||
// uv_buf_t wb;
|
// uv_buf_t wb;
|
||||||
// uvPrepareSendData(msg, &wb);
|
// uvPrepareSendData(msg, &wb);
|
||||||
// uv_timer_stop(conn->pTimer);
|
// uv_timer_stop(conn->pTimer);
|
||||||
|
@ -397,6 +406,10 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
|
||||||
// uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
|
// uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
static void uvAcceptAsyncCb(uv_async_t* async) {
|
||||||
|
SServerObj* srv = async->data;
|
||||||
|
uv_stop(srv->loop);
|
||||||
|
}
|
||||||
|
|
||||||
void uvOnAcceptCb(uv_stream_t* stream, int status) {
|
void uvOnAcceptCb(uv_stream_t* stream, int status) {
|
||||||
if (status == -1) {
|
if (status == -1) {
|
||||||
|
@ -517,8 +530,12 @@ static bool addHandleToAcceptloop(void* arg) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct sockaddr_in bind_addr;
|
// register an async here to quit server gracefully
|
||||||
|
srv->pAcceptAsync = calloc(1, sizeof(uv_async_t));
|
||||||
|
uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
|
||||||
|
srv->pAcceptAsync->data = srv;
|
||||||
|
|
||||||
|
struct sockaddr_in bind_addr;
|
||||||
uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
|
uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
|
||||||
if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
|
if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
|
||||||
tError("failed to bind: %s", uv_err_name(err));
|
tError("failed to bind: %s", uv_err_name(err));
|
||||||
|
@ -646,24 +663,43 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
|
||||||
if (pThrd == NULL) {
|
if (pThrd == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
uv_stop(pThrd->loop);
|
|
||||||
pthread_join(pThrd->thread, NULL);
|
pthread_join(pThrd->thread, NULL);
|
||||||
// free(srv->pipe[i]);
|
|
||||||
free(pThrd->loop);
|
free(pThrd->loop);
|
||||||
pthread_mutex_destroy(&pThrd->msgMtx);
|
free(pThrd->workerAsync);
|
||||||
free(pThrd);
|
free(pThrd);
|
||||||
}
|
}
|
||||||
|
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
|
||||||
|
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
|
||||||
|
|
||||||
|
pthread_mutex_lock(&pThrd->msgMtx);
|
||||||
|
QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
|
||||||
|
pthread_mutex_unlock(&pThrd->msgMtx);
|
||||||
|
tDebug("send quit msg to work thread");
|
||||||
|
|
||||||
|
uv_async_send(pThrd->workerAsync);
|
||||||
|
}
|
||||||
|
|
||||||
void taosCloseServer(void* arg) {
|
void taosCloseServer(void* arg) {
|
||||||
// impl later
|
// impl later
|
||||||
SServerObj* srv = arg;
|
SServerObj* srv = arg;
|
||||||
for (int i = 0; i < srv->numOfThreads; i++) {
|
for (int i = 0; i < srv->numOfThreads; i++) {
|
||||||
|
sendQuitToWorkThrd(srv->pThreadObj[i]);
|
||||||
destroyWorkThrd(srv->pThreadObj[i]);
|
destroyWorkThrd(srv->pThreadObj[i]);
|
||||||
}
|
}
|
||||||
uv_stop(srv->loop);
|
|
||||||
free(srv->loop);
|
tDebug("send quit msg to accept thread");
|
||||||
free(srv->pipe);
|
uv_async_send(srv->pAcceptAsync);
|
||||||
free(srv->pThreadObj);
|
|
||||||
pthread_join(srv->thread, NULL);
|
pthread_join(srv->thread, NULL);
|
||||||
|
|
||||||
|
free(srv->pThreadObj);
|
||||||
|
free(srv->pAcceptAsync);
|
||||||
|
free(srv->loop);
|
||||||
|
|
||||||
|
for (int i = 0; i < srv->numOfThreads; i++) {
|
||||||
|
free(srv->pipe[i]);
|
||||||
|
}
|
||||||
|
free(srv->pipe);
|
||||||
|
|
||||||
free(srv);
|
free(srv);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,7 @@ class TransObj {
|
||||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||||
rpcInit.localPort = 0;
|
rpcInit.localPort = 0;
|
||||||
rpcInit.label = (char *)label;
|
rpcInit.label = (char *)label;
|
||||||
rpcInit.numOfThreads = 1;
|
rpcInit.numOfThreads = 5;
|
||||||
rpcInit.cfp = NULL;
|
rpcInit.cfp = NULL;
|
||||||
rpcInit.sessions = 100;
|
rpcInit.sessions = 100;
|
||||||
rpcInit.idleTime = 100;
|
rpcInit.idleTime = 100;
|
||||||
|
@ -37,12 +37,22 @@ class TransObj {
|
||||||
rpcInit.secret = (char *)secret;
|
rpcInit.secret = (char *)secret;
|
||||||
rpcInit.ckey = (char *)ckey;
|
rpcInit.ckey = (char *)ckey;
|
||||||
rpcInit.spi = 1;
|
rpcInit.spi = 1;
|
||||||
|
}
|
||||||
|
bool startCli() {
|
||||||
|
trans = NULL;
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
|
|
||||||
trans = rpcOpen(&rpcInit);
|
trans = rpcOpen(&rpcInit);
|
||||||
|
return trans != NULL ? true : false;
|
||||||
|
}
|
||||||
|
bool startSrv() {
|
||||||
|
trans = NULL;
|
||||||
|
rpcInit.connType = TAOS_CONN_SERVER;
|
||||||
|
trans = rpcOpen(&rpcInit);
|
||||||
|
return trans != NULL ? true : false;
|
||||||
}
|
}
|
||||||
bool stop() {
|
bool stop() {
|
||||||
rpcClose(trans);
|
rpcClose(trans);
|
||||||
|
trans = NULL;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,4 +73,10 @@ class TransEnv : public ::testing::Test {
|
||||||
|
|
||||||
TransObj *tr = NULL;
|
TransObj *tr = NULL;
|
||||||
};
|
};
|
||||||
TEST_F(TransEnv, test_start_stop) { assert(tr->stop()); }
|
TEST_F(TransEnv, test_start_stop) {
|
||||||
|
assert(tr->startCli());
|
||||||
|
assert(tr->stop());
|
||||||
|
|
||||||
|
assert(tr->startSrv());
|
||||||
|
assert(tr->stop());
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue