Merge pull request #10279 from taosdata/feature/trans_impl
fix conn persist bug
This commit is contained in:
commit
784f14c8f6
|
@ -140,6 +140,7 @@ typedef struct {
|
||||||
SRpcMsg* pRsp; // for synchronous API
|
SRpcMsg* pRsp; // for synchronous API
|
||||||
tsem_t* pSem; // for synchronous API
|
tsem_t* pSem; // for synchronous API
|
||||||
|
|
||||||
|
int hThrdIdx;
|
||||||
char* ip;
|
char* ip;
|
||||||
uint32_t port;
|
uint32_t port;
|
||||||
// SEpSet* pSet; // for synchronous API
|
// SEpSet* pSet; // for synchronous API
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
|
|
||||||
#include "transComm.h"
|
#include "transComm.h"
|
||||||
|
|
||||||
#define CONN_HOST_THREAD_INDEX(conn ? ((SCliConn*)conn)->hThrdIdx : -1)
|
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
|
||||||
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
|
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
|
||||||
|
|
||||||
typedef struct SCliConn {
|
typedef struct SCliConn {
|
||||||
|
@ -29,8 +29,8 @@ typedef struct SCliConn {
|
||||||
void* data;
|
void* data;
|
||||||
queue conn;
|
queue conn;
|
||||||
uint64_t expireTime;
|
uint64_t expireTime;
|
||||||
int8_t ctnRdCnt; // timers already notify to client
|
int8_t ctnRdCnt; // continue read count
|
||||||
int hostThreadIndex;
|
int hThrdIdx;
|
||||||
|
|
||||||
SRpcPush* push;
|
SRpcPush* push;
|
||||||
int persist; //
|
int persist; //
|
||||||
|
@ -482,12 +482,12 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
if (pMsg->msg.handle == NULL) {
|
if (pMsg->msg.handle == NULL) {
|
||||||
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
|
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
|
||||||
if (conn != NULL) {
|
if (conn != NULL) {
|
||||||
tTrace("client conn %d get from conn pool", conn);
|
tTrace("client conn %p get from conn pool", conn);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
conn = (SCliConn*)(pMsg->msg.handle);
|
conn = (SCliConn*)(pMsg->msg.handle);
|
||||||
if (conn != NULL) {
|
if (conn != NULL) {
|
||||||
tTrace("client conn %d reused", conn);
|
tTrace("client conn %p reused", conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -503,7 +503,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
clientWrite(conn);
|
clientWrite(conn);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
SCliConn* conn = calloc(1, sizeof(SCliConn));
|
conn = calloc(1, sizeof(SCliConn));
|
||||||
conn->ref++;
|
conn->ref++;
|
||||||
// read/write stream handle
|
// read/write stream handle
|
||||||
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
|
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
|
||||||
|
@ -652,15 +652,9 @@ static void clientSendQuit(SCliThrdObj* thrd) {
|
||||||
SCliMsg* msg = calloc(1, sizeof(SCliMsg));
|
SCliMsg* msg = calloc(1, sizeof(SCliMsg));
|
||||||
msg->ctx = NULL; //
|
msg->ctx = NULL; //
|
||||||
|
|
||||||
// pthread_mutex_lock(&thrd->msgMtx);
|
|
||||||
// QUEUE_PUSH(&thrd->msg, &msg->q);
|
|
||||||
// pthread_mutex_unlock(&thrd->msgMtx);
|
|
||||||
|
|
||||||
transSendAsync(thrd->asyncPool, &msg->q);
|
transSendAsync(thrd->asyncPool, &msg->q);
|
||||||
// uv_async_send(thrd->cliAsync);
|
|
||||||
}
|
}
|
||||||
void taosCloseClient(void* arg) {
|
void taosCloseClient(void* arg) {
|
||||||
// impl later
|
|
||||||
SClientObj* cli = arg;
|
SClientObj* cli = arg;
|
||||||
for (int i = 0; i < cli->numOfThreads; i++) {
|
for (int i = 0; i < cli->numOfThreads; i++) {
|
||||||
clientSendQuit(cli->pThreadObj[i]);
|
clientSendQuit(cli->pThreadObj[i]);
|
||||||
|
@ -683,7 +677,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
|
||||||
|
|
||||||
SRpcInfo* pRpc = (SRpcInfo*)shandle;
|
SRpcInfo* pRpc = (SRpcInfo*)shandle;
|
||||||
|
|
||||||
int index = CONN_HOST_THREAD_INDEX(pMsg.handle);
|
int index = CONN_HOST_THREAD_INDEX(pMsg->handle);
|
||||||
if (index == -1) {
|
if (index == -1) {
|
||||||
index = clientRBChoseIdx(pRpc);
|
index = clientRBChoseIdx(pRpc);
|
||||||
}
|
}
|
||||||
|
@ -717,7 +711,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
|
||||||
|
|
||||||
SRpcInfo* pRpc = (SRpcInfo*)shandle;
|
SRpcInfo* pRpc = (SRpcInfo*)shandle;
|
||||||
|
|
||||||
int index = CONN_HOST_THREAD_INDEX(pMsg.handle);
|
int index = CONN_HOST_THREAD_INDEX(pReq->handle);
|
||||||
if (index == -1) {
|
if (index == -1) {
|
||||||
index = clientRBChoseIdx(pRpc);
|
index = clientRBChoseIdx(pRpc);
|
||||||
}
|
}
|
||||||
|
@ -728,6 +722,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
|
||||||
pCtx->msgType = pReq->msgType;
|
pCtx->msgType = pReq->msgType;
|
||||||
pCtx->ip = strdup(ip);
|
pCtx->ip = strdup(ip);
|
||||||
pCtx->port = port;
|
pCtx->port = port;
|
||||||
|
pCtx->hThrdIdx = index;
|
||||||
pCtx->pSem = calloc(1, sizeof(tsem_t));
|
pCtx->pSem = calloc(1, sizeof(tsem_t));
|
||||||
pCtx->pRsp = pRsp;
|
pCtx->pRsp = pRsp;
|
||||||
tsem_init(pCtx->pSem, 0, 0);
|
tsem_init(pCtx->pSem, 0, 0);
|
||||||
|
|
|
@ -96,7 +96,7 @@ static void *sendRequest(void *param) {
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
int main(int argc, char *argv[]) {
|
||||||
SRpcInit rpcInit;
|
SRpcInit rpcInit;
|
||||||
SEpSet epSet;
|
SEpSet epSet = {0};
|
||||||
int msgSize = 128;
|
int msgSize = 128;
|
||||||
int numOfReqs = 0;
|
int numOfReqs = 0;
|
||||||
int appThreads = 1;
|
int appThreads = 1;
|
||||||
|
|
Loading…
Reference in New Issue