feature(rpc): add retry
This commit is contained in:
parent
19a16640c9
commit
207fd455fb
|
@ -59,9 +59,13 @@ typedef struct {
|
||||||
void * pNode;
|
void * pNode;
|
||||||
} SNodeMsg;
|
} SNodeMsg;
|
||||||
|
|
||||||
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *);
|
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf);
|
||||||
typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
|
typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
|
||||||
typedef int (*RpcRfp)(void *parent, SRpcMsg *, SEpSet *);
|
///
|
||||||
|
// // SRpcMsg code
|
||||||
|
// REDIERE,
|
||||||
|
// NOT READY, EpSet
|
||||||
|
typedef bool (*RpcRfp)(int32_t code);
|
||||||
|
|
||||||
typedef struct SRpcInit {
|
typedef struct SRpcInit {
|
||||||
uint16_t localPort; // local port
|
uint16_t localPort; // local port
|
||||||
|
|
|
@ -103,6 +103,9 @@ typedef void* queue[2];
|
||||||
/* Return the structure holding the given element. */
|
/* Return the structure holding the given element. */
|
||||||
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
|
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
|
||||||
|
|
||||||
|
#define TRANS_RETRY_COUNT_LIMIT 10 // retry count limit
|
||||||
|
#define TRANS_RETRY_INTERVAL 5 // ms retry interval
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SRpcInfo* pRpc; // associated SRpcInfo
|
SRpcInfo* pRpc; // associated SRpcInfo
|
||||||
SEpSet epSet; // ip list provided by app
|
SEpSet epSet; // ip list provided by app
|
||||||
|
@ -137,14 +140,12 @@ typedef struct {
|
||||||
int8_t connType; // connection type cli/srv
|
int8_t connType; // connection type cli/srv
|
||||||
int64_t rid; // refId returned by taosAddRef
|
int64_t rid; // refId returned by taosAddRef
|
||||||
|
|
||||||
|
int8_t retryCount;
|
||||||
STransCtx appCtx; //
|
STransCtx appCtx; //
|
||||||
STransMsg* pRsp; // for synchronous API
|
STransMsg* pRsp; // for synchronous API
|
||||||
tsem_t* pSem; // for synchronous API
|
tsem_t* pSem; // for synchronous API
|
||||||
|
|
||||||
int hThrdIdx;
|
int hThrdIdx;
|
||||||
char* ip;
|
|
||||||
uint32_t port;
|
|
||||||
// SEpSet* pSet; // for synchronous API
|
|
||||||
} STransConnCtx;
|
} STransConnCtx;
|
||||||
|
|
||||||
#pragma pack(push, 1)
|
#pragma pack(push, 1)
|
||||||
|
@ -215,8 +216,6 @@ void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
|
||||||
bool transCompressMsg(char* msg, int32_t len, int32_t* flen);
|
bool transCompressMsg(char* msg, int32_t len, int32_t* flen);
|
||||||
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
|
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
|
||||||
|
|
||||||
void transConnCtxDestroy(STransConnCtx* ctx);
|
|
||||||
|
|
||||||
void transFreeMsg(void* msg);
|
void transFreeMsg(void* msg);
|
||||||
|
|
||||||
//
|
//
|
||||||
|
@ -262,8 +261,8 @@ void transUnrefCliHandle(void* handle);
|
||||||
void transReleaseCliHandle(void* handle);
|
void transReleaseCliHandle(void* handle);
|
||||||
void transReleaseSrvHandle(void* handle);
|
void transReleaseSrvHandle(void* handle);
|
||||||
|
|
||||||
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* pCtx);
|
void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
|
||||||
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp);
|
void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
|
||||||
void transSendResponse(const STransMsg* msg);
|
void transSendResponse(const STransMsg* msg);
|
||||||
void transRegisterMsg(const STransMsg* msg);
|
void transRegisterMsg(const STransMsg* msg);
|
||||||
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo);
|
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo);
|
||||||
|
|
|
@ -62,8 +62,7 @@ typedef struct {
|
||||||
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
|
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
|
||||||
|
|
||||||
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
|
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
|
||||||
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
|
bool (*retry)(int32_t code);
|
||||||
int (*retry)(void* parent, SRpcMsg*, SEpSet*);
|
|
||||||
|
|
||||||
int32_t refCount;
|
int32_t refCount;
|
||||||
void* parent;
|
void* parent;
|
||||||
|
|
|
@ -38,7 +38,6 @@ void* rpcOpen(const SRpcInit* pInit) {
|
||||||
|
|
||||||
// register callback handle
|
// register callback handle
|
||||||
pRpc->cfp = pInit->cfp;
|
pRpc->cfp = pInit->cfp;
|
||||||
pRpc->afp = pInit->afp;
|
|
||||||
pRpc->retry = pInit->rfp;
|
pRpc->retry = pInit->rfp;
|
||||||
|
|
||||||
if (pInit->connType == TAOS_CONN_SERVER) {
|
if (pInit->connType == TAOS_CONN_SERVER) {
|
||||||
|
@ -116,19 +115,13 @@ int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
|
||||||
void rpcCancelRequest(int64_t rid) { return; }
|
void rpcCancelRequest(int64_t rid) { return; }
|
||||||
|
|
||||||
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
|
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
|
||||||
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
|
transSendRequest(shandle, pEpSet, pMsg, NULL);
|
||||||
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
|
|
||||||
transSendRequest(shandle, ip, port, pMsg, NULL);
|
|
||||||
}
|
}
|
||||||
void rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
|
void rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
|
||||||
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
|
transSendRequest(shandle, pEpSet, pMsg, pCtx);
|
||||||
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
|
|
||||||
transSendRequest(shandle, ip, port, pMsg, pCtx);
|
|
||||||
}
|
}
|
||||||
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
|
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
|
||||||
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
|
transSendRecv(shandle, pEpSet, pMsg, pRsp);
|
||||||
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
|
|
||||||
transSendRecv(shandle, ip, port, pMsg, pRsp);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
|
void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
/*
|
/* * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
*
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
@ -97,7 +96,7 @@ static void cliSendCb(uv_write_t* req, int status);
|
||||||
static void cliConnCb(uv_connect_t* req, int status);
|
static void cliConnCb(uv_connect_t* req, int status);
|
||||||
static void cliAsyncCb(uv_async_t* handle);
|
static void cliAsyncCb(uv_async_t* handle);
|
||||||
|
|
||||||
static void cliAppCb(SCliConn* pConn, STransMsg* pMsg);
|
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
|
||||||
|
|
||||||
static SCliConn* cliCreateConn(SCliThrdObj* thrd);
|
static SCliConn* cliCreateConn(SCliThrdObj* thrd);
|
||||||
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
|
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
|
||||||
|
@ -227,6 +226,9 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
|
||||||
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
|
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
|
||||||
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
|
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
|
||||||
|
|
||||||
|
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
|
||||||
|
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
|
||||||
|
|
||||||
static void* cliWorkThread(void* arg);
|
static void* cliWorkThread(void* arg);
|
||||||
|
|
||||||
bool cliMaySendCachedMsg(SCliConn* conn) {
|
bool cliMaySendCachedMsg(SCliConn* conn) {
|
||||||
|
@ -311,14 +313,10 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCtx == NULL || pCtx->pSem == NULL) {
|
int ret = cliAppCb(conn, &transMsg, pMsg);
|
||||||
tTrace("%s cli conn %p handle resp", pTransInst->label, conn);
|
if (ret != 0) {
|
||||||
cliAppCb(conn, &transMsg);
|
tTrace("try to send req to next node");
|
||||||
//(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
|
return;
|
||||||
} else {
|
|
||||||
tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, conn);
|
|
||||||
memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg));
|
|
||||||
tsem_post(pCtx->pSem);
|
|
||||||
}
|
}
|
||||||
destroyCmsg(pMsg);
|
destroyCmsg(pMsg);
|
||||||
|
|
||||||
|
@ -375,17 +373,15 @@ void cliHandleExcept(SCliConn* pConn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCtx == NULL || pCtx->pSem == NULL) {
|
if (pCtx == NULL || pCtx->pSem == NULL) {
|
||||||
tTrace("%s cli conn %p handle except", pTransInst->label, pConn);
|
|
||||||
if (transMsg.ahandle == NULL) {
|
if (transMsg.ahandle == NULL) {
|
||||||
once = true;
|
once = true;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
cliAppCb(pConn, &transMsg);
|
}
|
||||||
//(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
|
int ret = cliAppCb(pConn, &transMsg, pMsg);
|
||||||
} else {
|
if (ret != 0) {
|
||||||
tTrace("%s cli conn(sync) %p handle except", pTransInst->label, pConn);
|
tTrace("try to send req to next node");
|
||||||
memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg));
|
return;
|
||||||
tsem_post(pCtx->pSem);
|
|
||||||
}
|
}
|
||||||
destroyCmsg(pMsg);
|
destroyCmsg(pMsg);
|
||||||
tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
|
tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
|
@ -695,7 +691,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
|
conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
|
||||||
if (conn != NULL) {
|
if (conn != NULL) {
|
||||||
tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
||||||
} else {
|
} else {
|
||||||
|
@ -719,10 +715,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
|
|
||||||
transCtxMerge(&conn->ctx, &pCtx->appCtx);
|
transCtxMerge(&conn->ctx, &pCtx->appCtx);
|
||||||
transQueuePush(&conn->cliMsgs, pMsg);
|
transQueuePush(&conn->cliMsgs, pMsg);
|
||||||
// tTrace("%s cli conn %p queue msg size %d", ((STrans*)pThrd->pTransInst)->label, conn, 2);
|
|
||||||
// return;
|
|
||||||
//}
|
|
||||||
// transDestroyBuffer(&conn->readBuf);
|
|
||||||
cliSend(conn);
|
cliSend(conn);
|
||||||
} else {
|
} else {
|
||||||
conn = cliCreateConn(pThrd);
|
conn = cliCreateConn(pThrd);
|
||||||
|
@ -730,8 +722,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
transQueuePush(&conn->cliMsgs, pMsg);
|
transQueuePush(&conn->cliMsgs, pMsg);
|
||||||
|
|
||||||
conn->hThrdIdx = pCtx->hThrdIdx;
|
conn->hThrdIdx = pCtx->hThrdIdx;
|
||||||
conn->ip = strdup(pMsg->ctx->ip);
|
conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet));
|
||||||
conn->port = pMsg->ctx->port;
|
conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
|
||||||
|
|
||||||
int ret = transSetConnOption((uv_tcp_t*)conn->stream);
|
int ret = transSetConnOption((uv_tcp_t*)conn->stream);
|
||||||
if (ret) {
|
if (ret) {
|
||||||
|
@ -743,10 +735,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
addr.sin_family = AF_INET;
|
addr.sin_family = AF_INET;
|
||||||
addr.sin_addr.s_addr = taosGetIpv4FromFqdn(conn->ip);
|
addr.sin_addr.s_addr = taosGetIpv4FromFqdn(conn->ip);
|
||||||
addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
|
addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
|
||||||
// uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
|
tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port);
|
||||||
// handle error in callback if fail to connect
|
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
||||||
tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port);
|
if (ret != 0) {
|
||||||
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
tTrace("%s cli conn %p failed to connect to %s:%d, reason: %s", pTransInst->label, conn, conn->ip, conn->port,
|
||||||
|
uv_err_name(ret));
|
||||||
|
cliHandleExcept(conn);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static void cliAsyncCb(uv_async_t* handle) {
|
static void cliAsyncCb(uv_async_t* handle) {
|
||||||
|
@ -856,12 +852,10 @@ static void destroyThrdObj(SCliThrdObj* pThrd) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void transDestroyConnCtx(STransConnCtx* ctx) {
|
static void transDestroyConnCtx(STransConnCtx* ctx) {
|
||||||
if (ctx != NULL) {
|
//
|
||||||
taosMemoryFree(ctx->ip);
|
|
||||||
}
|
|
||||||
taosMemoryFree(ctx);
|
taosMemoryFree(ctx);
|
||||||
}
|
}
|
||||||
//
|
|
||||||
void cliSendQuit(SCliThrdObj* thrd) {
|
void cliSendQuit(SCliThrdObj* thrd) {
|
||||||
// cli can stop gracefully
|
// cli can stop gracefully
|
||||||
SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg));
|
SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg));
|
||||||
|
@ -881,17 +875,41 @@ int cliRBChoseIdx(STrans* pTransInst) {
|
||||||
}
|
}
|
||||||
return index % pTransInst->numOfThreads;
|
return index % pTransInst->numOfThreads;
|
||||||
}
|
}
|
||||||
void cliAppCb(SCliConn* pConn, STransMsg* transMsg) {
|
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
SCliThrdObj* pThrd = pConn->hostThrd;
|
SCliThrdObj* pThrd = pConn->hostThrd;
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
|
|
||||||
if (transMsg->code == TSDB_CODE_RPC_REDIRECT && pTransInst->retry != NULL) {
|
if (pMsg == NULL || pMsg->ctx == NULL) {
|
||||||
SMEpSet emsg = {0};
|
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
|
||||||
tDeserializeSMEpSet(transMsg->pCont, transMsg->contLen, &emsg);
|
pTransInst->cfp(pTransInst->parent, pResp, NULL);
|
||||||
pTransInst->retry(pTransInst, transMsg, &(emsg.epSet));
|
return 0;
|
||||||
} else {
|
|
||||||
pTransInst->cfp(pTransInst->parent, transMsg, NULL);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
|
SEpSet* pEpSet = &pCtx->epSet;
|
||||||
|
if (pTransInst->retry != NULL && pTransInst->retry(pResp->code) && pCtx->retryCount <= TRANS_RETRY_COUNT_LIMIT) {
|
||||||
|
pCtx->retryCount += 1;
|
||||||
|
if (pResp->contLen == 0) {
|
||||||
|
pEpSet->inUse = (pEpSet->inUse++) % pEpSet->numOfEps;
|
||||||
|
} else {
|
||||||
|
SMEpSet emsg = {0};
|
||||||
|
tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg);
|
||||||
|
pCtx->epSet = emsg.epSet;
|
||||||
|
}
|
||||||
|
// release pConn
|
||||||
|
cliHandleReq(pMsg, pThrd);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pCtx->pSem != NULL) {
|
||||||
|
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
|
||||||
|
memcpy((char*)pCtx->pRsp, (char*)&pResp, sizeof(*pResp));
|
||||||
|
tsem_post(pCtx->pSem);
|
||||||
|
} else {
|
||||||
|
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
|
||||||
|
pTransInst->cfp(pTransInst->parent, pResp, pEpSet);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void transCloseClient(void* arg) {
|
void transCloseClient(void* arg) {
|
||||||
|
@ -934,18 +952,17 @@ void transReleaseCliHandle(void* handle) {
|
||||||
transSendAsync(thrd->asyncPool, &cmsg->q);
|
transSendAsync(thrd->asyncPool, &cmsg->q);
|
||||||
}
|
}
|
||||||
|
|
||||||
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* ctx) {
|
void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
|
||||||
STrans* pTransInst = (STrans*)shandle;
|
STrans* pTransInst = (STrans*)shandle;
|
||||||
int index = CONN_HOST_THREAD_INDEX((SCliConn*)pMsg->handle);
|
int index = CONN_HOST_THREAD_INDEX((SCliConn*)pReq->handle);
|
||||||
if (index == -1) {
|
if (index == -1) {
|
||||||
index = cliRBChoseIdx(pTransInst);
|
index = cliRBChoseIdx(pTransInst);
|
||||||
}
|
}
|
||||||
|
|
||||||
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
||||||
pCtx->ahandle = pMsg->ahandle;
|
pCtx->epSet = *pEpSet;
|
||||||
pCtx->msgType = pMsg->msgType;
|
pCtx->ahandle = pReq->ahandle;
|
||||||
pCtx->ip = strdup(ip);
|
pCtx->msgType = pReq->msgType;
|
||||||
pCtx->port = port;
|
|
||||||
pCtx->hThrdIdx = index;
|
pCtx->hThrdIdx = index;
|
||||||
|
|
||||||
if (ctx != NULL) {
|
if (ctx != NULL) {
|
||||||
|
@ -955,17 +972,18 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p
|
||||||
|
|
||||||
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
|
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
|
||||||
cliMsg->ctx = pCtx;
|
cliMsg->ctx = pCtx;
|
||||||
cliMsg->msg = *pMsg;
|
cliMsg->msg = *pReq;
|
||||||
cliMsg->st = taosGetTimestampUs();
|
cliMsg->st = taosGetTimestampUs();
|
||||||
cliMsg->type = Normal;
|
cliMsg->type = Normal;
|
||||||
|
|
||||||
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
|
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
|
||||||
|
|
||||||
tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pMsg, ip, port, pMsg->ahandle);
|
tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pReq, EPSET_GET_INUSE_IP(&pCtx->epSet),
|
||||||
|
EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle);
|
||||||
ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0);
|
ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) {
|
void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
|
||||||
STrans* pTransInst = (STrans*)shandle;
|
STrans* pTransInst = (STrans*)shandle;
|
||||||
int index = CONN_HOST_THREAD_INDEX(pReq->handle);
|
int index = CONN_HOST_THREAD_INDEX(pReq->handle);
|
||||||
if (index == -1) {
|
if (index == -1) {
|
||||||
|
@ -973,10 +991,9 @@ void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq
|
||||||
}
|
}
|
||||||
|
|
||||||
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
||||||
|
pCtx->epSet = *pEpSet;
|
||||||
pCtx->ahandle = pReq->ahandle;
|
pCtx->ahandle = pReq->ahandle;
|
||||||
pCtx->msgType = pReq->msgType;
|
pCtx->msgType = pReq->msgType;
|
||||||
pCtx->ip = strdup(ip);
|
|
||||||
pCtx->port = port;
|
|
||||||
pCtx->hThrdIdx = index;
|
pCtx->hThrdIdx = index;
|
||||||
pCtx->pSem = taosMemoryCalloc(1, sizeof(tsem_t));
|
pCtx->pSem = taosMemoryCalloc(1, sizeof(tsem_t));
|
||||||
pCtx->pRsp = pRsp;
|
pCtx->pRsp = pRsp;
|
||||||
|
@ -989,6 +1006,9 @@ void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq
|
||||||
cliMsg->type = Normal;
|
cliMsg->type = Normal;
|
||||||
|
|
||||||
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
|
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
|
||||||
|
tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pReq, EPSET_GET_INUSE_IP(&pCtx->epSet),
|
||||||
|
EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle);
|
||||||
|
|
||||||
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
||||||
tsem_t* pSem = pCtx->pSem;
|
tsem_t* pSem = pCtx->pSem;
|
||||||
tsem_wait(pSem);
|
tsem_wait(pSem);
|
||||||
|
|
|
@ -93,11 +93,6 @@ bool transDecompressMsg(char* msg, int32_t len, int32_t* flen) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void transConnCtxDestroy(STransConnCtx* ctx) {
|
|
||||||
taosMemoryFree(ctx->ip);
|
|
||||||
taosMemoryFree(ctx);
|
|
||||||
}
|
|
||||||
|
|
||||||
void transFreeMsg(void* msg) {
|
void transFreeMsg(void* msg) {
|
||||||
if (msg == NULL) {
|
if (msg == NULL) {
|
||||||
return;
|
return;
|
||||||
|
@ -363,10 +358,4 @@ void transQueueDestroy(STransQueue* queue) {
|
||||||
transQueueClear(queue);
|
transQueueClear(queue);
|
||||||
taosArrayDestroy(queue->q);
|
taosArrayDestroy(queue->q);
|
||||||
}
|
}
|
||||||
// int32_t transGetExHandle() {
|
|
||||||
// static
|
|
||||||
//}
|
|
||||||
// void transThreadOnce() {
|
|
||||||
// taosThreadOnce(&transModuleInit, );
|
|
||||||
//}
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -802,7 +802,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
|
|
||||||
taosThreadOnce(&transModuleInit, uvInitExHandleMgt);
|
taosThreadOnce(&transModuleInit, uvInitExHandleMgt);
|
||||||
transSrvInst++;
|
transSrvInst++;
|
||||||
// uvOpenExHandleMgt(10000);
|
|
||||||
|
|
||||||
for (int i = 0; i < srv->numOfThreads; i++) {
|
for (int i = 0; i < srv->numOfThreads; i++) {
|
||||||
SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj));
|
SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj));
|
||||||
|
@ -831,6 +830,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
} else {
|
} else {
|
||||||
// TODO: clear all other resource later
|
// TODO: clear all other resource later
|
||||||
tError("failed to create worker-thread %d", i);
|
tError("failed to create worker-thread %d", i);
|
||||||
|
goto End;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (false == addHandleToAcceptloop(srv)) {
|
if (false == addHandleToAcceptloop(srv)) {
|
||||||
|
@ -840,6 +840,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
if (err == 0) {
|
if (err == 0) {
|
||||||
tDebug("success to create accept-thread");
|
tDebug("success to create accept-thread");
|
||||||
} else {
|
} else {
|
||||||
|
tError("failed to create accept-thread");
|
||||||
|
goto End;
|
||||||
// clear all resource later
|
// clear all resource later
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1078,6 +1080,7 @@ void transRegisterMsg(const STransMsg* msg) {
|
||||||
transSendAsync(pThrd->asyncPool, &srvMsg->q);
|
transSendAsync(pThrd->asyncPool, &srvMsg->q);
|
||||||
uvReleaseExHandle(refId);
|
uvReleaseExHandle(refId);
|
||||||
return;
|
return;
|
||||||
|
|
||||||
_return1:
|
_return1:
|
||||||
tTrace("server handle %p failed to send to register brokenlink", exh);
|
tTrace("server handle %p failed to send to register brokenlink", exh);
|
||||||
rpcFreeCont(msg->pCont);
|
rpcFreeCont(msg->pCont);
|
||||||
|
|
Loading…
Reference in New Issue