fix rpc code
This commit is contained in:
parent
46d4bf90fd
commit
607c042a2f
|
@ -396,6 +396,7 @@ typedef struct SDelayQueue {
|
|||
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue);
|
||||
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg));
|
||||
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
|
||||
void transDQCancel(SDelayQueue* queue, SDelayTask* task);
|
||||
|
||||
bool transEpSetIsEqual(SEpSet* a, SEpSet* b);
|
||||
/*
|
||||
|
|
|
@ -37,6 +37,7 @@ typedef struct SCliConn {
|
|||
char* ip;
|
||||
uint32_t port;
|
||||
|
||||
SDelayTask* task;
|
||||
// debug and log info
|
||||
struct sockaddr_in addr;
|
||||
struct sockaddr_in localAddr;
|
||||
|
@ -65,6 +66,7 @@ typedef struct SCliThrd {
|
|||
queue msg;
|
||||
TdThreadMutex msgMtx;
|
||||
SDelayQueue* delayQueue;
|
||||
SDelayQueue* timeoutQueue;
|
||||
uint64_t nextTimeout; // next timeout
|
||||
void* pTransInst; //
|
||||
|
||||
|
@ -92,6 +94,7 @@ static void* createConnPool(int size);
|
|||
static void* destroyConnPool(void* pool);
|
||||
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
|
||||
static void addConnToPool(void* pool, SCliConn* conn);
|
||||
static void doCloseIdleConn(void* param);
|
||||
|
||||
// register timer in each thread to clear expire conn
|
||||
static void cliTimeoutCb(uv_timer_t* handle);
|
||||
|
@ -184,7 +187,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
|||
pThrd = (SCliThrd*)(exh)->pThrd; \
|
||||
} \
|
||||
} while (0)
|
||||
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
|
||||
#define CONN_PERSIST_TIME(para) (para * 20)
|
||||
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
|
||||
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
|
||||
#define CONN_SHOULD_RELEASE(conn, head) \
|
||||
|
@ -506,6 +509,10 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
|
|||
QUEUE_REMOVE(&conn->q);
|
||||
QUEUE_INIT(&conn->q);
|
||||
assert(h == &conn->q);
|
||||
|
||||
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
|
||||
conn->task = NULL;
|
||||
|
||||
return conn;
|
||||
}
|
||||
static int32_t allocConnRef(SCliConn* conn, bool update) {
|
||||
|
@ -537,6 +544,7 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
|
|||
transReleaseExHandle(transGetRefMgt(), handle);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void addConnToPool(void* pool, SCliConn* conn) {
|
||||
if (conn->status == ConnInPool) {
|
||||
return;
|
||||
|
@ -562,7 +570,14 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
|||
assert(plist != NULL);
|
||||
QUEUE_INIT(&conn->q);
|
||||
QUEUE_PUSH(&plist->conn, &conn->q);
|
||||
|
||||
assert(!QUEUE_IS_EMPTY(&plist->conn));
|
||||
|
||||
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
|
||||
arg->param1 = conn;
|
||||
arg->param2 = thrd;
|
||||
|
||||
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
|
||||
}
|
||||
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||
SCliConn* conn = handle->data;
|
||||
|
@ -631,6 +646,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
|
|||
transRemoveExHandle(transGetRefMgt(), conn->refId);
|
||||
conn->refId = -1;
|
||||
|
||||
if (conn->task != NULL) transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
|
||||
|
||||
if (clear) {
|
||||
if (!uv_is_closing((uv_handle_t*)conn->stream)) {
|
||||
uv_read_stop(conn->stream);
|
||||
|
@ -997,6 +1014,8 @@ static SCliThrd* createThrdObj() {
|
|||
pThrd->pool = createConnPool(4);
|
||||
transDQCreate(pThrd->loop, &pThrd->delayQueue);
|
||||
|
||||
transDQCreate(pThrd->loop, &pThrd->timeoutQueue);
|
||||
|
||||
pThrd->quit = false;
|
||||
return pThrd;
|
||||
}
|
||||
|
@ -1012,6 +1031,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
|
|||
transAsyncPoolDestroy(pThrd->asyncPool);
|
||||
|
||||
transDQDestroy(pThrd->delayQueue, destroyCmsg);
|
||||
transDQDestroy(pThrd->timeoutQueue, NULL);
|
||||
taosMemoryFree(pThrd->loop);
|
||||
taosMemoryFree(pThrd);
|
||||
}
|
||||
|
@ -1058,6 +1078,9 @@ static void doCloseIdleConn(void* param) {
|
|||
STaskArg* arg = param;
|
||||
SCliConn* conn = arg->param1;
|
||||
SCliThrd* pThrd = arg->param2;
|
||||
|
||||
cliDestroyConn(conn, true);
|
||||
taosMemoryFree(arg);
|
||||
}
|
||||
|
||||
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||
|
@ -1248,11 +1271,17 @@ int transReleaseCliHandle(void* handle) {
|
|||
if (pThrd == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
STransMsg tmsg = {.info.handle = handle};
|
||||
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
|
||||
TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
|
||||
|
||||
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
|
||||
cmsg->msg = tmsg;
|
||||
cmsg->type = Release;
|
||||
|
||||
STraceId* trace = &tmsg.info.traceId;
|
||||
tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid);
|
||||
|
||||
if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -480,7 +480,7 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
|
|||
SDelayTask* task = container_of(minNode, SDelayTask, node);
|
||||
|
||||
STaskArg* arg = task->arg;
|
||||
freeFunc(arg->param1);
|
||||
if (freeFunc) freeFunc(arg->param1);
|
||||
taosMemoryFree(arg);
|
||||
|
||||
taosMemoryFree(task);
|
||||
|
@ -491,9 +491,16 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
|
|||
void transDQCancel(SDelayQueue* queue, SDelayTask* task) {
|
||||
uv_timer_stop(queue->timer);
|
||||
|
||||
if (heapSize(queue->heap) <= 0) return;
|
||||
if (heapSize(queue->heap) <= 0) {
|
||||
taosMemoryFree(task->arg);
|
||||
taosMemoryFree(task);
|
||||
return;
|
||||
}
|
||||
heapRemove(queue->heap, &task->node);
|
||||
|
||||
taosMemoryFree(task->arg);
|
||||
taosMemoryFree(task);
|
||||
|
||||
if (heapSize(queue->heap) != 0) {
|
||||
HeapNode* minNode = heapMin(queue->heap);
|
||||
if (minNode != NULL) return;
|
||||
|
|
|
@ -149,32 +149,31 @@ static void* transAcceptThread(void* arg);
|
|||
static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName);
|
||||
static bool addHandleToAcceptloop(void* arg);
|
||||
|
||||
#define CONN_SHOULD_RELEASE(conn, head) \
|
||||
do { \
|
||||
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
|
||||
conn->status = ConnRelease; \
|
||||
transClearBuffer(&conn->readBuf); \
|
||||
transFreeMsg(transContFromHead((char*)head)); \
|
||||
tTrace("conn %p received release request", conn); \
|
||||
\
|
||||
STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.ahandle = NULL}; \
|
||||
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \
|
||||
srvMsg->msg = tmsg; \
|
||||
srvMsg->type = Release; \
|
||||
srvMsg->pConn = conn; \
|
||||
reallocConnRef(conn); \
|
||||
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
|
||||
return; \
|
||||
} \
|
||||
if (conn->regArg.init) { \
|
||||
tTrace("conn %p release, notify server app", conn); \
|
||||
STrans* pTransInst = conn->pTransInst; \
|
||||
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \
|
||||
memset(&conn->regArg, 0, sizeof(conn->regArg)); \
|
||||
} \
|
||||
uvStartSendRespInternal(srvMsg); \
|
||||
return; \
|
||||
} \
|
||||
#define CONN_SHOULD_RELEASE(conn, head) \
|
||||
do { \
|
||||
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
|
||||
conn->status = ConnRelease; \
|
||||
transClearBuffer(&conn->readBuf); \
|
||||
transFreeMsg(transContFromHead((char*)head)); \
|
||||
tTrace("conn %p received release request", conn); \
|
||||
STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.traceId = head->traceId, .info.ahandle = NULL}; \
|
||||
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \
|
||||
srvMsg->msg = tmsg; \
|
||||
srvMsg->type = Release; \
|
||||
srvMsg->pConn = conn; \
|
||||
reallocConnRef(conn); \
|
||||
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
|
||||
return; \
|
||||
} \
|
||||
if (conn->regArg.init) { \
|
||||
tTrace("conn %p release, notify server app", conn); \
|
||||
STrans* pTransInst = conn->pTransInst; \
|
||||
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \
|
||||
memset(&conn->regArg, 0, sizeof(conn->regArg)); \
|
||||
} \
|
||||
uvStartSendRespInternal(srvMsg); \
|
||||
return; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define SRV_RELEASE_UV(loop) \
|
||||
|
|
Loading…
Reference in New Issue