Merge pull request #14214 from taosdata/feat/rpcRefactor1
feat: rpc refactor
This commit is contained in:
commit
6e4e10ad41
|
@ -16,6 +16,7 @@
|
|||
#ifndef _TD_QUERY_H_
|
||||
#define _TD_QUERY_H_
|
||||
|
||||
// clang-foramt off
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
@ -206,13 +207,15 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STabl
|
|||
char* jobTaskStatusStr(int32_t status);
|
||||
|
||||
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name);
|
||||
|
||||
void destroyQueryExecRes(SQueryExecRes* pRes);
|
||||
int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t* len);
|
||||
char* parseTagDatatoJson(void* p);
|
||||
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst);
|
||||
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst);
|
||||
|
||||
extern int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallocFp)(int32_t));
|
||||
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen,
|
||||
void* (*mallocFp)(int32_t));
|
||||
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t msgSize);
|
||||
|
||||
#define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE
|
||||
|
@ -231,11 +234,13 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
|
|||
#define NEED_CLIENT_HANDLE_ERROR(_code) \
|
||||
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
|
||||
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
|
||||
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_VND_CREATE_STB \
|
||||
|| (_type) == TDMT_VND_DROP_TABLE || (_type) == TDMT_VND_DROP_STB)
|
||||
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
|
||||
((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_VND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
|
||||
(_type) == TDMT_VND_DROP_STB)
|
||||
|
||||
#define NEED_SCHEDULER_RETRY_ERROR(_code) \
|
||||
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
|
||||
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
|
||||
(_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
|
||||
|
||||
#define REQUEST_TOTAL_EXEC_TIMES 2
|
||||
|
||||
|
@ -312,3 +317,4 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
|
|||
#endif
|
||||
|
||||
#endif /*_TD_QUERY_H_*/
|
||||
// clang-foramt on
|
||||
|
|
|
@ -157,7 +157,10 @@ int32_t taosNonblockwrite(TdSocketPtr pSocket, char *ptr, int32_t nbytes);
|
|||
int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len);
|
||||
void taosWinSocketInit();
|
||||
|
||||
int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec);
|
||||
/*
|
||||
* set timeout(ms)
|
||||
*/
|
||||
int32_t taosCreateSocketWithTimeout(uint32_t timeout);
|
||||
|
||||
TdSocketPtr taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
|
||||
TdSocketPtr taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
|
||||
|
|
|
@ -47,7 +47,9 @@ typedef struct STraceId {
|
|||
|
||||
#define TRACE_TO_STR(traceId, buf) \
|
||||
do { \
|
||||
sprintf(buf, "0x%" PRIx64 ":0x%" PRIx64 "", traceId->rootId, traceId->msgId); \
|
||||
int64_t rootId = (traceId) != NULL ? (traceId)->rootId : 0; \
|
||||
int64_t msgId = (traceId) != NULL ? (traceId)->msgId : 0; \
|
||||
sprintf(buf, "0x%" PRIx64 ":0x%" PRIx64 "", rootId, msgId); \
|
||||
} while (0)
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -13,11 +13,11 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "catalog.h"
|
||||
#include "functionMgt.h"
|
||||
#include "clientInt.h"
|
||||
#include "clientLog.h"
|
||||
#include "functionMgt.h"
|
||||
#include "os.h"
|
||||
#include "query.h"
|
||||
#include "scheduler.h"
|
||||
#include "tcache.h"
|
||||
|
@ -85,7 +85,8 @@ void closeTransporter(STscObj *pTscObj) {
|
|||
}
|
||||
|
||||
static bool clientRpcRfp(int32_t code) {
|
||||
if (code == TSDB_CODE_RPC_REDIRECT) {
|
||||
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
||||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
|
@ -137,7 +138,8 @@ void destroyTscObj(void *pObj) {
|
|||
// TODO
|
||||
// closeTransporter(pTscObj);
|
||||
}
|
||||
tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, *(int64_t*)pTscObj->id, pTscObj->pAppInfo->numOfConns);
|
||||
tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, *(int64_t *)pTscObj->id,
|
||||
pTscObj->pAppInfo->numOfConns);
|
||||
taosThreadMutexDestroy(&pTscObj->mutex);
|
||||
taosMemoryFreeClear(pTscObj);
|
||||
}
|
||||
|
|
|
@ -248,7 +248,14 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
|
|||
}
|
||||
}
|
||||
|
||||
static bool rpcRfp(int32_t code) { return code == TSDB_CODE_RPC_REDIRECT; }
|
||||
static bool rpcRfp(int32_t code) {
|
||||
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
||||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t dmInitClient(SDnode *pDnode) {
|
||||
SDnodeTrans *pTrans = &pDnode->trans;
|
||||
|
|
|
@ -12,6 +12,8 @@
|
|||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// clang-format off
|
||||
#include "uv.h"
|
||||
#include "os.h"
|
||||
#include "fnLog.h"
|
||||
|
@ -25,6 +27,7 @@
|
|||
#include "tglobal.h"
|
||||
#include "tmsg.h"
|
||||
#include "trpc.h"
|
||||
// clang-foramt on
|
||||
|
||||
typedef struct SUdfdContext {
|
||||
uv_loop_t * loop;
|
||||
|
@ -544,7 +547,8 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
|||
return 0;
|
||||
}
|
||||
static bool udfdRpcRfp(int32_t code) {
|
||||
if (code == TSDB_CODE_RPC_REDIRECT) {
|
||||
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
||||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
|
@ -652,8 +656,7 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
|
|||
buf->base = ctx->inputBuf;
|
||||
buf->len = ctx->inputCap;
|
||||
} else {
|
||||
fnError("udfd can not allocate enough memory")
|
||||
buf->base = NULL;
|
||||
fnError("udfd can not allocate enough memory") buf->base = NULL;
|
||||
buf->len = 0;
|
||||
}
|
||||
} else {
|
||||
|
@ -664,8 +667,7 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
|
|||
buf->base = ctx->inputBuf + ctx->inputLen;
|
||||
buf->len = ctx->inputCap - ctx->inputLen;
|
||||
} else {
|
||||
fnError("udfd can not allocate enough memory")
|
||||
buf->base = NULL;
|
||||
fnError("udfd can not allocate enough memory") buf->base = NULL;
|
||||
buf->len = 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "tmsg.h"
|
||||
#include "trpc.h"
|
||||
#include "tsched.h"
|
||||
// clang-format off
|
||||
#include "cJSON.h"
|
||||
|
||||
#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS)
|
||||
|
@ -147,13 +148,15 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
|
|||
}
|
||||
|
||||
memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len);
|
||||
SRpcMsg rpcMsg = {.msgType = pInfo->msgType,
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = pInfo->msgType,
|
||||
.pCont = pMsg,
|
||||
.contLen = pInfo->msgInfo.len,
|
||||
.info.ahandle = (void*)pInfo,
|
||||
.info.handle = pInfo->msgInfo.handle,
|
||||
.info.persistHandle = persistHandle,
|
||||
.code = 0};
|
||||
.code = 0
|
||||
};
|
||||
assert(pInfo->fp != NULL);
|
||||
TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId);
|
||||
rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
|
||||
|
@ -221,6 +224,7 @@ void destroyQueryExecRes(SQueryExecRes* pRes) {
|
|||
qError("invalid exec result for request type %d", pRes->msgType);
|
||||
}
|
||||
}
|
||||
// clang-format on
|
||||
|
||||
int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t* len) {
|
||||
int32_t n = 0;
|
||||
|
@ -372,7 +376,6 @@ end:
|
|||
return string;
|
||||
}
|
||||
|
||||
|
||||
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
|
||||
if (NULL == pSrc) {
|
||||
*pDst = NULL;
|
||||
|
@ -400,7 +403,8 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
|
|||
}
|
||||
memcpy(*pDst, pSrc, sizeof(*pSrc));
|
||||
if (pSrc->vgHash) {
|
||||
(*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
(*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true,
|
||||
HASH_ENTRY_LOCK);
|
||||
if (NULL == (*pDst)->vgHash) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -425,5 +429,3 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -105,6 +105,13 @@ typedef SRpcCtxVal STransCtxVal;
|
|||
typedef SRpcInfo STrans;
|
||||
typedef SRpcConnInfo STransHandleInfo;
|
||||
|
||||
// ref mgt
|
||||
// handle
|
||||
typedef struct SExHandle {
|
||||
void* handle;
|
||||
int64_t refId;
|
||||
void* pThrd;
|
||||
} SExHandle;
|
||||
/*convet from fqdn to ip */
|
||||
typedef struct SCvtAddr {
|
||||
char ip[TSDB_FQDN_LEN];
|
||||
|
@ -118,9 +125,10 @@ typedef struct {
|
|||
void* ahandle; // handle provided by app
|
||||
tmsg_t msgType; // message type
|
||||
int8_t connType; // connection type cli/srv
|
||||
int64_t rid; // refId returned by taosAddRef
|
||||
|
||||
int8_t retryCount;
|
||||
int8_t retryCnt;
|
||||
int8_t retryLimit;
|
||||
// bool setMaxRetry;
|
||||
STransCtx appCtx; //
|
||||
STransMsg* pRsp; // for synchronous API
|
||||
tsem_t* pSem; // for synchronous API
|
||||
|
@ -239,6 +247,32 @@ int transSendAsync(SAsyncPool* pool, queue* mq);
|
|||
} \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define ASYNC_CHECK_HANDLE(exh1, id) \
|
||||
do { \
|
||||
if (id > 0) { \
|
||||
tTrace("handle step1"); \
|
||||
SExHandle* exh2 = transAcquireExHandle(refMgt, id); \
|
||||
if (exh2 == NULL || id != exh2->refId) { \
|
||||
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
|
||||
exh2 ? exh2->refId : 0, id); \
|
||||
goto _return1; \
|
||||
} \
|
||||
} else if (id == 0) { \
|
||||
tTrace("handle step2"); \
|
||||
SExHandle* exh2 = transAcquireExHandle(refMgt, id); \
|
||||
if (exh2 == NULL || id == exh2->refId) { \
|
||||
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, id, \
|
||||
exh2 ? exh2->refId : 0); \
|
||||
goto _return1; \
|
||||
} else { \
|
||||
id = exh1->refId; \
|
||||
} \
|
||||
} else if (id < 0) { \
|
||||
tTrace("handle step3"); \
|
||||
goto _return2; \
|
||||
} \
|
||||
} while (0)
|
||||
int transInitBuffer(SConnBuffer* buf);
|
||||
int transClearBuffer(SConnBuffer* buf);
|
||||
int transDestroyBuffer(SConnBuffer* buf);
|
||||
|
@ -349,21 +383,13 @@ void transDQDestroy(SDelayQueue* queue);
|
|||
|
||||
int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
|
||||
|
||||
void transPrintEpSet(SEpSet* pEpSet);
|
||||
// void transPrintEpSet(SEpSet* pEpSet);
|
||||
bool transEpSetIsEqual(SEpSet* a, SEpSet* b);
|
||||
/*
|
||||
* init global func
|
||||
*/
|
||||
void transThreadOnce();
|
||||
|
||||
// ref mgt
|
||||
// handle
|
||||
typedef struct SExHandle {
|
||||
void* handle;
|
||||
int64_t refId;
|
||||
void* pThrd;
|
||||
} SExHandle;
|
||||
|
||||
void transInitEnv();
|
||||
int32_t transOpenExHandleMgt(int size);
|
||||
void transCloseExHandleMgt(int32_t mgt);
|
||||
|
|
|
@ -79,6 +79,7 @@ void* rpcOpen(const SRpcInit* pInit) {
|
|||
return pRpc;
|
||||
}
|
||||
void rpcClose(void* arg) {
|
||||
tInfo("start to close rpc");
|
||||
SRpcInfo* pRpc = (SRpcInfo*)arg;
|
||||
(*taosCloseHandle[pRpc->connType])(pRpc->tcphandle);
|
||||
transCloseExHandleMgt(pRpc->refMgt);
|
||||
|
|
|
@ -25,7 +25,6 @@ typedef struct SCliConn {
|
|||
uv_write_t writeReq;
|
||||
|
||||
void* hostThrd;
|
||||
int hThrdIdx;
|
||||
|
||||
SConnBuffer readBuf;
|
||||
STransQueue cliMsgs;
|
||||
|
@ -36,6 +35,7 @@ typedef struct SCliConn {
|
|||
bool broken; // link broken or not
|
||||
ConnStatus status; //
|
||||
|
||||
int64_t refId;
|
||||
char* ip;
|
||||
uint32_t port;
|
||||
|
||||
|
@ -54,7 +54,7 @@ typedef struct SCliMsg {
|
|||
int sent; //(0: no send, 1: alread sent)
|
||||
} SCliMsg;
|
||||
|
||||
typedef struct SCliThrdObj {
|
||||
typedef struct SCliThrd {
|
||||
TdThread thread; // tid
|
||||
int64_t pid; // pid
|
||||
uv_loop_t* loop;
|
||||
|
@ -72,13 +72,13 @@ typedef struct SCliThrdObj {
|
|||
SCvtAddr cvtAddr;
|
||||
|
||||
bool quit;
|
||||
} SCliThrdObj;
|
||||
} SCliThrd;
|
||||
|
||||
typedef struct SCliObj {
|
||||
char label[TSDB_LABEL_LEN];
|
||||
int32_t index;
|
||||
int numOfThreads;
|
||||
SCliThrdObj** pThreadObj;
|
||||
SCliThrd** pThreadObj;
|
||||
} SCliObj;
|
||||
|
||||
typedef struct SConnList {
|
||||
|
@ -106,11 +106,18 @@ static void cliAsyncCb(uv_async_t* handle);
|
|||
|
||||
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
|
||||
|
||||
static SCliConn* cliCreateConn(SCliThrdObj* thrd);
|
||||
static SCliConn* cliCreateConn(SCliThrd* thrd);
|
||||
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
|
||||
static void cliDestroy(uv_handle_t* handle);
|
||||
static void cliSend(SCliConn* pConn);
|
||||
|
||||
static bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
|
||||
if (code != 0) return false;
|
||||
if (pCtx->retryCnt == 0) return false;
|
||||
if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
|
||||
/*
|
||||
* set TCP connection timeout per-socket level
|
||||
|
@ -122,14 +129,14 @@ static void cliHandleResp(SCliConn* conn);
|
|||
static void cliHandleExcept(SCliConn* conn);
|
||||
|
||||
// handle req from app
|
||||
static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||
static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrdObj* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
|
||||
NULL, cliHandleUpdate};
|
||||
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL,
|
||||
cliHandleUpdate};
|
||||
|
||||
static void cliSendQuit(SCliThrdObj* thrd);
|
||||
static void cliSendQuit(SCliThrd* thrd);
|
||||
static void destroyUserdata(STransMsg* userdata);
|
||||
|
||||
static int cliRBChoseIdx(STrans* pTransInst);
|
||||
|
@ -137,8 +144,8 @@ static int cliRBChoseIdx(STrans* pTransInst);
|
|||
static void destroyCmsg(SCliMsg* cmsg);
|
||||
static void transDestroyConnCtx(STransConnCtx* ctx);
|
||||
// thread obj
|
||||
static SCliThrdObj* createThrdObj();
|
||||
static void destroyThrdObj(SCliThrdObj* pThrd);
|
||||
static SCliThrd* createThrdObj();
|
||||
static void destroyThrdObj(SCliThrd* pThrd);
|
||||
|
||||
static void cliWalkCb(uv_handle_t* handle, void* arg);
|
||||
|
||||
|
@ -154,7 +161,6 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
|||
destroyCmsg(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
#define CLI_RELEASE_UV(loop) \
|
||||
do { \
|
||||
uv_walk(loop, cliWalkCb, NULL); \
|
||||
|
@ -168,17 +174,23 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
|||
snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \
|
||||
} while (0)
|
||||
|
||||
#define CONN_HOST_THREAD_IDX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
|
||||
#define CONN_HOST_THREAD_IDX1(idx, exh, refId, pThrd) \
|
||||
do { \
|
||||
if (exh == NULL) { \
|
||||
idx = -1; \
|
||||
} else { \
|
||||
ASYNC_CHECK_HANDLE((exh), refId); \
|
||||
pThrd = (SCliThrd*)(exh)->pThrd; \
|
||||
} \
|
||||
} while (0)
|
||||
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
|
||||
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
|
||||
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
|
||||
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
|
||||
#define CONN_SHOULD_RELEASE(conn, head) \
|
||||
do { \
|
||||
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
|
||||
int connStatus = conn->status; \
|
||||
uint64_t ahandle = head->ahandle; \
|
||||
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \
|
||||
conn->status = ConnRelease; \
|
||||
transClearBuffer(&conn->readBuf); \
|
||||
transFreeMsg(transContFromHead((char*)head)); \
|
||||
tDebug("%s conn %p receive release request, ref: %d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); \
|
||||
|
@ -187,9 +199,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
|||
} \
|
||||
destroyCmsg(pMsg); \
|
||||
cliReleaseUnfinishedMsg(conn); \
|
||||
if (connStatus != ConnInPool) { \
|
||||
addConnToPool(((SCliThrdObj*)conn->hostThrd)->pool, conn); \
|
||||
} \
|
||||
addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \
|
||||
return; \
|
||||
} \
|
||||
} while (0)
|
||||
|
@ -255,8 +265,25 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
|||
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->info.persistHandle == 1)
|
||||
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
|
||||
|
||||
#define EPSET_GET_SIZE(epSet) (epSet)->numOfEps
|
||||
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
|
||||
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
|
||||
#define EPSET_FORWARD_INUSE(epSet) \
|
||||
do { \
|
||||
(epSet)->inUse = (++((epSet)->inUse)) % ((epSet)->numOfEps); \
|
||||
} while (0)
|
||||
#define EPSET_DEBUG_STR(epSet, tbuf) \
|
||||
do { \
|
||||
int len = snprintf(tbuf, sizeof(tbuf), "epset:{"); \
|
||||
for (int i = 0; i < (epSet)->numOfEps; i++) { \
|
||||
if (i == (epSet)->numOfEps - 1) { \
|
||||
len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \
|
||||
} else { \
|
||||
len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d, ", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \
|
||||
} \
|
||||
} \
|
||||
len += snprintf(tbuf + len, sizeof(tbuf) - len, "}, inUse:%d", (epSet)->inUse); \
|
||||
} while (0);
|
||||
|
||||
static void* cliWorkThread(void* arg);
|
||||
|
||||
|
@ -271,7 +298,7 @@ _RETURN:
|
|||
return false;
|
||||
}
|
||||
void cliHandleResp(SCliConn* conn) {
|
||||
SCliThrdObj* pThrd = conn->hostThrd;
|
||||
SCliThrd* pThrd = conn->hostThrd;
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
|
||||
STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
|
||||
|
@ -292,17 +319,9 @@ void cliHandleResp(SCliConn* conn) {
|
|||
|
||||
if (CONN_NO_PERSIST_BY_APP(conn)) {
|
||||
pMsg = transQueuePop(&conn->cliMsgs);
|
||||
pCtx = pMsg ? pMsg->ctx : NULL;
|
||||
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
|
||||
transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
|
||||
if (transMsg.info.ahandle == NULL) {
|
||||
transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
|
||||
}
|
||||
tDebug("%s conn %p construct ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
|
||||
} else {
|
||||
transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
|
||||
pCtx = pMsg->ctx;
|
||||
transMsg.info.ahandle = pCtx->ahandle;
|
||||
tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
|
||||
}
|
||||
} else {
|
||||
uint64_t ahandle = (uint64_t)pHead->ahandle;
|
||||
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
|
||||
|
@ -324,26 +343,22 @@ void cliHandleResp(SCliConn* conn) {
|
|||
}
|
||||
// buf's mem alread translated to transMsg.pCont
|
||||
transClearBuffer(&conn->readBuf);
|
||||
|
||||
if (!CONN_NO_PERSIST_BY_APP(conn)) {
|
||||
transMsg.info.handle = conn;
|
||||
transMsg.info.handle = (void*)conn->refId;
|
||||
tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
|
||||
}
|
||||
// char buf[64] = {0};
|
||||
// TRACE_TO_STR(&transMsg.info.traceId, buf);
|
||||
|
||||
STraceId* trace = &transMsg.info.traceId;
|
||||
tGTrace("conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, code: %d", conn, TMSG_INFO(pHead->msgType),
|
||||
taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->localAddr.sin_addr),
|
||||
ntohs(conn->localAddr.sin_port), transMsg.contLen, transMsg.code);
|
||||
tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, code: %d", CONN_GET_INST_LABEL(conn),
|
||||
conn, TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
|
||||
taosInetNtoa(conn->localAddr.sin_addr), ntohs(conn->localAddr.sin_port), transMsg.contLen, transMsg.code);
|
||||
|
||||
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
|
||||
tDebug("%s except, server continue send while cli ignore it", CONN_GET_INST_LABEL(conn));
|
||||
// transUnrefCliHandle(conn);
|
||||
tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
|
||||
return;
|
||||
}
|
||||
if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
|
||||
tDebug("%s except, server continue send while cli ignore it", CONN_GET_INST_LABEL(conn));
|
||||
// transUnrefCliHandle(conn);
|
||||
tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -375,7 +390,7 @@ void cliHandleExcept(SCliConn* pConn) {
|
|||
return;
|
||||
}
|
||||
}
|
||||
SCliThrdObj* pThrd = pConn->hostThrd;
|
||||
SCliThrd* pThrd = pConn->hostThrd;
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
bool once = false;
|
||||
do {
|
||||
|
@ -389,7 +404,6 @@ void cliHandleExcept(SCliConn* pConn) {
|
|||
transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||
transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
|
||||
transMsg.info.ahandle = NULL;
|
||||
transMsg.info.handle = pConn;
|
||||
|
||||
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
|
||||
transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
|
||||
|
@ -414,13 +428,13 @@ void cliHandleExcept(SCliConn* pConn) {
|
|||
return;
|
||||
}
|
||||
destroyCmsg(pMsg);
|
||||
tTrace("%s conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn));
|
||||
} while (!transQueueEmpty(&pConn->cliMsgs));
|
||||
transUnrefCliHandle(pConn);
|
||||
}
|
||||
|
||||
void cliTimeoutCb(uv_timer_t* handle) {
|
||||
SCliThrdObj* pThrd = handle->data;
|
||||
SCliThrd* pThrd = handle->data;
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
int64_t currentTime = pThrd->nextTimeout;
|
||||
tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label);
|
||||
|
@ -487,10 +501,26 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
|
|||
assert(h == &conn->conn);
|
||||
return conn;
|
||||
}
|
||||
static void allocConnRef(SCliConn* conn, bool update) {
|
||||
if (update) {
|
||||
transRemoveExHandle(refMgt, conn->refId);
|
||||
}
|
||||
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
|
||||
exh->handle = conn;
|
||||
exh->pThrd = conn->hostThrd;
|
||||
exh->refId = transAddExHandle(refMgt, exh);
|
||||
conn->refId = exh->refId;
|
||||
}
|
||||
static void addConnToPool(void* pool, SCliConn* conn) {
|
||||
SCliThrdObj* thrd = conn->hostThrd;
|
||||
if (conn->status == ConnInPool) {
|
||||
// assert(0);
|
||||
return;
|
||||
}
|
||||
SCliThrd* thrd = conn->hostThrd;
|
||||
CONN_HANDLE_THREAD_QUIT(thrd);
|
||||
|
||||
allocConnRef(conn, true);
|
||||
|
||||
STrans* pTransInst = thrd->pTransInst;
|
||||
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
|
||||
transQueueClear(&conn->cliMsgs);
|
||||
|
@ -540,13 +570,14 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
|||
return;
|
||||
}
|
||||
if (nread < 0) {
|
||||
tError("%s conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread));
|
||||
tError("%s conn %p read error: %s, ref: %d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread),
|
||||
T_REF_VAL_GET(conn));
|
||||
conn->broken = true;
|
||||
cliHandleExcept(conn);
|
||||
}
|
||||
}
|
||||
|
||||
static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
|
||||
static SCliConn* cliCreateConn(SCliThrd* pThrd) {
|
||||
SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn));
|
||||
// read/write stream handle
|
||||
conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
|
||||
|
@ -562,11 +593,16 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
|
|||
conn->status = ConnNormal;
|
||||
conn->broken = 0;
|
||||
transRefCliHandle(conn);
|
||||
|
||||
allocConnRef(conn, false);
|
||||
|
||||
return conn;
|
||||
}
|
||||
static void cliDestroyConn(SCliConn* conn, bool clear) {
|
||||
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
||||
QUEUE_REMOVE(&conn->conn);
|
||||
QUEUE_INIT(&conn->conn);
|
||||
transRemoveExHandle(refMgt, conn->refId);
|
||||
if (clear) {
|
||||
uv_close((uv_handle_t*)conn->stream, cliDestroy);
|
||||
}
|
||||
|
@ -593,7 +629,7 @@ static bool cliHandleNoResp(SCliConn* conn) {
|
|||
}
|
||||
if (res == true) {
|
||||
if (cliMaySendCachedMsg(conn) == false) {
|
||||
SCliThrdObj* thrd = conn->hostThrd;
|
||||
SCliThrd* thrd = conn->hostThrd;
|
||||
addConnToPool(thrd->pool, conn);
|
||||
}
|
||||
}
|
||||
|
@ -629,7 +665,7 @@ void cliSend(SCliConn* pConn) {
|
|||
|
||||
STransConnCtx* pCtx = pCliMsg->ctx;
|
||||
|
||||
SCliThrdObj* pThrd = pConn->hostThrd;
|
||||
SCliThrd* pThrd = pConn->hostThrd;
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
|
||||
STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
|
||||
|
@ -651,12 +687,10 @@ void cliSend(SCliConn* pConn) {
|
|||
|
||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||
|
||||
// char buf[64] = {0};
|
||||
// TRACE_TO_STR(&pMsg->info.traceId, buf);
|
||||
STraceId* trace = &pMsg->info.traceId;
|
||||
tGTrace("conn %p %s is sent to %s:%d, local info %s:%d", pConn, TMSG_INFO(pHead->msgType),
|
||||
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr),
|
||||
ntohs(pConn->localAddr.sin_port));
|
||||
tGTrace("%s conn %p %s is sent to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
||||
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port));
|
||||
|
||||
if (pHead->persist == 1) {
|
||||
CONN_SET_PERSIST_BY_APP(pConn);
|
||||
|
@ -664,7 +698,6 @@ void cliSend(SCliConn* pConn) {
|
|||
|
||||
pConn->writeReq.data = pConn;
|
||||
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
|
||||
|
||||
return;
|
||||
_RETURN:
|
||||
return;
|
||||
|
@ -690,19 +723,24 @@ void cliConnCb(uv_connect_t* req, int status) {
|
|||
cliSend(pConn);
|
||||
}
|
||||
|
||||
static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||
pThrd->quit = true;
|
||||
tDebug("cli work thread %p start to quit", pThrd);
|
||||
destroyCmsg(pMsg);
|
||||
destroyConnPool(pThrd->pool);
|
||||
uv_timer_stop(&pThrd->timer);
|
||||
uv_walk(pThrd->loop, cliWalkCb, NULL);
|
||||
|
||||
pThrd->quit = true;
|
||||
|
||||
// uv_stop(pThrd->loop);
|
||||
}
|
||||
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||
SCliConn* conn = pMsg->msg.info.handle;
|
||||
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||
int64_t refId = (int64_t)(pMsg->msg.info.handle);
|
||||
SExHandle* exh = transAcquireExHandle(refMgt, refId);
|
||||
if (exh == NULL) {
|
||||
tDebug("%" PRId64 " already release", refId);
|
||||
}
|
||||
|
||||
SCliConn* conn = exh->handle;
|
||||
tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
|
||||
|
||||
if (T_REF_VAL_GET(conn) == 2) {
|
||||
|
@ -711,33 +749,37 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
return;
|
||||
}
|
||||
cliSend(conn);
|
||||
} else {
|
||||
// conn already broken down
|
||||
transUnrefCliHandle(conn);
|
||||
}
|
||||
}
|
||||
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
|
||||
pThrd->cvtAddr = pCtx->cvtAddr;
|
||||
destroyCmsg(pMsg);
|
||||
}
|
||||
|
||||
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
|
||||
SCliConn* conn = NULL;
|
||||
if (pMsg->msg.info.handle != NULL) {
|
||||
conn = (SCliConn*)(pMsg->msg.info.handle);
|
||||
if (conn != NULL) {
|
||||
tTrace("%s conn %p reused", CONN_GET_INST_LABEL(conn), conn);
|
||||
}
|
||||
int64_t refId = (int64_t)(pMsg->msg.info.handle);
|
||||
if (refId != 0) {
|
||||
SExHandle* exh = transAcquireExHandle(refMgt, refId);
|
||||
if (exh == NULL) {
|
||||
*ignore = true;
|
||||
destroyCmsg(pMsg);
|
||||
return NULL;
|
||||
// assert(0);
|
||||
} else {
|
||||
conn = exh->handle;
|
||||
transReleaseExHandle(refMgt, refId);
|
||||
}
|
||||
return conn;
|
||||
};
|
||||
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
|
||||
if (conn != NULL) {
|
||||
tTrace("%s conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
||||
tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool);
|
||||
} else {
|
||||
tTrace("%s not found conn in conn pool %p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool);
|
||||
}
|
||||
tTrace("%s not found conn in conn pool:%p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool);
|
||||
}
|
||||
return conn;
|
||||
}
|
||||
|
@ -752,22 +794,19 @@ void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
|
|||
}
|
||||
}
|
||||
}
|
||||
void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||
uint64_t et = taosGetTimestampUs();
|
||||
uint64_t el = et - pMsg->st;
|
||||
// tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((STrans*)pThrd->pTransInst)->label, el);
|
||||
|
||||
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
|
||||
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
|
||||
|
||||
transPrintEpSet(&pCtx->epSet);
|
||||
|
||||
SCliConn* conn = cliGetConn(pMsg, pThrd);
|
||||
// transPrintEpSet(&pCtx->epSet);
|
||||
bool ignore = false;
|
||||
SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore);
|
||||
if (ignore == true) {
|
||||
return;
|
||||
}
|
||||
if (conn != NULL) {
|
||||
conn->hThrdIdx = pCtx->hThrdIdx;
|
||||
|
||||
transCtxMerge(&conn->ctx, &pCtx->appCtx);
|
||||
transQueuePush(&conn->cliMsgs, pMsg);
|
||||
cliSend(conn);
|
||||
|
@ -776,7 +815,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
transCtxMerge(&conn->ctx, &pCtx->appCtx);
|
||||
transQueuePush(&conn->cliMsgs, pMsg);
|
||||
|
||||
conn->hThrdIdx = pCtx->hThrdIdx;
|
||||
conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet));
|
||||
conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
|
||||
|
||||
|
@ -784,7 +822,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
if (ret) {
|
||||
tError("%s conn %p failed to set conn option, errmsg %s", transLabel(pTransInst), conn, uv_err_name(ret));
|
||||
}
|
||||
int fd = taosCreateSocketWithTimeOutOpt(TRANS_CONN_TIMEOUT);
|
||||
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT);
|
||||
if (fd == -1) {
|
||||
tTrace("%s conn %p failed to create socket", transLabel(pTransInst), conn);
|
||||
cliHandleExcept(conn);
|
||||
|
@ -808,7 +846,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
}
|
||||
static void cliAsyncCb(uv_async_t* handle) {
|
||||
SAsyncItem* item = handle->data;
|
||||
SCliThrdObj* pThrd = item->pThrd;
|
||||
SCliThrd* pThrd = item->pThrd;
|
||||
SCliMsg* pMsg = NULL;
|
||||
|
||||
// batch process to avoid to lock/unlock frequently
|
||||
|
@ -835,7 +873,7 @@ static void cliAsyncCb(uv_async_t* handle) {
|
|||
}
|
||||
|
||||
static void* cliWorkThread(void* arg) {
|
||||
SCliThrdObj* pThrd = (SCliThrdObj*)arg;
|
||||
SCliThrd* pThrd = (SCliThrd*)arg;
|
||||
pThrd->pid = taosGetSelfPthreadId();
|
||||
setThreadName("trans-cli-work");
|
||||
uv_run(pThrd->loop, UV_RUN_DEFAULT);
|
||||
|
@ -848,10 +886,10 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
|||
STrans* pTransInst = shandle;
|
||||
memcpy(cli->label, label, strlen(label));
|
||||
cli->numOfThreads = numOfThreads;
|
||||
cli->pThreadObj = (SCliThrdObj**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrdObj*));
|
||||
cli->pThreadObj = (SCliThrd**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrd*));
|
||||
|
||||
for (int i = 0; i < cli->numOfThreads; i++) {
|
||||
SCliThrdObj* pThrd = createThrdObj();
|
||||
SCliThrd* pThrd = createThrdObj();
|
||||
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
|
||||
pThrd->pTransInst = shandle;
|
||||
|
||||
|
@ -885,8 +923,8 @@ static void destroyCmsg(SCliMsg* pMsg) {
|
|||
taosMemoryFree(pMsg);
|
||||
}
|
||||
|
||||
static SCliThrdObj* createThrdObj() {
|
||||
SCliThrdObj* pThrd = (SCliThrdObj*)taosMemoryCalloc(1, sizeof(SCliThrdObj));
|
||||
static SCliThrd* createThrdObj() {
|
||||
SCliThrd* pThrd = (SCliThrd*)taosMemoryCalloc(1, sizeof(SCliThrd));
|
||||
|
||||
QUEUE_INIT(&pThrd->msg);
|
||||
taosThreadMutexInit(&pThrd->msgMtx, NULL);
|
||||
|
@ -904,7 +942,7 @@ static SCliThrdObj* createThrdObj() {
|
|||
pThrd->quit = false;
|
||||
return pThrd;
|
||||
}
|
||||
static void destroyThrdObj(SCliThrdObj* pThrd) {
|
||||
static void destroyThrdObj(SCliThrd* pThrd) {
|
||||
if (pThrd == NULL) {
|
||||
return;
|
||||
}
|
||||
|
@ -925,7 +963,7 @@ static void transDestroyConnCtx(STransConnCtx* ctx) {
|
|||
taosMemoryFree(ctx);
|
||||
}
|
||||
|
||||
void cliSendQuit(SCliThrdObj* thrd) {
|
||||
void cliSendQuit(SCliThrd* thrd) {
|
||||
// cli can stop gracefully
|
||||
SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg));
|
||||
msg->type = Quit;
|
||||
|
@ -938,7 +976,10 @@ void cliWalkCb(uv_handle_t* handle, void* arg) {
|
|||
}
|
||||
|
||||
int cliRBChoseIdx(STrans* pTransInst) {
|
||||
int64_t index = pTransInst->index;
|
||||
int8_t index = pTransInst->index;
|
||||
if (pTransInst->numOfThreads == 0) {
|
||||
return -1;
|
||||
}
|
||||
if (pTransInst->index++ >= pTransInst->numOfThreads) {
|
||||
pTransInst->index = 0;
|
||||
}
|
||||
|
@ -946,15 +987,35 @@ int cliRBChoseIdx(STrans* pTransInst) {
|
|||
}
|
||||
static void doDelayTask(void* param) {
|
||||
STaskArg* arg = param;
|
||||
|
||||
SCliMsg* pMsg = arg->param1;
|
||||
SCliThrdObj* pThrd = arg->param2;
|
||||
cliHandleReq(pMsg, pThrd);
|
||||
|
||||
SCliThrd* pThrd = arg->param2;
|
||||
taosMemoryFree(arg);
|
||||
|
||||
cliHandleReq(pMsg, pThrd);
|
||||
}
|
||||
|
||||
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
|
||||
STraceId* trace = &pMsg->msg.info.traceId;
|
||||
char tbuf[256] = {0};
|
||||
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
|
||||
tGTrace("%s retry on next node, use %s, retryCnt:%d, limit:%d", transLabel(pThrd->pTransInst), tbuf,
|
||||
pCtx->retryCnt + 1, pCtx->retryLimit);
|
||||
|
||||
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
||||
arg->param1 = pMsg;
|
||||
arg->param2 = pThrd;
|
||||
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
|
||||
}
|
||||
|
||||
void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
|
||||
if (*val != exp) {
|
||||
*val = newVal;
|
||||
}
|
||||
}
|
||||
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||
SCliThrdObj* pThrd = pConn->hostThrd;
|
||||
SCliThrd* pThrd = pConn->hostThrd;
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
|
||||
if (pMsg == NULL || pMsg->ctx == NULL) {
|
||||
|
@ -962,66 +1023,35 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
|||
pTransInst->cfp(pTransInst->parent, pResp, NULL);
|
||||
return 0;
|
||||
}
|
||||
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
SEpSet* pEpSet = &pCtx->epSet;
|
||||
|
||||
if (pCtx->retryCount == 0) {
|
||||
pCtx->origEpSet = pCtx->epSet;
|
||||
}
|
||||
/*
|
||||
* upper layer handle retry if code equal TSDB_CODE_RPC_NETWORK_UNAVAIL
|
||||
* no retry
|
||||
* 1. query conn
|
||||
* 2. rpc thread already receive quit msg
|
||||
*/
|
||||
tmsg_t msgType = pCtx->msgType;
|
||||
if ((pTransInst->retry != NULL && pEpSet->numOfEps > 1 && (pTransInst->retry(pResp->code))) ||
|
||||
(pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pResp->code == TSDB_CODE_APP_NOT_READY ||
|
||||
pResp->code == TSDB_CODE_NODE_NOT_DEPLOYED || pResp->code == TSDB_CODE_SYN_NOT_LEADER)) {
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
int32_t code = pResp->code;
|
||||
if (pTransInst->retry != NULL && pTransInst->retry(code)) {
|
||||
pMsg->sent = 0;
|
||||
tTrace("try to send req to next node");
|
||||
pMsg->st = taosGetTimestampUs();
|
||||
pCtx->retryCount += 1;
|
||||
if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL && pCtx->setMaxRetry == false) {
|
||||
if (pCtx->retryCount < pEpSet->numOfEps * 3) {
|
||||
pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
|
||||
if (pThrd->quit == false) {
|
||||
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
||||
arg->param1 = pMsg;
|
||||
arg->param2 = pThrd;
|
||||
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
|
||||
transPrintEpSet(pEpSet);
|
||||
tTrace("%s use local epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse,
|
||||
pCtx->retryCount + 1, pEpSet->numOfEps * 3);
|
||||
|
||||
pCtx->retryCnt += 1;
|
||||
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
||||
cliCompareAndSwap(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, EPSET_GET_SIZE(&pCtx->epSet) * 3);
|
||||
if (pCtx->retryCnt < pCtx->retryLimit) {
|
||||
transUnrefCliHandle(pConn);
|
||||
EPSET_FORWARD_INUSE(&pCtx->epSet);
|
||||
cliSchedMsgToNextNode(pMsg, pThrd);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
} else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) {
|
||||
pCtx->setMaxRetry = true;
|
||||
} else {
|
||||
cliCompareAndSwap(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, TRANS_RETRY_COUNT_LIMIT);
|
||||
if (pCtx->retryCnt < pCtx->retryLimit) {
|
||||
addConnToPool(pThrd->pool, pConn);
|
||||
if (pResp->contLen == 0) {
|
||||
pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
|
||||
transPrintEpSet(&pCtx->epSet);
|
||||
tTrace("%s use local epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse,
|
||||
pCtx->retryCount + 1, TRANS_RETRY_COUNT_LIMIT);
|
||||
EPSET_FORWARD_INUSE(&pCtx->epSet);
|
||||
} else {
|
||||
SEpSet epSet = {0};
|
||||
tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
|
||||
pCtx->epSet = epSet;
|
||||
|
||||
transPrintEpSet(&pCtx->epSet);
|
||||
tTrace("%s use remote epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse,
|
||||
pCtx->retryCount + 1, TRANS_RETRY_COUNT_LIMIT);
|
||||
tDeserializeSEpSet(pResp->pCont, pResp->contLen, &pCtx->epSet);
|
||||
}
|
||||
if (pThrd->quit == false) {
|
||||
if (pResp->code != TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
||||
if (pConn->status != ConnInPool) addConnToPool(pThrd->pool, pConn);
|
||||
} else {
|
||||
transUnrefCliHandle(pConn);
|
||||
}
|
||||
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
||||
arg->param1 = pMsg;
|
||||
arg->param2 = pThrd;
|
||||
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
|
||||
transFreeMsg(pResp->pCont);
|
||||
cliSchedMsgToNextNode(pMsg, pThrd);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -1029,20 +1059,20 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
|||
|
||||
STraceId* trace = &pResp->info.traceId;
|
||||
if (pCtx->pSem != NULL) {
|
||||
tGTrace("conn %p(sync) handle resp", pConn);
|
||||
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
if (pCtx->pRsp == NULL) {
|
||||
tGTrace("conn %p(sync) failed to resp, ignore", pConn);
|
||||
tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
} else {
|
||||
memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp));
|
||||
}
|
||||
tsem_post(pCtx->pSem);
|
||||
pCtx->pRsp = NULL;
|
||||
} else {
|
||||
tGTrace("conn %p handle resp", pConn);
|
||||
if (pResp->code != 0 || pCtx->retryCount == 0 || transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) {
|
||||
tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
if (!cliIsEpsetUpdated(code, pCtx)) {
|
||||
pTransInst->cfp(pTransInst->parent, pResp, NULL);
|
||||
} else {
|
||||
pTransInst->cfp(pTransInst->parent, pResp, pEpSet);
|
||||
pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
|
@ -1074,38 +1104,58 @@ void transUnrefCliHandle(void* handle) {
|
|||
return;
|
||||
}
|
||||
int ref = T_REF_DEC((SCliConn*)handle);
|
||||
tTrace("%s conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
|
||||
tTrace("%s conn %p ref:%d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
|
||||
if (ref == 0) {
|
||||
cliDestroyConn((SCliConn*)handle, true);
|
||||
}
|
||||
}
|
||||
SCliThrd* transGetWorkThrdFromHandle(int64_t handle) {
|
||||
SCliThrd* pThrd = NULL;
|
||||
SExHandle* exh = transAcquireExHandle(refMgt, handle);
|
||||
if (exh == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
pThrd = exh->pThrd;
|
||||
transReleaseExHandle(refMgt, handle);
|
||||
return pThrd;
|
||||
}
|
||||
SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {
|
||||
if (handle == 0) {
|
||||
int idx = cliRBChoseIdx(trans);
|
||||
if (idx < 0) return NULL;
|
||||
return ((SCliObj*)trans->tcphandle)->pThreadObj[idx];
|
||||
}
|
||||
return transGetWorkThrdFromHandle(handle);
|
||||
}
|
||||
void transReleaseCliHandle(void* handle) {
|
||||
SCliThrdObj* thrd = CONN_GET_HOST_THREAD(handle);
|
||||
if (thrd == NULL) {
|
||||
int idx = -1;
|
||||
SCliThrd* pThrd = transGetWorkThrdFromHandle((int64_t)handle);
|
||||
if (pThrd == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
STransMsg tmsg = {.info.handle = handle};
|
||||
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
|
||||
cmsg->msg = tmsg;
|
||||
cmsg->type = Release;
|
||||
|
||||
transSendAsync(thrd->asyncPool, &cmsg->q);
|
||||
transSendAsync(pThrd->asyncPool, &cmsg->q);
|
||||
return;
|
||||
}
|
||||
|
||||
void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
|
||||
STrans* pTransInst = (STrans*)shandle;
|
||||
int idx = CONN_HOST_THREAD_IDX((SCliConn*)pReq->info.handle);
|
||||
if (idx == -1) {
|
||||
idx = cliRBChoseIdx(pTransInst);
|
||||
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
|
||||
if (pThrd == NULL) {
|
||||
transFreeMsg(pReq->pCont);
|
||||
return;
|
||||
}
|
||||
|
||||
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
|
||||
|
||||
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
||||
pCtx->epSet = *pEpSet;
|
||||
pCtx->ahandle = pReq->info.ahandle;
|
||||
pCtx->msgType = pReq->msgType;
|
||||
pCtx->hThrdIdx = idx;
|
||||
|
||||
if (ctx != NULL) {
|
||||
pCtx->appCtx = *ctx;
|
||||
|
@ -1118,19 +1168,19 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
|
|||
cliMsg->st = taosGetTimestampUs();
|
||||
cliMsg->type = Normal;
|
||||
|
||||
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx];
|
||||
|
||||
STraceId* trace = &pReq->info.traceId;
|
||||
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), thrd->pid,
|
||||
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
|
||||
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
|
||||
ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0);
|
||||
ASSERT(transSendAsync(pThrd->asyncPool, &(cliMsg->q)) == 0);
|
||||
return;
|
||||
}
|
||||
|
||||
void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
|
||||
STrans* pTransInst = (STrans*)shandle;
|
||||
int idx = CONN_HOST_THREAD_IDX(pReq->info.handle);
|
||||
if (idx == -1) {
|
||||
idx = cliRBChoseIdx(pTransInst);
|
||||
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
|
||||
if (pThrd == NULL) {
|
||||
transFreeMsg(pReq->pCont);
|
||||
return;
|
||||
}
|
||||
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
|
||||
tsem_init(sem, 0, 0);
|
||||
|
@ -1139,9 +1189,9 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
|
|||
|
||||
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
||||
pCtx->epSet = *pEpSet;
|
||||
pCtx->origEpSet = *pEpSet;
|
||||
pCtx->ahandle = pReq->info.ahandle;
|
||||
pCtx->msgType = pReq->msgType;
|
||||
pCtx->hThrdIdx = idx;
|
||||
pCtx->pSem = sem;
|
||||
pCtx->pRsp = pRsp;
|
||||
|
||||
|
@ -1151,22 +1201,21 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
|
|||
cliMsg->st = taosGetTimestampUs();
|
||||
cliMsg->type = Normal;
|
||||
|
||||
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx];
|
||||
|
||||
STraceId* trace = &pReq->info.traceId;
|
||||
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), thrd->pid,
|
||||
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
|
||||
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
|
||||
|
||||
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
||||
transSendAsync(pThrd->asyncPool, &(cliMsg->q));
|
||||
tsem_wait(sem);
|
||||
tsem_destroy(sem);
|
||||
taosMemoryFree(sem);
|
||||
return;
|
||||
}
|
||||
/*
|
||||
*
|
||||
**/
|
||||
void transSetDefaultAddr(void* ahandle, const char* ip, const char* fqdn) {
|
||||
STrans* pTransInst = ahandle;
|
||||
void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
|
||||
STrans* pTransInst = shandle;
|
||||
|
||||
SCvtAddr cvtAddr = {0};
|
||||
if (ip != NULL && fqdn != NULL) {
|
||||
|
@ -1176,14 +1225,13 @@ void transSetDefaultAddr(void* ahandle, const char* ip, const char* fqdn) {
|
|||
}
|
||||
for (int i = 0; i < pTransInst->numOfThreads; i++) {
|
||||
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
||||
pCtx->hThrdIdx = i;
|
||||
pCtx->cvtAddr = cvtAddr;
|
||||
|
||||
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
|
||||
cliMsg->ctx = pCtx;
|
||||
cliMsg->type = Update;
|
||||
|
||||
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
|
||||
SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
|
||||
tDebug("%s update epset at thread:%08" PRId64 "", pTransInst->label, thrd->pid);
|
||||
|
||||
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
||||
|
|
|
@ -455,7 +455,7 @@ void transPrintEpSet(SEpSet* pEpSet) {
|
|||
return;
|
||||
}
|
||||
char buf[512] = {0};
|
||||
int len = snprintf(buf, sizeof(buf), "epset { ");
|
||||
int len = snprintf(buf, sizeof(buf), "epset:{");
|
||||
for (int i = 0; i < pEpSet->numOfEps; i++) {
|
||||
if (i == pEpSet->numOfEps - 1) {
|
||||
len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
|
||||
|
|
|
@ -65,7 +65,7 @@ typedef struct SSvrMsg {
|
|||
STransMsgType type;
|
||||
} SSvrMsg;
|
||||
|
||||
typedef struct SWorkThrdObj {
|
||||
typedef struct SWorkThrd {
|
||||
TdThread thread;
|
||||
uv_connect_t connect_req;
|
||||
uv_pipe_t* pipe;
|
||||
|
@ -78,7 +78,7 @@ typedef struct SWorkThrdObj {
|
|||
queue conn;
|
||||
void* pTransInst;
|
||||
bool quit;
|
||||
} SWorkThrdObj;
|
||||
} SWorkThrd;
|
||||
|
||||
typedef struct SServerObj {
|
||||
TdThread thread;
|
||||
|
@ -89,7 +89,7 @@ typedef struct SServerObj {
|
|||
int workerIdx;
|
||||
int numOfThreads;
|
||||
int numOfWorkerReady;
|
||||
SWorkThrdObj** pThreadObj;
|
||||
SWorkThrd** pThreadObj;
|
||||
|
||||
uv_pipe_t pipeListen;
|
||||
uv_pipe_t** pipe;
|
||||
|
@ -133,13 +133,13 @@ static SSvrConn* createConn(void* hThrd);
|
|||
static void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
|
||||
static void destroyConnRegArg(SSvrConn* conn);
|
||||
|
||||
static int reallocConnRefHandle(SSvrConn* conn);
|
||||
static int reallocConnRef(SSvrConn* conn);
|
||||
|
||||
static void uvHandleQuit(SSvrMsg* msg, SWorkThrdObj* thrd);
|
||||
static void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd);
|
||||
static void uvHandleResp(SSvrMsg* msg, SWorkThrdObj* thrd);
|
||||
static void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd);
|
||||
static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
|
||||
static void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd);
|
||||
static void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd);
|
||||
static void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd);
|
||||
static void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd);
|
||||
static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrd* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
|
||||
uvHandleRegister, NULL};
|
||||
|
||||
static int32_t exHandlesMgt;
|
||||
|
@ -160,7 +160,7 @@ static void* transWorkerThread(void* arg);
|
|||
static void* transAcceptThread(void* arg);
|
||||
|
||||
// add handle loop
|
||||
static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName);
|
||||
static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName);
|
||||
static bool addHandleToAcceptloop(void* arg);
|
||||
|
||||
#define CONN_SHOULD_RELEASE(conn, head) \
|
||||
|
@ -176,7 +176,7 @@ static bool addHandleToAcceptloop(void* arg);
|
|||
srvMsg->msg = tmsg; \
|
||||
srvMsg->type = Release; \
|
||||
srvMsg->pConn = conn; \
|
||||
reallocConnRefHandle(conn); \
|
||||
reallocConnRef(conn); \
|
||||
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
|
||||
return; \
|
||||
} \
|
||||
|
@ -206,32 +206,6 @@ static bool addHandleToAcceptloop(void* arg);
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
#define ASYNC_CHECK_HANDLE(exh1, refId) \
|
||||
do { \
|
||||
if (refId > 0) { \
|
||||
tTrace("handle step1"); \
|
||||
SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \
|
||||
if (exh2 == NULL || refId != exh2->refId) { \
|
||||
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
|
||||
exh2 ? exh2->refId : 0, refId); \
|
||||
goto _return1; \
|
||||
} \
|
||||
} else if (refId == 0) { \
|
||||
tTrace("handle step2"); \
|
||||
SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \
|
||||
if (exh2 == NULL || refId != exh2->refId) { \
|
||||
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, refId, \
|
||||
exh2 ? exh2->refId : 0); \
|
||||
goto _return1; \
|
||||
} else { \
|
||||
refId = exh1->refId; \
|
||||
} \
|
||||
} else if (refId < 0) { \
|
||||
tTrace("handle step3"); \
|
||||
goto _return2; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||
SSvrConn* conn = handle->data;
|
||||
SConnBuffer* pBuf = &conn->readBuf;
|
||||
|
@ -259,7 +233,7 @@ static void uvHandleReq(SSvrConn* pConn) {
|
|||
// wreq->data = pConn;
|
||||
// uv_read_stop((uv_stream_t*)pConn->pTcp);
|
||||
// transRefSrvHandle(pConn);
|
||||
// uv_queue_work(((SWorkThrdObj*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
|
||||
// uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
|
||||
|
||||
CONN_SHOULD_RELEASE(pConn, pHead);
|
||||
|
||||
|
@ -379,7 +353,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
|
|||
// if (msg->type == Release && conn->status != ConnNormal) {
|
||||
// conn->status = ConnNormal;
|
||||
// transUnrefSrvHandle(conn);
|
||||
// reallocConnRefHandle(conn);
|
||||
// reallocConnRef(conn);
|
||||
// destroySmsg(msg);
|
||||
// transQueueClear(&conn->srvMsgs);
|
||||
// return;
|
||||
|
@ -448,6 +422,8 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
|||
transUnrefSrvHandle(pConn);
|
||||
} else {
|
||||
pHead->msgType = pMsg->msgType;
|
||||
if (pHead->msgType == 0 && transMsgLenFromCont(pMsg->contLen) == sizeof(STransMsgHead))
|
||||
pHead->msgType = pConn->inType + 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -504,7 +480,7 @@ static void destroySmsg(SSvrMsg* smsg) {
|
|||
transFreeMsg(smsg->msg.pCont);
|
||||
taosMemoryFree(smsg);
|
||||
}
|
||||
static void destroyAllConn(SWorkThrdObj* pThrd) {
|
||||
static void destroyAllConn(SWorkThrd* pThrd) {
|
||||
tTrace("thread %p destroy all conn ", pThrd);
|
||||
while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
|
||||
queue* h = QUEUE_HEAD(&pThrd->conn);
|
||||
|
@ -520,7 +496,7 @@ static void destroyAllConn(SWorkThrdObj* pThrd) {
|
|||
}
|
||||
void uvWorkerAsyncCb(uv_async_t* handle) {
|
||||
SAsyncItem* item = handle->data;
|
||||
SWorkThrdObj* pThrd = item->pThrd;
|
||||
SWorkThrd* pThrd = item->pThrd;
|
||||
SSvrConn* conn = NULL;
|
||||
queue wq;
|
||||
|
||||
|
@ -650,7 +626,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
|||
assert(buf->base[0] == notify[0]);
|
||||
taosMemoryFree(buf->base);
|
||||
|
||||
SWorkThrdObj* pThrd = q->data;
|
||||
SWorkThrd* pThrd = q->data;
|
||||
|
||||
uv_pipe_t* pipe = (uv_pipe_t*)q;
|
||||
if (!uv_pipe_pending_count(pipe)) {
|
||||
|
@ -718,10 +694,10 @@ void uvOnPipeConnectionCb(uv_connect_t* connect, int status) {
|
|||
if (status != 0) {
|
||||
return;
|
||||
}
|
||||
SWorkThrdObj* pThrd = container_of(connect, SWorkThrdObj, connect_req);
|
||||
SWorkThrd* pThrd = container_of(connect, SWorkThrd, connect_req);
|
||||
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
|
||||
}
|
||||
static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName) {
|
||||
static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
|
||||
pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
|
||||
if (0 != uv_loop_init(pThrd->loop)) {
|
||||
return false;
|
||||
|
@ -774,14 +750,14 @@ static bool addHandleToAcceptloop(void* arg) {
|
|||
}
|
||||
void* transWorkerThread(void* arg) {
|
||||
setThreadName("trans-worker");
|
||||
SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
|
||||
SWorkThrd* pThrd = (SWorkThrd*)arg;
|
||||
uv_run(pThrd->loop, UV_RUN_DEFAULT);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static SSvrConn* createConn(void* hThrd) {
|
||||
SWorkThrdObj* pThrd = hThrd;
|
||||
SWorkThrd* pThrd = hThrd;
|
||||
|
||||
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
|
||||
QUEUE_INIT(&pConn->queue);
|
||||
|
@ -826,7 +802,7 @@ static void destroyConnRegArg(SSvrConn* conn) {
|
|||
conn->regArg.init = 0;
|
||||
}
|
||||
}
|
||||
static int reallocConnRefHandle(SSvrConn* conn) {
|
||||
static int reallocConnRef(SSvrConn* conn) {
|
||||
transReleaseExHandle(refMgt, conn->refId);
|
||||
transRemoveExHandle(refMgt, conn->refId);
|
||||
// avoid app continue to send msg on invalid handle
|
||||
|
@ -844,7 +820,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
|
|||
if (conn == NULL) {
|
||||
return;
|
||||
}
|
||||
SWorkThrdObj* thrd = conn->hostThrd;
|
||||
SWorkThrd* thrd = conn->hostThrd;
|
||||
|
||||
transReleaseExHandle(refMgt, conn->refId);
|
||||
transRemoveExHandle(refMgt, conn->refId);
|
||||
|
@ -889,7 +865,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
|||
srv->numOfThreads = numOfThreads;
|
||||
srv->workerIdx = 0;
|
||||
srv->numOfWorkerReady = 0;
|
||||
srv->pThreadObj = (SWorkThrdObj**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
|
||||
srv->pThreadObj = (SWorkThrd**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrd*));
|
||||
srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*));
|
||||
srv->ip = ip;
|
||||
srv->port = port;
|
||||
|
@ -914,7 +890,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
|||
assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb));
|
||||
|
||||
for (int i = 0; i < srv->numOfThreads; i++) {
|
||||
SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj));
|
||||
SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd));
|
||||
thrd->pTransInst = shandle;
|
||||
thrd->quit = false;
|
||||
srv->pThreadObj[i] = thrd;
|
||||
|
@ -959,7 +935,7 @@ End:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void uvHandleQuit(SSvrMsg* msg, SWorkThrdObj* thrd) {
|
||||
void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd) {
|
||||
thrd->quit = true;
|
||||
if (QUEUE_IS_EMPTY(&thrd->conn)) {
|
||||
uv_walk(thrd->loop, uvWalkCb, NULL);
|
||||
|
@ -968,10 +944,10 @@ void uvHandleQuit(SSvrMsg* msg, SWorkThrdObj* thrd) {
|
|||
}
|
||||
taosMemoryFree(msg);
|
||||
}
|
||||
void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd) {
|
||||
void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) {
|
||||
SSvrConn* conn = msg->pConn;
|
||||
if (conn->status == ConnAcquire) {
|
||||
reallocConnRefHandle(conn);
|
||||
reallocConnRef(conn);
|
||||
if (!transQueuePush(&conn->srvMsgs, msg)) {
|
||||
return;
|
||||
}
|
||||
|
@ -982,12 +958,12 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd) {
|
|||
}
|
||||
destroySmsg(msg);
|
||||
}
|
||||
void uvHandleResp(SSvrMsg* msg, SWorkThrdObj* thrd) {
|
||||
void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd) {
|
||||
// send msg to client
|
||||
tDebug("%s conn %p start to send resp (2/2)", transLabel(thrd->pTransInst), msg->pConn);
|
||||
uvStartSendResp(msg);
|
||||
}
|
||||
void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd) {
|
||||
void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) {
|
||||
SSvrConn* conn = msg->pConn;
|
||||
tDebug("%s conn %p register brokenlink callback", transLabel(thrd->pTransInst), conn);
|
||||
if (conn->status == ConnAcquire) {
|
||||
|
@ -1008,7 +984,7 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd) {
|
|||
taosMemoryFree(msg);
|
||||
}
|
||||
}
|
||||
void destroyWorkThrd(SWorkThrdObj* pThrd) {
|
||||
void destroyWorkThrd(SWorkThrd* pThrd) {
|
||||
if (pThrd == NULL) {
|
||||
return;
|
||||
}
|
||||
|
@ -1019,7 +995,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
|
|||
taosMemoryFree(pThrd->loop);
|
||||
taosMemoryFree(pThrd);
|
||||
}
|
||||
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
|
||||
void sendQuitToWorkThrd(SWorkThrd* pThrd) {
|
||||
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
||||
msg->type = Quit;
|
||||
tDebug("server send quit msg to work thread");
|
||||
|
@ -1055,8 +1031,6 @@ void transCloseServer(void* arg) {
|
|||
|
||||
int ref = atomic_sub_fetch_32(&tranSSvrInst, 1);
|
||||
if (ref == 0) {
|
||||
// TdThreadOnce tmpInit = PTHREAD_ONCE_INIT;
|
||||
// memcpy(&transModuleInit, &tmpInit, sizeof(TdThreadOnce));
|
||||
transCloseExHandleMgt(refMgt);
|
||||
}
|
||||
}
|
||||
|
@ -1086,7 +1060,7 @@ void transReleaseSrvHandle(void* handle) {
|
|||
|
||||
ASYNC_CHECK_HANDLE(exh, refId);
|
||||
|
||||
SWorkThrdObj* pThrd = exh->pThrd;
|
||||
SWorkThrd* pThrd = exh->pThrd;
|
||||
ASYNC_ERR_JRET(pThrd);
|
||||
|
||||
STransMsg tmsg = {.code = 0, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId};
|
||||
|
@ -1116,7 +1090,7 @@ void transSendResponse(const STransMsg* msg) {
|
|||
STransMsg tmsg = *msg;
|
||||
tmsg.info.refId = refId;
|
||||
|
||||
SWorkThrdObj* pThrd = exh->pThrd;
|
||||
SWorkThrd* pThrd = exh->pThrd;
|
||||
ASYNC_ERR_JRET(pThrd);
|
||||
|
||||
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
||||
|
@ -1146,7 +1120,7 @@ void transRegisterMsg(const STransMsg* msg) {
|
|||
STransMsg tmsg = *msg;
|
||||
tmsg.info.refId = refId;
|
||||
|
||||
SWorkThrdObj* pThrd = exh->pThrd;
|
||||
SWorkThrd* pThrd = exh->pThrd;
|
||||
ASYNC_ERR_JRET(pThrd);
|
||||
|
||||
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
||||
|
|
|
@ -1073,7 +1073,7 @@ int32_t taosCloseEpoll(TdEpollPtr *ppEpoll) {
|
|||
* Set TCP connection timeout per-socket level.
|
||||
* ref [https://github.com/libuv/help/issues/54]
|
||||
*/
|
||||
int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec) {
|
||||
int32_t taosCreateSocketWithTimeout(uint32_t timeout) {
|
||||
#if defined(WINDOWS)
|
||||
SOCKET fd;
|
||||
#else
|
||||
|
@ -1083,11 +1083,11 @@ int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec) {
|
|||
return -1;
|
||||
}
|
||||
#if defined(WINDOWS)
|
||||
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&conn_timeout_sec, sizeof(conn_timeout_sec))) {
|
||||
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&timeout, sizeof(timeout))) {
|
||||
return -1;
|
||||
}
|
||||
#else // Linux like systems
|
||||
uint32_t conn_timeout_ms = conn_timeout_sec * 1000;
|
||||
uint32_t conn_timeout_ms = timeout * 1000;
|
||||
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {
|
||||
return -1;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue