handle except
This commit is contained in:
parent
3cc9979a99
commit
3b147dd5bc
|
@ -70,12 +70,19 @@ typedef struct SRpcInit {
|
||||||
// call back to retrieve the client auth info, for server app only
|
// call back to retrieve the client auth info, for server app only
|
||||||
int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
|
int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
|
||||||
|
|
||||||
// to support Send messages multiple times on a link
|
|
||||||
void *(*mfp)(void *parent, tmsg_t msgType);
|
|
||||||
|
|
||||||
void *parent;
|
void *parent;
|
||||||
} SRpcInit;
|
} SRpcInit;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
void * val;
|
||||||
|
int32_t len;
|
||||||
|
void (*free)(void *arg);
|
||||||
|
} SRpcCtxVal;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SHashObj *args;
|
||||||
|
} SRpcCtx;
|
||||||
|
|
||||||
int32_t rpcInit();
|
int32_t rpcInit();
|
||||||
void rpcCleanup();
|
void rpcCleanup();
|
||||||
void * rpcOpen(const SRpcInit *pRpc);
|
void * rpcOpen(const SRpcInit *pRpc);
|
||||||
|
@ -84,16 +91,17 @@ void * rpcMallocCont(int contLen);
|
||||||
void rpcFreeCont(void *pCont);
|
void rpcFreeCont(void *pCont);
|
||||||
void * rpcReallocCont(void *ptr, int contLen);
|
void * rpcReallocCont(void *ptr, int contLen);
|
||||||
void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
|
void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
|
||||||
void rpcSendResponse(const SRpcMsg *pMsg);
|
void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
|
||||||
void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet);
|
|
||||||
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
|
|
||||||
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
|
|
||||||
int rpcReportProgress(void *pConn, char *pCont, int contLen);
|
|
||||||
void rpcCancelRequest(int64_t rid);
|
|
||||||
|
|
||||||
|
void rpcSendResponse(const SRpcMsg *pMsg);
|
||||||
|
void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet);
|
||||||
|
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
|
||||||
|
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
|
||||||
|
int rpcReportProgress(void *pConn, char *pCont, int contLen);
|
||||||
|
void rpcCancelRequest(int64_t rid);
|
||||||
|
void rpcRegisterBrokenLinkArg(SRpcMsg *msg);
|
||||||
// just release client conn to rpc instance, no close sock
|
// just release client conn to rpc instance, no close sock
|
||||||
void rpcReleaseHandle(void *handle, int8_t type);
|
void rpcReleaseHandle(void *handle, int8_t type); //
|
||||||
|
|
||||||
void rpcRefHandle(void *handle, int8_t type);
|
void rpcRefHandle(void *handle, int8_t type);
|
||||||
void rpcUnrefHandle(void *handle, int8_t type);
|
void rpcUnrefHandle(void *handle, int8_t type);
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,10 @@
|
||||||
*/
|
*/
|
||||||
#ifdef USE_UV
|
#ifdef USE_UV
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
#include <uv.h>
|
#include <uv.h>
|
||||||
#include "lz4.h"
|
#include "lz4.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
@ -121,24 +125,21 @@ typedef struct {
|
||||||
} SRpcReqContext;
|
} SRpcReqContext;
|
||||||
|
|
||||||
typedef SRpcMsg STransMsg;
|
typedef SRpcMsg STransMsg;
|
||||||
|
typedef SRpcCtx STransCtx;
|
||||||
|
typedef SRpcCtxVal STransCtxVal;
|
||||||
typedef SRpcInfo STrans;
|
typedef SRpcInfo STrans;
|
||||||
typedef SRpcConnInfo STransHandleInfo;
|
typedef SRpcConnInfo STransHandleInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SEpSet epSet; // ip list provided by app
|
SEpSet epSet; // ip list provided by app
|
||||||
void* ahandle; // handle provided by app
|
void* ahandle; // handle provided by app
|
||||||
tmsg_t msgType; // message type
|
tmsg_t msgType; // message type
|
||||||
uint8_t* pCont; // content provided by app
|
|
||||||
int32_t contLen; // content length
|
|
||||||
// int32_t code; // error code
|
|
||||||
// int16_t numOfTry; // number of try for different servers
|
|
||||||
// int8_t oldInUse; // server EP inUse passed by app
|
|
||||||
// int8_t redirect; // flag to indicate redirect
|
|
||||||
int8_t connType; // connection type cli/srv
|
int8_t connType; // connection type cli/srv
|
||||||
int64_t rid; // refId returned by taosAddRef
|
int64_t rid; // refId returned by taosAddRef
|
||||||
|
|
||||||
STransMsg* pRsp; // for synchronous API
|
STransCtx appCtx; //
|
||||||
tsem_t* pSem; // for synchronous API
|
STransMsg* pRsp; // for synchronous API
|
||||||
|
tsem_t* pSem; // for synchronous API
|
||||||
|
|
||||||
int hThrdIdx;
|
int hThrdIdx;
|
||||||
char* ip;
|
char* ip;
|
||||||
|
@ -181,7 +182,7 @@ typedef struct {
|
||||||
#pragma pack(pop)
|
#pragma pack(pop)
|
||||||
|
|
||||||
typedef enum { Normal, Quit, Release } STransMsgType;
|
typedef enum { Normal, Quit, Release } STransMsgType;
|
||||||
typedef enum { ConnNormal, ConnAcquire, ConnRelease } ConnStatus;
|
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken } ConnStatus;
|
||||||
|
|
||||||
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
|
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
|
||||||
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
|
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
|
||||||
|
@ -259,7 +260,7 @@ void transUnrefCliHandle(void* handle);
|
||||||
void transReleaseCliHandle(void* handle);
|
void transReleaseCliHandle(void* handle);
|
||||||
void transReleaseSrvHandle(void* handle);
|
void transReleaseSrvHandle(void* handle);
|
||||||
|
|
||||||
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg);
|
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* pCtx);
|
||||||
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp);
|
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp);
|
||||||
void transSendResponse(const STransMsg* pMsg);
|
void transSendResponse(const STransMsg* pMsg);
|
||||||
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo);
|
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo);
|
||||||
|
@ -270,4 +271,14 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
void transCloseClient(void* arg);
|
void transCloseClient(void* arg);
|
||||||
void transCloseServer(void* arg);
|
void transCloseServer(void* arg);
|
||||||
|
|
||||||
|
void transCtxInit(STransCtx* ctx);
|
||||||
|
void transCtxDestroy(STransCtx* ctx);
|
||||||
|
void transCtxClear(STransCtx* ctx);
|
||||||
|
void transCtxMerge(STransCtx* dst, STransCtx* src);
|
||||||
|
void* transCtxDumpVal(STransCtx* ctx, int32_t key);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -63,9 +63,6 @@ typedef struct {
|
||||||
|
|
||||||
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
|
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
|
||||||
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
|
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
|
||||||
bool (*pfp)(void* parent, tmsg_t msgType);
|
|
||||||
void* (*mfp)(void* parent, tmsg_t msgType);
|
|
||||||
bool (*efp)(void* parent, tmsg_t msgType);
|
|
||||||
|
|
||||||
int32_t refCount;
|
int32_t refCount;
|
||||||
void* parent;
|
void* parent;
|
||||||
|
|
Binary file not shown.
|
@ -39,7 +39,6 @@ void* rpcOpen(const SRpcInit* pInit) {
|
||||||
// register callback handle
|
// register callback handle
|
||||||
pRpc->cfp = pInit->cfp;
|
pRpc->cfp = pInit->cfp;
|
||||||
pRpc->afp = pInit->afp;
|
pRpc->afp = pInit->afp;
|
||||||
pRpc->mfp = pInit->mfp;
|
|
||||||
|
|
||||||
if (pInit->connType == TAOS_CONN_SERVER) {
|
if (pInit->connType == TAOS_CONN_SERVER) {
|
||||||
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
||||||
|
@ -119,7 +118,12 @@ void rpcCancelRequest(int64_t rid) { return; }
|
||||||
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
|
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
|
||||||
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
|
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
|
||||||
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
|
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
|
||||||
transSendRequest(shandle, ip, port, pMsg);
|
transSendRequest(shandle, ip, port, pMsg, NULL);
|
||||||
|
}
|
||||||
|
void rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
|
||||||
|
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
|
||||||
|
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
|
||||||
|
transSendRequest(shandle, ip, port, pMsg, pCtx);
|
||||||
}
|
}
|
||||||
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
|
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
|
||||||
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
|
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
|
||||||
|
@ -140,6 +144,10 @@ void rpcUnrefHandle(void* handle, int8_t type) {
|
||||||
(*taosUnRefHandle[type])(handle);
|
(*taosUnRefHandle[type])(handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void rpcRegisterBrokenLinkArg(SRpcMsg* msg) {
|
||||||
|
//
|
||||||
|
rpcSendResponse(msg);
|
||||||
|
}
|
||||||
void rpcReleaseHandle(void* handle, int8_t type) {
|
void rpcReleaseHandle(void* handle, int8_t type) {
|
||||||
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
|
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
|
||||||
(*transReleaseHandle[type])(handle);
|
(*transReleaseHandle[type])(handle);
|
||||||
|
|
|
@ -30,6 +30,7 @@ typedef struct SCliConn {
|
||||||
uint64_t expireTime;
|
uint64_t expireTime;
|
||||||
int hThrdIdx;
|
int hThrdIdx;
|
||||||
bool broken; // link broken or not
|
bool broken; // link broken or not
|
||||||
|
STransCtx ctx;
|
||||||
|
|
||||||
ConnStatus status; //
|
ConnStatus status; //
|
||||||
int release; // 1: release
|
int release; // 1: release
|
||||||
|
@ -207,7 +208,7 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
|
|
||||||
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
|
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
|
||||||
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
|
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
|
||||||
transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL;
|
transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
|
||||||
} else {
|
} else {
|
||||||
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
|
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
|
||||||
}
|
}
|
||||||
|
@ -283,7 +284,7 @@ void cliHandleExcept(SCliConn* pConn) {
|
||||||
transMsg.ahandle = NULL;
|
transMsg.ahandle = NULL;
|
||||||
|
|
||||||
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
|
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
|
||||||
transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL;
|
transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
|
||||||
} else {
|
} else {
|
||||||
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
|
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
|
||||||
}
|
}
|
||||||
|
@ -374,6 +375,7 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
|
||||||
static void addConnToPool(void* pool, SCliConn* conn) {
|
static void addConnToPool(void* pool, SCliConn* conn) {
|
||||||
char key[128] = {0};
|
char key[128] = {0};
|
||||||
|
|
||||||
|
transCtxDestroy(&conn->ctx);
|
||||||
tstrncpy(key, conn->ip, strlen(conn->ip));
|
tstrncpy(key, conn->ip, strlen(conn->ip));
|
||||||
tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port));
|
tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port));
|
||||||
tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
|
tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
|
||||||
|
@ -436,7 +438,6 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
|
||||||
conn->writeReq.data = conn;
|
conn->writeReq.data = conn;
|
||||||
conn->connReq.data = conn;
|
conn->connReq.data = conn;
|
||||||
conn->cliMsgs = taosArrayInit(2, sizeof(void*));
|
conn->cliMsgs = taosArrayInit(2, sizeof(void*));
|
||||||
|
|
||||||
QUEUE_INIT(&conn->conn);
|
QUEUE_INIT(&conn->conn);
|
||||||
conn->hostThrd = pThrd;
|
conn->hostThrd = pThrd;
|
||||||
conn->status = ConnNormal;
|
conn->status = ConnNormal;
|
||||||
|
@ -446,6 +447,7 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
|
||||||
}
|
}
|
||||||
static void cliDestroyConn(SCliConn* conn, bool clear) {
|
static void cliDestroyConn(SCliConn* conn, bool clear) {
|
||||||
tTrace("%s cli conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s cli conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
||||||
|
|
||||||
QUEUE_REMOVE(&conn->conn);
|
QUEUE_REMOVE(&conn->conn);
|
||||||
if (clear) {
|
if (clear) {
|
||||||
uv_close((uv_handle_t*)conn->stream, cliDestroy);
|
uv_close((uv_handle_t*)conn->stream, cliDestroy);
|
||||||
|
@ -455,6 +457,7 @@ static void cliDestroy(uv_handle_t* handle) {
|
||||||
SCliConn* conn = handle->data;
|
SCliConn* conn = handle->data;
|
||||||
free(conn->ip);
|
free(conn->ip);
|
||||||
free(conn->stream);
|
free(conn->stream);
|
||||||
|
transCtxDestroy(&conn->ctx);
|
||||||
taosArrayDestroy(conn->cliMsgs);
|
taosArrayDestroy(conn->cliMsgs);
|
||||||
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
||||||
free(conn);
|
free(conn);
|
||||||
|
@ -630,10 +633,12 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
if (conn != NULL) {
|
if (conn != NULL) {
|
||||||
conn->hThrdIdx = pCtx->hThrdIdx;
|
conn->hThrdIdx = pCtx->hThrdIdx;
|
||||||
|
|
||||||
|
transCtxMerge(&conn->ctx, &pCtx->appCtx);
|
||||||
if (taosArrayGetSize(conn->cliMsgs) > 0) {
|
if (taosArrayGetSize(conn->cliMsgs) > 0) {
|
||||||
taosArrayPush(conn->cliMsgs, &pMsg);
|
taosArrayPush(conn->cliMsgs, &pMsg);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(conn->cliMsgs, &pMsg);
|
taosArrayPush(conn->cliMsgs, &pMsg);
|
||||||
transDestroyBuffer(&conn->readBuf);
|
transDestroyBuffer(&conn->readBuf);
|
||||||
cliSend(conn);
|
cliSend(conn);
|
||||||
|
@ -825,7 +830,7 @@ void transReleaseCliHandle(void* handle) {
|
||||||
transSendAsync(thrd->asyncPool, &cmsg->q);
|
transSendAsync(thrd->asyncPool, &cmsg->q);
|
||||||
}
|
}
|
||||||
|
|
||||||
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg) {
|
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* ctx) {
|
||||||
STrans* pTransInst = (STrans*)shandle;
|
STrans* pTransInst = (STrans*)shandle;
|
||||||
int index = CONN_HOST_THREAD_INDEX((SCliConn*)pMsg->handle);
|
int index = CONN_HOST_THREAD_INDEX((SCliConn*)pMsg->handle);
|
||||||
if (index == -1) {
|
if (index == -1) {
|
||||||
|
@ -835,13 +840,14 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p
|
||||||
if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
|
if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
|
||||||
// imp later
|
// imp later
|
||||||
}
|
}
|
||||||
tDebug("send request at thread:%d %p", index, pMsg);
|
tDebug("send request at thread:%d %p, dst: %s:%d", index, pMsg, ip, port);
|
||||||
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
|
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
|
||||||
pCtx->ahandle = pMsg->ahandle;
|
pCtx->ahandle = pMsg->ahandle;
|
||||||
pCtx->msgType = pMsg->msgType;
|
pCtx->msgType = pMsg->msgType;
|
||||||
pCtx->ip = strdup(ip);
|
pCtx->ip = strdup(ip);
|
||||||
pCtx->port = port;
|
pCtx->port = port;
|
||||||
pCtx->hThrdIdx = index;
|
pCtx->hThrdIdx = index;
|
||||||
|
pCtx->appCtx = *ctx;
|
||||||
|
|
||||||
assert(pTransInst->connType == TAOS_CONN_CLIENT);
|
assert(pTransInst->connType == TAOS_CONN_CLIENT);
|
||||||
// atomic or not
|
// atomic or not
|
||||||
|
@ -855,6 +861,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p
|
||||||
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
|
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
|
||||||
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
||||||
}
|
}
|
||||||
|
|
||||||
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) {
|
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) {
|
||||||
STrans* pTransInst = (STrans*)shandle;
|
STrans* pTransInst = (STrans*)shandle;
|
||||||
int index = CONN_HOST_THREAD_INDEX(pReq->handle);
|
int index = CONN_HOST_THREAD_INDEX(pReq->handle);
|
||||||
|
|
|
@ -155,9 +155,9 @@ bool transReadComplete(SConnBuffer* connBuf) {
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) {return 0;}
|
int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) { return 0; }
|
||||||
|
|
||||||
int transUnpackMsg(STransMsgHead* msgHead) {return 0;}
|
int transUnpackMsg(STransMsgHead* msgHead) { return 0; }
|
||||||
int transDestroyBuffer(SConnBuffer* buf) {
|
int transDestroyBuffer(SConnBuffer* buf) {
|
||||||
if (buf->cap > 0) {
|
if (buf->cap > 0) {
|
||||||
tfree(buf->buf);
|
tfree(buf->buf);
|
||||||
|
@ -224,4 +224,56 @@ int transSendAsync(SAsyncPool* pool, queue* q) {
|
||||||
return uv_async_send(async);
|
return uv_async_send(async);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void transCtxInit(STransCtx* ctx) {
|
||||||
|
// init transCtx
|
||||||
|
ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK);
|
||||||
|
}
|
||||||
|
void transCtxDestroy(STransCtx* ctx) {
|
||||||
|
if (ctx->args == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
STransCtxVal* iter = taosHashIterate(ctx->args, NULL);
|
||||||
|
while (iter) {
|
||||||
|
iter->free(iter->val);
|
||||||
|
iter = taosHashIterate(ctx->args, iter);
|
||||||
|
}
|
||||||
|
taosHashCleanup(ctx->args);
|
||||||
|
}
|
||||||
|
|
||||||
|
void transCtxMerge(STransCtx* dst, STransCtx* src) {
|
||||||
|
if (dst->args == NULL) {
|
||||||
|
dst->args = src->args;
|
||||||
|
src->args = NULL;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
void* key = NULL;
|
||||||
|
size_t klen = 0;
|
||||||
|
void* iter = taosHashIterate(src->args, NULL);
|
||||||
|
while (iter) {
|
||||||
|
STransCtxVal* sVal = (STransCtxVal*)iter;
|
||||||
|
key = taosHashGetKey(sVal, &klen);
|
||||||
|
|
||||||
|
STransCtxVal* dVal = taosHashGet(dst->args, key, klen);
|
||||||
|
if (dVal) {
|
||||||
|
dVal->free(dVal->val);
|
||||||
|
}
|
||||||
|
taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal));
|
||||||
|
iter = taosHashIterate(src->args, iter);
|
||||||
|
}
|
||||||
|
taosHashCleanup(src->args);
|
||||||
|
}
|
||||||
|
void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
|
||||||
|
if (ctx->args == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
STransCtxVal* cVal = taosHashGet(ctx->args, (const void*)&key, sizeof(key));
|
||||||
|
if (cVal == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
char* ret = calloc(1, cVal->len);
|
||||||
|
memcpy(ret, (char*)cVal->val, cVal->len);
|
||||||
|
return (void*)ret;
|
||||||
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -403,16 +403,16 @@ static void uvStartSendResp(SSrvMsg* smsg) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void uvNotifyLinkBrokenToApp(SSrvConn* conn) {
|
// static void uvNotifyLinkBrokenToApp(SSrvConn* conn) {
|
||||||
STrans* pTransInst = conn->pTransInst;
|
// STrans* pTransInst = conn->pTransInst;
|
||||||
if (pTransInst->efp != NULL && (*pTransInst->efp)(NULL, conn->inType) && T_REF_VAL_GET(conn) >= 2) {
|
// if (pTransInst->efp != NULL && (*pTransInst->efp)(NULL, conn->inType) && T_REF_VAL_GET(conn) >= 2) {
|
||||||
STransMsg transMsg = {0};
|
// STransMsg transMsg = {0};
|
||||||
transMsg.msgType = conn->inType;
|
// transMsg.msgType = conn->inType;
|
||||||
transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
// transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||||
// transRefSrvHandle(conn);
|
// // transRefSrvHandle(conn);
|
||||||
(*pTransInst->cfp)(pTransInst->parent, &transMsg, 0);
|
// (*pTransInst->cfp)(pTransInst->parent, &transMsg, 0);
|
||||||
}
|
// }
|
||||||
}
|
//}
|
||||||
static void destroySmsg(SSrvMsg* smsg) {
|
static void destroySmsg(SSrvMsg* smsg) {
|
||||||
if (smsg == NULL) {
|
if (smsg == NULL) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -86,18 +86,8 @@ class Client {
|
||||||
rpcClose(this->transCli);
|
rpcClose(this->transCli);
|
||||||
this->transCli = NULL;
|
this->transCli = NULL;
|
||||||
}
|
}
|
||||||
void SetPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) {
|
|
||||||
rpcClose(this->transCli);
|
|
||||||
this->transCli = rpcOpen(&rpcInit_);
|
|
||||||
}
|
|
||||||
void SetConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) {
|
void SetConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) {
|
||||||
rpcClose(this->transCli);
|
rpcClose(this->transCli);
|
||||||
rpcInit_.mfp = mfp;
|
|
||||||
this->transCli = rpcOpen(&rpcInit_);
|
|
||||||
}
|
|
||||||
void SetPAndMFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) {
|
|
||||||
rpcClose(this->transCli);
|
|
||||||
rpcInit_.mfp = mfp;
|
|
||||||
this->transCli = rpcOpen(&rpcInit_);
|
this->transCli = rpcOpen(&rpcInit_);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -156,10 +146,6 @@ class Server {
|
||||||
rpcClose(this->transSrv);
|
rpcClose(this->transSrv);
|
||||||
this->transSrv = NULL;
|
this->transSrv = NULL;
|
||||||
}
|
}
|
||||||
void SetExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) {
|
|
||||||
this->Stop();
|
|
||||||
this->Start();
|
|
||||||
}
|
|
||||||
void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
|
void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
|
||||||
this->Stop();
|
this->Stop();
|
||||||
rpcInit_.cfp = cfp;
|
rpcInit_.cfp = cfp;
|
||||||
|
@ -252,23 +238,11 @@ class TransObj {
|
||||||
//
|
//
|
||||||
srv->Stop();
|
srv->Stop();
|
||||||
}
|
}
|
||||||
void SetCliPersistFp(bool (*pfp)(void *parent, tmsg_t msgType)) {
|
|
||||||
// do nothing
|
|
||||||
cli->SetPersistFP(pfp);
|
|
||||||
}
|
|
||||||
void SetCliMFp(void *(*mfp)(void *parent, tmsg_t msgType)) {
|
void SetCliMFp(void *(*mfp)(void *parent, tmsg_t msgType)) {
|
||||||
// do nothing
|
// do nothing
|
||||||
cli->SetConstructFP(mfp);
|
cli->SetConstructFP(mfp);
|
||||||
}
|
}
|
||||||
void SetCliMAndPFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) {
|
|
||||||
// do nothing
|
|
||||||
cli->SetPAndMFp(pfp, mfp);
|
|
||||||
}
|
|
||||||
// call when link broken, and notify query or fetch stop
|
// call when link broken, and notify query or fetch stop
|
||||||
void SetSrvExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) {
|
|
||||||
////////
|
|
||||||
srv->SetExceptFp(efp);
|
|
||||||
}
|
|
||||||
void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
|
void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
|
||||||
///////
|
///////
|
||||||
srv->SetSrvContinueSend(cfp);
|
srv->SetSrvContinueSend(cfp);
|
||||||
|
@ -375,22 +349,15 @@ TEST_F(TransEnv, cliReleaseHandle) {
|
||||||
req.pCont = rpcMallocCont(10);
|
req.pCont = rpcMallocCont(10);
|
||||||
req.contLen = 10;
|
req.contLen = 10;
|
||||||
tr->cliSendAndRecvNoHandle(&req, &resp);
|
tr->cliSendAndRecvNoHandle(&req, &resp);
|
||||||
// if (i == 5) {
|
|
||||||
// std::cout << "stop server" << std::endl;
|
|
||||||
// tr->StopSrv();
|
|
||||||
//}
|
|
||||||
// if (i >= 6) {
|
|
||||||
EXPECT_TRUE(resp.code == 0);
|
EXPECT_TRUE(resp.code == 0);
|
||||||
//}
|
//}
|
||||||
}
|
}
|
||||||
//////////////////
|
//////////////////
|
||||||
}
|
}
|
||||||
TEST_F(TransEnv, cliReleaseHandleExcept) {
|
TEST_F(TransEnv, cliReleaseHandleExcept) {
|
||||||
// tr->SetCliPersistFp(cliPersistHandle);
|
|
||||||
|
|
||||||
SRpcMsg resp = {0};
|
SRpcMsg resp = {0};
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
SRpcMsg req = {.handle = resp.handle};
|
SRpcMsg req = {.handle = resp.handle, .persistHandle = 1};
|
||||||
req.msgType = 1;
|
req.msgType = 1;
|
||||||
req.pCont = rpcMallocCont(10);
|
req.pCont = rpcMallocCont(10);
|
||||||
req.contLen = 10;
|
req.contLen = 10;
|
||||||
|
@ -459,7 +426,7 @@ TEST_F(TransEnv, multiCliPersistHandleExcept) {
|
||||||
// conn broken
|
// conn broken
|
||||||
}
|
}
|
||||||
TEST_F(TransEnv, queryExcept) {
|
TEST_F(TransEnv, queryExcept) {
|
||||||
tr->SetSrvExceptFp(handleExcept);
|
// tr->SetSrvExceptFp(handleExcept);
|
||||||
|
|
||||||
// query and conn is broken
|
// query and conn is broken
|
||||||
}
|
}
|
||||||
|
|
|
@ -136,4 +136,98 @@ TEST_F(QueueEnv, testIter) {
|
||||||
assert(result.size() == vals.size());
|
assert(result.size() == vals.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class TransCtxEnv : public ::testing::Test {
|
||||||
|
protected:
|
||||||
|
virtual void SetUp() {
|
||||||
|
ctx = (STransCtx *)calloc(1, sizeof(STransCtx));
|
||||||
|
transCtxInit(ctx);
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
virtual void TearDown() {
|
||||||
|
transCtxDestroy(ctx);
|
||||||
|
// formate
|
||||||
|
}
|
||||||
|
STransCtx *ctx;
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST_F(TransCtxEnv, mergeTest) {
|
||||||
|
int key = 1;
|
||||||
|
{
|
||||||
|
STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx));
|
||||||
|
transCtxInit(src);
|
||||||
|
{
|
||||||
|
STransCtxVal val1 = {.val = NULL, .len = 0, .free = free};
|
||||||
|
val1.val = malloc(12);
|
||||||
|
val1.len = 12;
|
||||||
|
|
||||||
|
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
|
||||||
|
key++;
|
||||||
|
}
|
||||||
|
{
|
||||||
|
STransCtxVal val1 = {.val = NULL, .len = 0, .free = free};
|
||||||
|
val1.val = malloc(12);
|
||||||
|
val1.len = 12;
|
||||||
|
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
|
||||||
|
key++;
|
||||||
|
}
|
||||||
|
transCtxMerge(ctx, src);
|
||||||
|
free(src);
|
||||||
|
}
|
||||||
|
EXPECT_EQ(2, taosHashGetSize(ctx->args));
|
||||||
|
{
|
||||||
|
STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx));
|
||||||
|
transCtxInit(src);
|
||||||
|
{
|
||||||
|
STransCtxVal val1 = {.val = NULL, .len = 0, .free = free};
|
||||||
|
val1.val = malloc(12);
|
||||||
|
val1.len = 12;
|
||||||
|
|
||||||
|
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
|
||||||
|
key++;
|
||||||
|
}
|
||||||
|
{
|
||||||
|
STransCtxVal val1 = {.val = NULL, .len = 0, .free = free};
|
||||||
|
val1.val = malloc(12);
|
||||||
|
val1.len = 12;
|
||||||
|
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
|
||||||
|
key++;
|
||||||
|
}
|
||||||
|
transCtxMerge(ctx, src);
|
||||||
|
free(src);
|
||||||
|
}
|
||||||
|
std::string val("Hello");
|
||||||
|
EXPECT_EQ(4, taosHashGetSize(ctx->args));
|
||||||
|
{
|
||||||
|
key = 1;
|
||||||
|
STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx));
|
||||||
|
transCtxInit(src);
|
||||||
|
{
|
||||||
|
STransCtxVal val1 = {.val = NULL, .len = 0, .free = free};
|
||||||
|
val1.val = calloc(1, 11);
|
||||||
|
memcpy(val1.val, val.c_str(), val.size());
|
||||||
|
val1.len = 11;
|
||||||
|
|
||||||
|
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
|
||||||
|
key++;
|
||||||
|
}
|
||||||
|
{
|
||||||
|
STransCtxVal val1 = {.val = NULL, .len = 0, .free = free};
|
||||||
|
val1.val = calloc(1, 11);
|
||||||
|
memcpy(val1.val, val.c_str(), val.size());
|
||||||
|
val1.len = 11;
|
||||||
|
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
|
||||||
|
key++;
|
||||||
|
}
|
||||||
|
transCtxMerge(ctx, src);
|
||||||
|
free(src);
|
||||||
|
}
|
||||||
|
EXPECT_EQ(4, taosHashGetSize(ctx->args));
|
||||||
|
|
||||||
|
char *skey = (char *)transCtxDumpVal(ctx, 1);
|
||||||
|
EXPECT_EQ(0, strcmp(skey, val.c_str()));
|
||||||
|
free(skey);
|
||||||
|
|
||||||
|
skey = (char *)transCtxDumpVal(ctx, 2);
|
||||||
|
EXPECT_EQ(0, strcmp(skey, val.c_str()));
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
Loading…
Reference in New Issue