handle except
This commit is contained in:
parent
524edc1552
commit
bc77e3c579
|
@ -228,8 +228,8 @@ typedef struct SConnBuffer {
|
||||||
typedef void (*AsyncCB)(uv_async_t* handle);
|
typedef void (*AsyncCB)(uv_async_t* handle);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
void* pThrd;
|
void* pThrd;
|
||||||
queue qmsg;
|
queue qmsg;
|
||||||
TdThreadMutex mtx; // protect qmsg;
|
TdThreadMutex mtx; // protect qmsg;
|
||||||
} SAsyncItem;
|
} SAsyncItem;
|
||||||
|
|
||||||
|
@ -273,11 +273,52 @@ void transCloseClient(void* arg);
|
||||||
void transCloseServer(void* arg);
|
void transCloseServer(void* arg);
|
||||||
|
|
||||||
void transCtxInit(STransCtx* ctx);
|
void transCtxInit(STransCtx* ctx);
|
||||||
void transCtxDestroy(STransCtx* ctx);
|
void transCtxCleanup(STransCtx* ctx);
|
||||||
void transCtxClear(STransCtx* ctx);
|
void transCtxClear(STransCtx* ctx);
|
||||||
void transCtxMerge(STransCtx* dst, STransCtx* src);
|
void transCtxMerge(STransCtx* dst, STransCtx* src);
|
||||||
void* transCtxDumpVal(STransCtx* ctx, int32_t key);
|
void* transCtxDumpVal(STransCtx* ctx, int32_t key);
|
||||||
|
|
||||||
|
// queue sending msgs
|
||||||
|
typedef struct {
|
||||||
|
SArray* q;
|
||||||
|
void (*free)(void* arg);
|
||||||
|
} STransQueue;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* init queue
|
||||||
|
* note: queue'size is small, default 1
|
||||||
|
*/
|
||||||
|
void transQueueInit(STransQueue* queue, void (*free)(void* arg));
|
||||||
|
|
||||||
|
/*
|
||||||
|
* put arg into queue
|
||||||
|
* if queue'size > 1, return false; else return true
|
||||||
|
*/
|
||||||
|
bool transQueuePush(STransQueue* queue, void* arg);
|
||||||
|
/*
|
||||||
|
* pop head from queue
|
||||||
|
*/
|
||||||
|
|
||||||
|
void* transQueuePop(STransQueue* queue);
|
||||||
|
/*
|
||||||
|
* get head from queue
|
||||||
|
*/
|
||||||
|
void* transQueueGet(STransQueue* queue);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* queue empty or not
|
||||||
|
*/
|
||||||
|
|
||||||
|
bool transQueueEmpty(STransQueue* queue);
|
||||||
|
/*
|
||||||
|
* clear queue
|
||||||
|
*/
|
||||||
|
void transQueueClear(STransQueue* queue);
|
||||||
|
/*
|
||||||
|
* destroy queue
|
||||||
|
*/
|
||||||
|
void transQueueDestroy(STransQueue* queue);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -25,13 +25,14 @@ typedef struct SCliConn {
|
||||||
void* hostThrd;
|
void* hostThrd;
|
||||||
SConnBuffer readBuf;
|
SConnBuffer readBuf;
|
||||||
void* data;
|
void* data;
|
||||||
SArray* cliMsgs;
|
// SArray* cliMsgs;
|
||||||
queue conn;
|
STransQueue cliMsgs;
|
||||||
uint64_t expireTime;
|
queue conn;
|
||||||
int hThrdIdx;
|
uint64_t expireTime;
|
||||||
bool broken; // link broken or not
|
int hThrdIdx;
|
||||||
STransCtx ctx;
|
STransCtx ctx;
|
||||||
|
|
||||||
|
bool broken; // link broken or not
|
||||||
ConnStatus status; //
|
ConnStatus status; //
|
||||||
int release; // 1: release
|
int release; // 1: release
|
||||||
// spi configure
|
// spi configure
|
||||||
|
@ -56,14 +57,14 @@ typedef struct SCliMsg {
|
||||||
} SCliMsg;
|
} SCliMsg;
|
||||||
|
|
||||||
typedef struct SCliThrdObj {
|
typedef struct SCliThrdObj {
|
||||||
TdThread thread;
|
TdThread thread;
|
||||||
uv_loop_t* loop;
|
uv_loop_t* loop;
|
||||||
SAsyncPool* asyncPool;
|
SAsyncPool* asyncPool;
|
||||||
uv_timer_t timer;
|
uv_timer_t timer;
|
||||||
void* pool; // conn pool
|
void* pool; // conn pool
|
||||||
|
|
||||||
// msg queue
|
// msg queue
|
||||||
queue msg;
|
queue msg;
|
||||||
TdThreadMutex msgMtx;
|
TdThreadMutex msgMtx;
|
||||||
|
|
||||||
uint64_t nextTimeout; // next timeout
|
uint64_t nextTimeout; // next timeout
|
||||||
|
@ -181,12 +182,11 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
|
||||||
static void* cliWorkThread(void* arg);
|
static void* cliWorkThread(void* arg);
|
||||||
|
|
||||||
bool cliMaySendCachedMsg(SCliConn* conn) {
|
bool cliMaySendCachedMsg(SCliConn* conn) {
|
||||||
if (taosArrayGetSize(conn->cliMsgs) > 0) {
|
if (!transQueueEmpty(&conn->cliMsgs)) {
|
||||||
cliSend(conn);
|
cliSend(conn);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
void cliHandleResp(SCliConn* conn) {
|
void cliHandleResp(SCliConn* conn) {
|
||||||
SCliThrdObj* pThrd = conn->hostThrd;
|
SCliThrdObj* pThrd = conn->hostThrd;
|
||||||
|
@ -195,6 +195,7 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
|
STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
|
||||||
pHead->code = htonl(pHead->code);
|
pHead->code = htonl(pHead->code);
|
||||||
pHead->msgLen = htonl(pHead->msgLen);
|
pHead->msgLen = htonl(pHead->msgLen);
|
||||||
|
|
||||||
STransMsg transMsg = {0};
|
STransMsg transMsg = {0};
|
||||||
transMsg.contLen = transContLenFromMsg(pHead->msgLen);
|
transMsg.contLen = transContLenFromMsg(pHead->msgLen);
|
||||||
transMsg.pCont = transContFromHead((char*)pHead);
|
transMsg.pCont = transContFromHead((char*)pHead);
|
||||||
|
@ -204,11 +205,7 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
|
|
||||||
CONN_SHOULD_RELEASE(conn, pHead);
|
CONN_SHOULD_RELEASE(conn, pHead);
|
||||||
|
|
||||||
SCliMsg* pMsg = NULL;
|
SCliMsg* pMsg = transQueuePop(&conn->cliMsgs);
|
||||||
if (taosArrayGetSize(conn->cliMsgs) > 0) {
|
|
||||||
pMsg = taosArrayGetP(conn->cliMsgs, 0);
|
|
||||||
taosArrayRemove(conn->cliMsgs, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
|
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
|
||||||
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
|
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
|
||||||
|
@ -264,7 +261,7 @@ _RETURN:
|
||||||
}
|
}
|
||||||
|
|
||||||
void cliHandleExcept(SCliConn* pConn) {
|
void cliHandleExcept(SCliConn* pConn) {
|
||||||
if (taosArrayGetSize(pConn->cliMsgs) == 0) {
|
if (transQueueEmpty(&pConn->cliMsgs)) {
|
||||||
if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) {
|
if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) {
|
||||||
transUnrefCliHandle(pConn);
|
transUnrefCliHandle(pConn);
|
||||||
return;
|
return;
|
||||||
|
@ -274,11 +271,7 @@ void cliHandleExcept(SCliConn* pConn) {
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
SCliMsg* pMsg = NULL;
|
SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs);
|
||||||
if (taosArrayGetSize(pConn->cliMsgs) > 0) {
|
|
||||||
pMsg = taosArrayGetP(pConn->cliMsgs, 0);
|
|
||||||
taosArrayRemove(pConn->cliMsgs, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
|
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
|
||||||
|
|
||||||
|
@ -303,7 +296,7 @@ void cliHandleExcept(SCliConn* pConn) {
|
||||||
}
|
}
|
||||||
destroyCmsg(pMsg);
|
destroyCmsg(pMsg);
|
||||||
tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
|
tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
} while (taosArrayGetSize(pConn->cliMsgs) > 0);
|
} while (!transQueueEmpty(&pConn->cliMsgs));
|
||||||
|
|
||||||
transUnrefCliHandle(pConn);
|
transUnrefCliHandle(pConn);
|
||||||
}
|
}
|
||||||
|
@ -380,21 +373,20 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
||||||
SCliThrdObj* thrd = conn->hostThrd;
|
SCliThrdObj* thrd = conn->hostThrd;
|
||||||
CONN_HANDLE_THREAD_QUIT(thrd);
|
CONN_HANDLE_THREAD_QUIT(thrd);
|
||||||
|
|
||||||
char key[128] = {0};
|
STrans* pTransInst = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
|
||||||
|
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
|
||||||
|
transCtxCleanup(&conn->ctx);
|
||||||
|
transQueueClear(&conn->cliMsgs);
|
||||||
|
conn->status = ConnNormal;
|
||||||
|
|
||||||
transCtxDestroy(&conn->ctx);
|
char key[128] = {0};
|
||||||
tstrncpy(key, conn->ip, strlen(conn->ip));
|
tstrncpy(key, conn->ip, strlen(conn->ip));
|
||||||
tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port));
|
tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port));
|
||||||
tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
|
tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
|
||||||
|
|
||||||
STrans* pTransInst = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
|
|
||||||
|
|
||||||
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
|
|
||||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||||
conn->status = ConnNormal;
|
|
||||||
// list already create before
|
// list already create before
|
||||||
assert(plist != NULL);
|
assert(plist != NULL);
|
||||||
taosArrayClear(conn->cliMsgs);
|
|
||||||
QUEUE_PUSH(&plist->conn, &conn->conn);
|
QUEUE_PUSH(&plist->conn, &conn->conn);
|
||||||
assert(!QUEUE_IS_EMPTY(&plist->conn));
|
assert(!QUEUE_IS_EMPTY(&plist->conn));
|
||||||
}
|
}
|
||||||
|
@ -445,7 +437,8 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
|
||||||
|
|
||||||
conn->writeReq.data = conn;
|
conn->writeReq.data = conn;
|
||||||
conn->connReq.data = conn;
|
conn->connReq.data = conn;
|
||||||
conn->cliMsgs = taosArrayInit(2, sizeof(void*));
|
|
||||||
|
transQueueInit(&conn->cliMsgs, NULL);
|
||||||
QUEUE_INIT(&conn->conn);
|
QUEUE_INIT(&conn->conn);
|
||||||
conn->hostThrd = pThrd;
|
conn->hostThrd = pThrd;
|
||||||
conn->status = ConnNormal;
|
conn->status = ConnNormal;
|
||||||
|
@ -465,18 +458,18 @@ static void cliDestroy(uv_handle_t* handle) {
|
||||||
SCliConn* conn = handle->data;
|
SCliConn* conn = handle->data;
|
||||||
free(conn->ip);
|
free(conn->ip);
|
||||||
free(conn->stream);
|
free(conn->stream);
|
||||||
transCtxDestroy(&conn->ctx);
|
transCtxCleanup(&conn->ctx);
|
||||||
taosArrayDestroy(conn->cliMsgs);
|
transQueueDestroy(&conn->cliMsgs);
|
||||||
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
||||||
free(conn);
|
free(conn);
|
||||||
}
|
}
|
||||||
static bool cliHandleNoResp(SCliConn* conn) {
|
static bool cliHandleNoResp(SCliConn* conn) {
|
||||||
bool res = false;
|
bool res = false;
|
||||||
SArray* msgs = conn->cliMsgs;
|
if (!transQueueEmpty(&conn->cliMsgs)) {
|
||||||
if (taosArrayGetSize(msgs) > 0) {
|
SCliMsg* pMsg = transQueueGet(&conn->cliMsgs);
|
||||||
SCliMsg* pMsg = taosArrayGetP(msgs, 0);
|
|
||||||
if (REQUEST_NO_RESP(&pMsg->msg)) {
|
if (REQUEST_NO_RESP(&pMsg->msg)) {
|
||||||
taosArrayRemove(msgs, 0);
|
transQueuePop(&conn->cliMsgs);
|
||||||
|
// taosArrayRemove(msgs, 0);
|
||||||
destroyCmsg(pMsg);
|
destroyCmsg(pMsg);
|
||||||
res = true;
|
res = true;
|
||||||
}
|
}
|
||||||
|
@ -509,8 +502,9 @@ static void cliSendCb(uv_write_t* req, int status) {
|
||||||
void cliSend(SCliConn* pConn) {
|
void cliSend(SCliConn* pConn) {
|
||||||
CONN_HANDLE_BROKEN(pConn);
|
CONN_HANDLE_BROKEN(pConn);
|
||||||
|
|
||||||
assert(taosArrayGetSize(pConn->cliMsgs) > 0);
|
// assert(taosArrayGetSize(pConn->cliMsgs) > 0);
|
||||||
SCliMsg* pCliMsg = taosArrayGetP(pConn->cliMsgs, 0);
|
assert(!transQueueEmpty(&pConn->cliMsgs));
|
||||||
|
SCliMsg* pCliMsg = transQueueGet(&pConn->cliMsgs);
|
||||||
STransConnCtx* pCtx = pCliMsg->ctx;
|
STransConnCtx* pCtx = pCliMsg->ctx;
|
||||||
|
|
||||||
SCliThrdObj* pThrd = pConn->hostThrd;
|
SCliThrdObj* pThrd = pConn->hostThrd;
|
||||||
|
@ -600,9 +594,8 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
|
|
||||||
if (T_REF_VAL_GET(conn) == 2) {
|
if (T_REF_VAL_GET(conn) == 2) {
|
||||||
transUnrefCliHandle(conn);
|
transUnrefCliHandle(conn);
|
||||||
taosArrayPush(conn->cliMsgs, &pMsg);
|
if (!transQueuePush(&conn->cliMsgs, pMsg)) {
|
||||||
if (taosArrayGetSize(conn->cliMsgs) >= 2) {
|
return;
|
||||||
return; // send one by one
|
|
||||||
}
|
}
|
||||||
cliSend(conn);
|
cliSend(conn);
|
||||||
} else {
|
} else {
|
||||||
|
@ -643,17 +636,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
conn->hThrdIdx = pCtx->hThrdIdx;
|
conn->hThrdIdx = pCtx->hThrdIdx;
|
||||||
|
|
||||||
transCtxMerge(&conn->ctx, &pCtx->appCtx);
|
transCtxMerge(&conn->ctx, &pCtx->appCtx);
|
||||||
if (taosArrayGetSize(conn->cliMsgs) > 0) {
|
if (!transQueuePush(&conn->cliMsgs, pMsg)) {
|
||||||
taosArrayPush(conn->cliMsgs, &pMsg);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(conn->cliMsgs, &pMsg);
|
|
||||||
transDestroyBuffer(&conn->readBuf);
|
transDestroyBuffer(&conn->readBuf);
|
||||||
cliSend(conn);
|
cliSend(conn);
|
||||||
} else {
|
} else {
|
||||||
conn = cliCreateConn(pThrd);
|
conn = cliCreateConn(pThrd);
|
||||||
taosArrayPush(conn->cliMsgs, &pMsg);
|
transQueuePush(&conn->cliMsgs, pMsg);
|
||||||
|
|
||||||
conn->hThrdIdx = pCtx->hThrdIdx;
|
conn->hThrdIdx = pCtx->hThrdIdx;
|
||||||
conn->ip = strdup(pMsg->ctx->ip);
|
conn->ip = strdup(pMsg->ctx->ip);
|
||||||
|
|
|
@ -228,7 +228,7 @@ void transCtxInit(STransCtx* ctx) {
|
||||||
// init transCtx
|
// init transCtx
|
||||||
ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK);
|
ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK);
|
||||||
}
|
}
|
||||||
void transCtxDestroy(STransCtx* ctx) {
|
void transCtxCleanup(STransCtx* ctx) {
|
||||||
if (ctx->args == NULL) {
|
if (ctx->args == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -276,4 +276,49 @@ void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
|
||||||
return (void*)ret;
|
return (void*)ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void transQueueInit(STransQueue* queue, void (*free)(void* arg)) {
|
||||||
|
queue->q = taosArrayInit(2, sizeof(void*));
|
||||||
|
queue->free = free;
|
||||||
|
}
|
||||||
|
bool transQueuePush(STransQueue* queue, void* arg) {
|
||||||
|
taosArrayPush(queue->q, &arg);
|
||||||
|
if (taosArrayGetSize(queue->q) > 1) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
void* transQueuePop(STransQueue* queue) {
|
||||||
|
if (taosArrayGetSize(queue->q) == 0) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
void* ptr = taosArrayGetP(queue->q, 0);
|
||||||
|
taosArrayRemove(queue->q, 0);
|
||||||
|
return ptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* transQueueGet(STransQueue* queue) {
|
||||||
|
if (taosArrayGetSize(queue->q) == 0) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
void* ptr = taosArrayGetP(queue->q, 0);
|
||||||
|
return ptr;
|
||||||
|
}
|
||||||
|
bool transQueueEmpty(STransQueue* queue) {
|
||||||
|
//
|
||||||
|
return taosArrayGetSize(queue->q) == 0;
|
||||||
|
}
|
||||||
|
void transQueueClear(STransQueue* queue) {
|
||||||
|
if (queue->free != NULL) {
|
||||||
|
for (int i = 0; i < taosArrayGetSize(queue->q); i++) {
|
||||||
|
void* p = taosArrayGetP(queue->q, i);
|
||||||
|
queue->free(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosArrayClear(queue->q);
|
||||||
|
}
|
||||||
|
void transQueueDestroy(STransQueue* queue) {
|
||||||
|
transQueueClear(queue);
|
||||||
|
taosArrayDestroy(queue->q);
|
||||||
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -37,7 +37,7 @@ typedef struct SSrvConn {
|
||||||
void* pTransInst; // rpc init
|
void* pTransInst; // rpc init
|
||||||
void* ahandle; //
|
void* ahandle; //
|
||||||
void* hostThrd;
|
void* hostThrd;
|
||||||
SArray* srvMsgs;
|
STransQueue srvMsgs;
|
||||||
|
|
||||||
SSrvRegArg regArg;
|
SSrvRegArg regArg;
|
||||||
bool broken; // conn broken;
|
bool broken; // conn broken;
|
||||||
|
@ -62,12 +62,12 @@ typedef struct SSrvMsg {
|
||||||
} SSrvMsg;
|
} SSrvMsg;
|
||||||
|
|
||||||
typedef struct SWorkThrdObj {
|
typedef struct SWorkThrdObj {
|
||||||
TdThread thread;
|
TdThread thread;
|
||||||
uv_pipe_t* pipe;
|
uv_pipe_t* pipe;
|
||||||
uv_os_fd_t fd;
|
uv_os_fd_t fd;
|
||||||
uv_loop_t* loop;
|
uv_loop_t* loop;
|
||||||
SAsyncPool* asyncPool;
|
SAsyncPool* asyncPool;
|
||||||
queue msg;
|
queue msg;
|
||||||
TdThreadMutex msgMtx;
|
TdThreadMutex msgMtx;
|
||||||
|
|
||||||
queue conn;
|
queue conn;
|
||||||
|
@ -76,7 +76,7 @@ typedef struct SWorkThrdObj {
|
||||||
} SWorkThrdObj;
|
} SWorkThrdObj;
|
||||||
|
|
||||||
typedef struct SServerObj {
|
typedef struct SServerObj {
|
||||||
TdThread thread;
|
TdThread thread;
|
||||||
uv_tcp_t server;
|
uv_tcp_t server;
|
||||||
uv_loop_t* loop;
|
uv_loop_t* loop;
|
||||||
|
|
||||||
|
@ -106,8 +106,7 @@ static const char* notify = "a";
|
||||||
srvMsg->msg = tmsg; \
|
srvMsg->msg = tmsg; \
|
||||||
srvMsg->type = Release; \
|
srvMsg->type = Release; \
|
||||||
srvMsg->pConn = conn; \
|
srvMsg->pConn = conn; \
|
||||||
taosArrayPush(conn->srvMsgs, &srvMsg); \
|
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
|
||||||
if (taosArrayGetSize(conn->srvMsgs) > 1) { \
|
|
||||||
return; \
|
return; \
|
||||||
} \
|
} \
|
||||||
uvStartSendRespInternal(srvMsg); \
|
uvStartSendRespInternal(srvMsg); \
|
||||||
|
@ -271,20 +270,16 @@ void uvOnSendCb(uv_write_t* req, int status) {
|
||||||
transClearBuffer(&conn->readBuf);
|
transClearBuffer(&conn->readBuf);
|
||||||
if (status == 0) {
|
if (status == 0) {
|
||||||
tTrace("server conn %p data already was written on stream", conn);
|
tTrace("server conn %p data already was written on stream", conn);
|
||||||
if (conn->srvMsgs != NULL) {
|
if (!transQueueEmpty(&conn->srvMsgs)) {
|
||||||
assert(taosArrayGetSize(conn->srvMsgs) >= 1);
|
SSrvMsg* msg = transQueuePop(&conn->srvMsgs);
|
||||||
SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0);
|
|
||||||
tTrace("server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs));
|
|
||||||
taosArrayRemove(conn->srvMsgs, 0);
|
|
||||||
if (msg->type == Release && conn->status != ConnNormal) {
|
if (msg->type == Release && conn->status != ConnNormal) {
|
||||||
conn->status = ConnNormal;
|
conn->status = ConnNormal;
|
||||||
transUnrefSrvHandle(conn);
|
transUnrefSrvHandle(conn);
|
||||||
}
|
}
|
||||||
destroySmsg(msg);
|
destroySmsg(msg);
|
||||||
// send second data, just use for push
|
// send second data, just use for push
|
||||||
if (taosArrayGetSize(conn->srvMsgs) > 0) {
|
if (!transQueueEmpty(&conn->srvMsgs)) {
|
||||||
tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs));
|
msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs);
|
||||||
msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0);
|
|
||||||
if (msg->type == Register && conn->status == ConnAcquire) {
|
if (msg->type == Register && conn->status == ConnAcquire) {
|
||||||
conn->regArg.notifyCount = 0;
|
conn->regArg.notifyCount = 0;
|
||||||
conn->regArg.init = 1;
|
conn->regArg.init = 1;
|
||||||
|
@ -294,7 +289,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
|
||||||
(pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
|
(pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
|
||||||
memset(&conn->regArg, 0, sizeof(conn->regArg));
|
memset(&conn->regArg, 0, sizeof(conn->regArg));
|
||||||
}
|
}
|
||||||
taosArrayRemove(conn->srvMsgs, 0);
|
transQueuePop(&conn->srvMsgs);
|
||||||
free(msg);
|
free(msg);
|
||||||
} else {
|
} else {
|
||||||
uvStartSendRespInternal(msg);
|
uvStartSendRespInternal(msg);
|
||||||
|
@ -373,10 +368,7 @@ static void uvStartSendResp(SSrvMsg* smsg) {
|
||||||
transUnrefSrvHandle(pConn);
|
transUnrefSrvHandle(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pConn->srvMsgs, &smsg);
|
if (!transQueuePush(&pConn->srvMsgs, smsg)) {
|
||||||
if (taosArrayGetSize(pConn->srvMsgs) > 1) {
|
|
||||||
tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, taosInetNtoa(pConn->addr.sin_addr),
|
|
||||||
ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
uvStartSendRespInternal(smsg);
|
uvStartSendRespInternal(smsg);
|
||||||
|
@ -608,14 +600,15 @@ static SSrvConn* createConn(void* hThrd) {
|
||||||
QUEUE_INIT(&pConn->queue);
|
QUEUE_INIT(&pConn->queue);
|
||||||
|
|
||||||
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
|
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
|
||||||
pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); //
|
|
||||||
tTrace("server conn %p created", pConn);
|
transQueueInit(&pConn->srvMsgs, NULL);
|
||||||
|
|
||||||
memset(&pConn->regArg, 0, sizeof(pConn->regArg));
|
memset(&pConn->regArg, 0, sizeof(pConn->regArg));
|
||||||
pConn->broken = false;
|
pConn->broken = false;
|
||||||
pConn->status = ConnNormal;
|
pConn->status = ConnNormal;
|
||||||
|
|
||||||
transRefSrvHandle(pConn);
|
transRefSrvHandle(pConn);
|
||||||
|
tTrace("server conn %p created", pConn);
|
||||||
return pConn;
|
return pConn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -625,11 +618,7 @@ static void destroyConn(SSrvConn* conn, bool clear) {
|
||||||
}
|
}
|
||||||
transDestroyBuffer(&conn->readBuf);
|
transDestroyBuffer(&conn->readBuf);
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(conn->srvMsgs); i++) {
|
transQueueDestroy(&conn->srvMsgs);
|
||||||
SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, i);
|
|
||||||
destroySmsg(msg);
|
|
||||||
}
|
|
||||||
conn->srvMsgs = taosArrayDestroy(conn->srvMsgs);
|
|
||||||
if (clear) {
|
if (clear) {
|
||||||
tTrace("server conn %p to be destroyed", conn);
|
tTrace("server conn %p to be destroyed", conn);
|
||||||
uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t));
|
uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t));
|
||||||
|
@ -724,8 +713,7 @@ void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
|
||||||
// release handle to rpc init
|
// release handle to rpc init
|
||||||
SSrvConn* conn = msg->pConn;
|
SSrvConn* conn = msg->pConn;
|
||||||
if (conn->status == ConnAcquire) {
|
if (conn->status == ConnAcquire) {
|
||||||
taosArrayPush(conn->srvMsgs, &msg);
|
if (!transQueuePush(&conn->srvMsgs, msg)) {
|
||||||
if (taosArrayGetSize(conn->srvMsgs) > 1) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
uvStartSendRespInternal(msg);
|
uvStartSendRespInternal(msg);
|
||||||
|
@ -744,8 +732,7 @@ void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) {
|
||||||
SSrvConn* conn = msg->pConn;
|
SSrvConn* conn = msg->pConn;
|
||||||
tDebug("server conn %p register brokenlink callback", conn);
|
tDebug("server conn %p register brokenlink callback", conn);
|
||||||
if (conn->status == ConnAcquire) {
|
if (conn->status == ConnAcquire) {
|
||||||
if (taosArrayGetSize(conn->srvMsgs) > 0) {
|
if (!transQueuePush(&conn->srvMsgs, msg)) {
|
||||||
taosArrayPush(conn->srvMsgs, &msg);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
conn->regArg.notifyCount = 0;
|
conn->regArg.notifyCount = 0;
|
||||||
|
|
|
@ -144,7 +144,7 @@ class TransCtxEnv : public ::testing::Test {
|
||||||
// TODO
|
// TODO
|
||||||
}
|
}
|
||||||
virtual void TearDown() {
|
virtual void TearDown() {
|
||||||
transCtxDestroy(ctx);
|
transCtxCleanup(ctx);
|
||||||
// formate
|
// formate
|
||||||
}
|
}
|
||||||
STransCtx *ctx;
|
STransCtx *ctx;
|
||||||
|
|
Loading…
Reference in New Issue