Merge pull request #26752 from taosdata/fix/refactorTransport
Fix/refactorTransport
This commit is contained in:
commit
a2944f1f45
|
@ -1075,10 +1075,10 @@ typedef struct {
|
||||||
SUpdateUserIpWhite* pUserIpWhite;
|
SUpdateUserIpWhite* pUserIpWhite;
|
||||||
} SUpdateIpWhite;
|
} SUpdateIpWhite;
|
||||||
|
|
||||||
int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq);
|
int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq);
|
||||||
int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq);
|
int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq);
|
||||||
void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq);
|
void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq);
|
||||||
SUpdateIpWhite* cloneSUpdateIpWhiteReq(SUpdateIpWhite* pReq);
|
int32_t cloneSUpdateIpWhiteReq(SUpdateIpWhite* pReq, SUpdateIpWhite** pUpdate);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t ipWhiteVer;
|
int64_t ipWhiteVer;
|
||||||
|
|
|
@ -164,13 +164,13 @@ int rpcRegisterBrokenLinkArg(SRpcMsg *msg);
|
||||||
int rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock
|
int rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock
|
||||||
|
|
||||||
// These functions will not be called in the child process
|
// These functions will not be called in the child process
|
||||||
int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
|
int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
|
||||||
int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
|
int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
|
||||||
int rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int8_t *epUpdated,
|
int rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int8_t *epUpdated,
|
||||||
int32_t timeoutMs);
|
int32_t timeoutMs);
|
||||||
int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
|
int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
|
||||||
void *rpcAllocHandle();
|
void *rpcAllocHandle();
|
||||||
void rpcSetIpWhite(void *thandl, void *arg);
|
int32_t rpcSetIpWhite(void *thandl, void *arg);
|
||||||
|
|
||||||
int32_t rpcUtilSIpRangeToStr(SIpV4Range *pRange, char *buf);
|
int32_t rpcUtilSIpRangeToStr(SIpV4Range *pRange, char *buf);
|
||||||
|
|
||||||
|
|
|
@ -90,6 +90,8 @@ int32_t taosGetErrSize();
|
||||||
#define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023)
|
#define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023)
|
||||||
#define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024)
|
#define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024)
|
||||||
#define TSDB_CODE_HTTP_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0025)
|
#define TSDB_CODE_HTTP_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0025)
|
||||||
|
#define TSDB_CODE_RPC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0026)
|
||||||
|
#define TSDB_CODE_RPC_ASYNC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0027)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1764,23 +1764,33 @@ int32_t tDeserializeSUpdateIpWhite(void *buf, int32_t bufLen, SUpdateIpWhite *pR
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
void tFreeSUpdateIpWhiteReq(SUpdateIpWhite *pReq) {
|
void tFreeSUpdateIpWhiteReq(SUpdateIpWhite *pReq) {
|
||||||
for (int i = 0; i < pReq->numOfUser; i++) {
|
if (pReq == NULL) return;
|
||||||
SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i];
|
|
||||||
taosMemoryFree(pUserWhite->pIpRanges);
|
if (pReq->pUserIpWhite) {
|
||||||
|
for (int i = 0; i < pReq->numOfUser; i++) {
|
||||||
|
SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i];
|
||||||
|
taosMemoryFree(pUserWhite->pIpRanges);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
taosMemoryFree(pReq->pUserIpWhite);
|
taosMemoryFree(pReq->pUserIpWhite);
|
||||||
// impl later
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
SUpdateIpWhite *cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq) {
|
int32_t cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq, SUpdateIpWhite **pUpdateMsg) {
|
||||||
|
int32_t code = 0;
|
||||||
|
if (pReq == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
SUpdateIpWhite *pClone = taosMemoryCalloc(1, sizeof(SUpdateIpWhite));
|
SUpdateIpWhite *pClone = taosMemoryCalloc(1, sizeof(SUpdateIpWhite));
|
||||||
if (pClone == NULL) return NULL;
|
if (pClone == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
pClone->numOfUser = pReq->numOfUser;
|
pClone->numOfUser = pReq->numOfUser;
|
||||||
pClone->ver = pReq->ver;
|
pClone->ver = pReq->ver;
|
||||||
if ((pClone->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser)) == NULL) {
|
pClone->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser);
|
||||||
|
if (pClone->pUserIpWhite == NULL) {
|
||||||
taosMemoryFree(pClone);
|
taosMemoryFree(pClone);
|
||||||
return NULL;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < pReq->numOfUser; i++) {
|
for (int i = 0; i < pReq->numOfUser; i++) {
|
||||||
|
@ -1792,17 +1802,21 @@ SUpdateIpWhite *cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq) {
|
||||||
pNew->numOfRange = pOld->numOfRange;
|
pNew->numOfRange = pOld->numOfRange;
|
||||||
|
|
||||||
int32_t sz = pOld->numOfRange * sizeof(SIpV4Range);
|
int32_t sz = pOld->numOfRange * sizeof(SIpV4Range);
|
||||||
if ((pNew->pIpRanges = taosMemoryCalloc(1, sz)) == NULL) {
|
pNew->pIpRanges = taosMemoryCalloc(1, sz);
|
||||||
for (int j = 0; j < i; j++) {
|
if (pNew->pIpRanges == NULL) {
|
||||||
taosMemoryFree(pClone->pUserIpWhite[j].pIpRanges);
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
break;
|
||||||
taosMemoryFree(pClone->pUserIpWhite);
|
|
||||||
taosMemoryFree(pClone);
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
memcpy(pNew->pIpRanges, pOld->pIpRanges, sz);
|
memcpy(pNew->pIpRanges, pOld->pIpRanges, sz);
|
||||||
}
|
}
|
||||||
return pClone;
|
_return:
|
||||||
|
if (code < 0) {
|
||||||
|
tFreeSUpdateIpWhiteReq(pClone);
|
||||||
|
taosMemoryFree(pClone);
|
||||||
|
} else {
|
||||||
|
*pUpdateMsg = pClone;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
int32_t tSerializeRetrieveIpWhite(void *buf, int32_t bufLen, SRetrieveIpWhiteReq *pReq) {
|
int32_t tSerializeRetrieveIpWhite(void *buf, int32_t bufLen, SRetrieveIpWhiteReq *pReq) {
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
|
|
|
@ -65,10 +65,11 @@ static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
|
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
|
||||||
|
int32_t code = 0;
|
||||||
SUpdateIpWhite ipWhite = {0}; // aosMemoryCalloc(1, sizeof(SUpdateIpWhite));
|
SUpdateIpWhite ipWhite = {0}; // aosMemoryCalloc(1, sizeof(SUpdateIpWhite));
|
||||||
tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite);
|
tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite);
|
||||||
|
|
||||||
rpcSetIpWhite(pTrans, &ipWhite);
|
code = rpcSetIpWhite(pTrans, &ipWhite);
|
||||||
pData->ipWhiteVer = ipWhite.ver;
|
pData->ipWhiteVer = ipWhite.ver;
|
||||||
|
|
||||||
tFreeSUpdateIpWhiteReq(&ipWhite);
|
tFreeSUpdateIpWhiteReq(&ipWhite);
|
||||||
|
|
|
@ -103,11 +103,11 @@ typedef void* queue[2];
|
||||||
#define TRANS_MAGIC_NUM 0x5f375a86
|
#define TRANS_MAGIC_NUM 0x5f375a86
|
||||||
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
|
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
|
||||||
|
|
||||||
typedef struct SRpcMsg STransMsg;
|
typedef struct SRpcMsg STransMsg;
|
||||||
typedef SRpcCtx STransCtx;
|
typedef SRpcCtx STransCtx;
|
||||||
typedef SRpcCtxVal STransCtxVal;
|
typedef SRpcCtxVal STransCtxVal;
|
||||||
typedef SRpcInfo STrans;
|
typedef SRpcInfo STrans;
|
||||||
typedef SRpcConnInfo STransHandleInfo;
|
typedef SRpcConnInfo STransHandleInfo;
|
||||||
|
|
||||||
// ref mgt handle
|
// ref mgt handle
|
||||||
typedef struct SExHandle {
|
typedef struct SExHandle {
|
||||||
|
@ -250,10 +250,10 @@ typedef struct {
|
||||||
int8_t stop;
|
int8_t stop;
|
||||||
} SAsyncPool;
|
} SAsyncPool;
|
||||||
|
|
||||||
SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb);
|
int32_t transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb, SAsyncPool** pPool);
|
||||||
void transAsyncPoolDestroy(SAsyncPool* pool);
|
void transAsyncPoolDestroy(SAsyncPool* pool);
|
||||||
int transAsyncSend(SAsyncPool* pool, queue* mq);
|
int transAsyncSend(SAsyncPool* pool, queue* mq);
|
||||||
bool transAsyncPoolIsEmpty(SAsyncPool* pool);
|
bool transAsyncPoolIsEmpty(SAsyncPool* pool);
|
||||||
|
|
||||||
#define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc, param) \
|
#define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc, param) \
|
||||||
do { \
|
do { \
|
||||||
|
@ -279,6 +279,7 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool);
|
||||||
if (exh2 == NULL || id != exh2->refId) { \
|
if (exh2 == NULL || id != exh2->refId) { \
|
||||||
tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, \
|
tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, \
|
||||||
exh2 ? exh2->refId : 0, id); \
|
exh2 ? exh2->refId : 0, id); \
|
||||||
|
code = terrno; \
|
||||||
goto _return1; \
|
goto _return1; \
|
||||||
} \
|
} \
|
||||||
} else if (id == 0) { \
|
} else if (id == 0) { \
|
||||||
|
@ -287,6 +288,7 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool);
|
||||||
if (exh2 == NULL || id == exh2->refId) { \
|
if (exh2 == NULL || id == exh2->refId) { \
|
||||||
tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, id, \
|
tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, id, \
|
||||||
exh2 ? exh2->refId : 0); \
|
exh2 ? exh2->refId : 0); \
|
||||||
|
code = terrno; \
|
||||||
goto _return1; \
|
goto _return1; \
|
||||||
} else { \
|
} else { \
|
||||||
id = exh1->refId; \
|
id = exh1->refId; \
|
||||||
|
@ -297,13 +299,13 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool);
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
int transInitBuffer(SConnBuffer* buf);
|
int32_t transInitBuffer(SConnBuffer* buf);
|
||||||
int transClearBuffer(SConnBuffer* buf);
|
int32_t transClearBuffer(SConnBuffer* buf);
|
||||||
int transDestroyBuffer(SConnBuffer* buf);
|
int32_t transDestroyBuffer(SConnBuffer* buf);
|
||||||
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
|
int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
|
||||||
bool transReadComplete(SConnBuffer* connBuf);
|
bool transReadComplete(SConnBuffer* connBuf);
|
||||||
int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf);
|
int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf);
|
||||||
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf);
|
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf);
|
||||||
|
|
||||||
int transSetConnOption(uv_tcp_t* stream, int keepalive);
|
int transSetConnOption(uv_tcp_t* stream, int keepalive);
|
||||||
|
|
||||||
|
@ -316,14 +318,14 @@ void transUnrefCliHandle(void* handle);
|
||||||
int transReleaseCliHandle(void* handle);
|
int transReleaseCliHandle(void* handle);
|
||||||
int transReleaseSrvHandle(void* handle);
|
int transReleaseSrvHandle(void* handle);
|
||||||
|
|
||||||
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
|
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
|
||||||
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
|
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
|
||||||
int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int8_t* epUpdated,
|
int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int8_t* epUpdated,
|
||||||
int32_t timeoutMs);
|
int32_t timeoutMs);
|
||||||
int transSendResponse(const STransMsg* msg);
|
int transSendResponse(const STransMsg* msg);
|
||||||
int transRegisterMsg(const STransMsg* msg);
|
int transRegisterMsg(const STransMsg* msg);
|
||||||
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
|
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
|
||||||
void transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func);
|
int32_t transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func);
|
||||||
|
|
||||||
int transSockInfo2Str(struct sockaddr* sockname, char* dst);
|
int transSockInfo2Str(struct sockaddr* sockname, char* dst);
|
||||||
|
|
||||||
|
@ -363,7 +365,7 @@ typedef struct {
|
||||||
* init queue
|
* init queue
|
||||||
* note: queue'size is small, default 1
|
* note: queue'size is small, default 1
|
||||||
*/
|
*/
|
||||||
void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg));
|
int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* put arg into queue
|
* put arg into queue
|
||||||
|
@ -420,7 +422,7 @@ typedef struct SDelayQueue {
|
||||||
uv_loop_t* loop;
|
uv_loop_t* loop;
|
||||||
} SDelayQueue;
|
} SDelayQueue;
|
||||||
|
|
||||||
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue);
|
int32_t transDQCreate(uv_loop_t* loop, SDelayQueue** queue);
|
||||||
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg));
|
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg));
|
||||||
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
|
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
|
||||||
void transDQCancel(SDelayQueue* queue, SDelayTask* task);
|
void transDQCancel(SDelayQueue* queue, SDelayTask* task);
|
||||||
|
@ -433,9 +435,9 @@ bool transEpSetIsEqual2(SEpSet* a, SEpSet* b);
|
||||||
*/
|
*/
|
||||||
void transThreadOnce();
|
void transThreadOnce();
|
||||||
|
|
||||||
void transInit();
|
int32_t transInit();
|
||||||
void transCleanup();
|
void transCleanup();
|
||||||
void transPrintEpSet(SEpSet* pEpSet);
|
void transPrintEpSet(SEpSet* pEpSet);
|
||||||
|
|
||||||
void transFreeMsg(void* msg);
|
void transFreeMsg(void* msg);
|
||||||
int32_t transCompressMsg(char* msg, int32_t len);
|
int32_t transCompressMsg(char* msg, int32_t len);
|
||||||
|
|
|
@ -719,9 +719,8 @@ int64_t transInitHttpChanImpl() {
|
||||||
goto _ERROR;
|
goto _ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb);
|
code = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb, &http->asyncPool);
|
||||||
if (http->asyncPool == NULL) {
|
if (code != 0) {
|
||||||
code = terrno;
|
|
||||||
goto _ERROR;
|
goto _ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,18 +27,20 @@ int (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transRelease
|
||||||
|
|
||||||
static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) {
|
static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) {
|
||||||
int32_t code = taosGetIpv4FromFqdn(localFqdn, ip);
|
int32_t code = taosGetIpv4FromFqdn(localFqdn, ip);
|
||||||
if (code) {
|
if (code != 0) {
|
||||||
terrno = TSDB_CODE_RPC_FQDN_ERROR;
|
return TSDB_CODE_RPC_FQDN_ERROR;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
void* rpcOpen(const SRpcInit* pInit) {
|
void* rpcOpen(const SRpcInit* pInit) {
|
||||||
rpcInit();
|
int32_t code = rpcInit();
|
||||||
|
if (code != 0) {
|
||||||
|
TAOS_CHECK_GOTO(code, NULL, _end);
|
||||||
|
}
|
||||||
|
|
||||||
SRpcInfo* pRpc = taosMemoryCalloc(1, sizeof(SRpcInfo));
|
SRpcInfo* pRpc = taosMemoryCalloc(1, sizeof(SRpcInfo));
|
||||||
if (pRpc == NULL) {
|
if (pRpc == NULL) {
|
||||||
return NULL;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
|
||||||
}
|
}
|
||||||
if (pInit->label) {
|
if (pInit->label) {
|
||||||
int len = strlen(pInit->label) > sizeof(pRpc->label) ? sizeof(pRpc->label) : strlen(pInit->label);
|
int len = strlen(pInit->label) > sizeof(pRpc->label) ? sizeof(pRpc->label) : strlen(pInit->label);
|
||||||
|
@ -84,10 +86,9 @@ void* rpcOpen(const SRpcInit* pInit) {
|
||||||
|
|
||||||
uint32_t ip = 0;
|
uint32_t ip = 0;
|
||||||
if (pInit->connType == TAOS_CONN_SERVER) {
|
if (pInit->connType == TAOS_CONN_SERVER) {
|
||||||
if (transValidLocalFqdn(pInit->localFqdn, &ip) != 0) {
|
if ((code = transValidLocalFqdn(pInit->localFqdn, &ip)) != 0) {
|
||||||
tError("invalid fqdn:%s, errmsg:%s", pInit->localFqdn, terrstr());
|
tError("invalid fqdn:%s, errmsg:%s", pInit->localFqdn, tstrerror(code));
|
||||||
taosMemoryFree(pRpc);
|
TAOS_CHECK_GOTO(code, NULL, _end);
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,14 +106,19 @@ void* rpcOpen(const SRpcInit* pInit) {
|
||||||
(*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
(*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||||
|
|
||||||
if (pRpc->tcphandle == NULL) {
|
if (pRpc->tcphandle == NULL) {
|
||||||
taosMemoryFree(pRpc);
|
tError("failed to init rpc handle");
|
||||||
return NULL;
|
TAOS_CHECK_GOTO(terrno, NULL, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t refId = transAddExHandle(transGetInstMgt(), pRpc);
|
int64_t refId = transAddExHandle(transGetInstMgt(), pRpc);
|
||||||
transAcquireExHandle(transGetInstMgt(), refId);
|
transAcquireExHandle(transGetInstMgt(), refId);
|
||||||
pRpc->refId = refId;
|
pRpc->refId = refId;
|
||||||
return (void*)refId;
|
return (void*)refId;
|
||||||
|
_end:
|
||||||
|
taosMemoryFree(pRpc);
|
||||||
|
terrno = code;
|
||||||
|
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
void rpcClose(void* arg) {
|
void rpcClose(void* arg) {
|
||||||
tInfo("start to close rpc");
|
tInfo("start to close rpc");
|
||||||
|
@ -186,7 +192,7 @@ int rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
|
||||||
return transSetDefaultAddr(thandle, ip, fqdn);
|
return transSetDefaultAddr(thandle, ip, fqdn);
|
||||||
}
|
}
|
||||||
// server only
|
// server only
|
||||||
void rpcSetIpWhite(void* thandle, void* arg) { transSetIpWhiteList(thandle, arg, NULL); }
|
int32_t rpcSetIpWhite(void* thandle, void* arg) { return transSetIpWhiteList(thandle, arg, NULL); }
|
||||||
|
|
||||||
void* rpcAllocHandle() { return (void*)transAllocHandle(); }
|
void* rpcAllocHandle() { return (void*)transAllocHandle(); }
|
||||||
|
|
||||||
|
@ -202,10 +208,8 @@ int32_t rpcCvtErrCode(int32_t code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t rpcInit() {
|
int32_t rpcInit() { return transInit(); }
|
||||||
transInit();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
void rpcCleanup(void) {
|
void rpcCleanup(void) {
|
||||||
transCleanup();
|
transCleanup();
|
||||||
transHttpEnvDestroy();
|
transHttpEnvDestroy();
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -67,7 +67,11 @@ int32_t transDecompressMsg(char** msg, int32_t len) {
|
||||||
STransCompMsg* pComp = (STransCompMsg*)pCont;
|
STransCompMsg* pComp = (STransCompMsg*)pCont;
|
||||||
int32_t oriLen = htonl(pComp->contLen);
|
int32_t oriLen = htonl(pComp->contLen);
|
||||||
|
|
||||||
char* buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead));
|
char* buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead));
|
||||||
|
if (buf == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
STransMsgHead* pNewHead = (STransMsgHead*)buf;
|
STransMsgHead* pNewHead = (STransMsgHead*)buf;
|
||||||
int32_t decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), (char*)pNewHead->content,
|
int32_t decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), (char*)pNewHead->content,
|
||||||
len - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
|
len - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
|
||||||
|
@ -78,7 +82,7 @@ int32_t transDecompressMsg(char** msg, int32_t len) {
|
||||||
taosMemoryFree(pHead);
|
taosMemoryFree(pHead);
|
||||||
*msg = buf;
|
*msg = buf;
|
||||||
if (decompLen != oriLen) {
|
if (decompLen != oriLen) {
|
||||||
return -1;
|
return TSDB_CODE_INVALID_MSG;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -98,26 +102,33 @@ int transSockInfo2Str(struct sockaddr* sockname, char* dst) {
|
||||||
sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
|
sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
int transInitBuffer(SConnBuffer* buf) {
|
int32_t transInitBuffer(SConnBuffer* buf) {
|
||||||
buf->cap = BUFFER_CAP;
|
|
||||||
buf->buf = taosMemoryCalloc(1, BUFFER_CAP);
|
buf->buf = taosMemoryCalloc(1, BUFFER_CAP);
|
||||||
|
if (buf->buf == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
buf->cap = BUFFER_CAP;
|
||||||
buf->left = -1;
|
buf->left = -1;
|
||||||
buf->len = 0;
|
buf->len = 0;
|
||||||
buf->total = 0;
|
buf->total = 0;
|
||||||
buf->invalid = 0;
|
buf->invalid = 0;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int transDestroyBuffer(SConnBuffer* p) {
|
int32_t transDestroyBuffer(SConnBuffer* p) {
|
||||||
taosMemoryFree(p->buf);
|
taosMemoryFree(p->buf);
|
||||||
p->buf = NULL;
|
p->buf = NULL;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int transClearBuffer(SConnBuffer* buf) {
|
int32_t transClearBuffer(SConnBuffer* buf) {
|
||||||
SConnBuffer* p = buf;
|
SConnBuffer* p = buf;
|
||||||
if (p->cap > BUFFER_CAP) {
|
if (p->cap > BUFFER_CAP) {
|
||||||
p->cap = BUFFER_CAP;
|
p->cap = BUFFER_CAP;
|
||||||
p->buf = taosMemoryRealloc(p->buf, BUFFER_CAP);
|
p->buf = taosMemoryRealloc(p->buf, BUFFER_CAP);
|
||||||
|
if (p->buf == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
p->left = -1;
|
p->left = -1;
|
||||||
p->len = 0;
|
p->len = 0;
|
||||||
|
@ -126,27 +137,31 @@ int transClearBuffer(SConnBuffer* buf) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf) {
|
int32_t transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf) {
|
||||||
static const int HEADSIZE = sizeof(STransMsgHead);
|
static const int HEADSIZE = sizeof(STransMsgHead);
|
||||||
|
int32_t code = 0;
|
||||||
SConnBuffer* p = connBuf;
|
SConnBuffer* p = connBuf;
|
||||||
if (p->left != 0 || p->total <= 0) {
|
if (p->left != 0 || p->total <= 0) {
|
||||||
return -1;
|
return TSDB_CODE_INVALID_MSG;
|
||||||
}
|
}
|
||||||
int total = p->total;
|
int total = p->total;
|
||||||
if (total >= HEADSIZE && !p->invalid) {
|
if (total >= HEADSIZE && !p->invalid) {
|
||||||
*buf = taosMemoryCalloc(1, total);
|
*buf = taosMemoryCalloc(1, total);
|
||||||
|
if (*buf == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
memcpy(*buf, p->buf, total);
|
memcpy(*buf, p->buf, total);
|
||||||
if (transResetBuffer(connBuf, resetBuf) < 0) {
|
if ((code = transResetBuffer(connBuf, resetBuf)) < 0) {
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
total = -1;
|
total = -1;
|
||||||
|
return TSDB_CODE_INVALID_MSG;
|
||||||
}
|
}
|
||||||
return total;
|
return total;
|
||||||
}
|
}
|
||||||
|
|
||||||
int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) {
|
int32_t transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) {
|
||||||
SConnBuffer* p = connBuf;
|
SConnBuffer* p = connBuf;
|
||||||
if (p->total < p->len) {
|
if (p->total < p->len) {
|
||||||
int left = p->len - p->total;
|
int left = p->len - p->total;
|
||||||
|
@ -162,15 +177,18 @@ int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) {
|
||||||
if (resetBuf) {
|
if (resetBuf) {
|
||||||
p->cap = BUFFER_CAP;
|
p->cap = BUFFER_CAP;
|
||||||
p->buf = taosMemoryRealloc(p->buf, p->cap);
|
p->buf = taosMemoryRealloc(p->buf, p->cap);
|
||||||
|
if (p->buf == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ASSERTS(0, "invalid read from sock buf");
|
ASSERTS(0, "invalid read from sock buf");
|
||||||
return -1;
|
return TSDB_CODE_INVALID_MSG;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
|
int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
|
||||||
/*
|
/*
|
||||||
* formate of data buffer:
|
* formate of data buffer:
|
||||||
* |<--------------------------data from socket------------------------------->|
|
* |<--------------------------data from socket------------------------------->|
|
||||||
|
@ -187,6 +205,9 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
|
||||||
} else {
|
} else {
|
||||||
p->cap = p->left + p->len;
|
p->cap = p->left + p->len;
|
||||||
p->buf = taosMemoryRealloc(p->buf, p->cap);
|
p->buf = taosMemoryRealloc(p->buf, p->cap);
|
||||||
|
if (p->buf == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
uvBuf->base = p->buf + p->len;
|
uvBuf->base = p->buf + p->len;
|
||||||
uvBuf->len = p->left;
|
uvBuf->len = p->left;
|
||||||
}
|
}
|
||||||
|
@ -222,19 +243,19 @@ int transSetConnOption(uv_tcp_t* stream, int keepalive) {
|
||||||
// int ret = uv_tcp_keepalive(stream, 5, 60);
|
// int ret = uv_tcp_keepalive(stream, 5, 60);
|
||||||
}
|
}
|
||||||
|
|
||||||
SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
|
int32_t transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb, SAsyncPool** pPool) {
|
||||||
SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool));
|
SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool));
|
||||||
if (pool == NULL) {
|
if (pool == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
// return NULL;
|
||||||
}
|
}
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
pool->nAsync = sz;
|
pool->nAsync = sz;
|
||||||
pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
|
pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
|
||||||
if (pool->asyncs == NULL) {
|
if (pool->asyncs == NULL) {
|
||||||
taosMemoryFree(pool);
|
taosMemoryFree(pool);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int i = 0, err = 0;
|
int i = 0, err = 0;
|
||||||
|
@ -243,7 +264,7 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb)
|
||||||
|
|
||||||
SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem));
|
SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem));
|
||||||
if (item == NULL) {
|
if (item == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
item->pThrd = arg;
|
item->pThrd = arg;
|
||||||
|
@ -254,7 +275,7 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb)
|
||||||
err = uv_async_init(loop, async, cb);
|
err = uv_async_init(loop, async, cb);
|
||||||
if (err != 0) {
|
if (err != 0) {
|
||||||
tError("failed to init async, reason:%s", uv_err_name(err));
|
tError("failed to init async, reason:%s", uv_err_name(err));
|
||||||
terrno = TSDB_CODE_THIRDPARTY_ERROR;
|
code = TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -264,10 +285,14 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb)
|
||||||
pool = NULL;
|
pool = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pool;
|
*pPool = pool;
|
||||||
|
return 0;
|
||||||
|
// return pool;
|
||||||
}
|
}
|
||||||
|
|
||||||
void transAsyncPoolDestroy(SAsyncPool* pool) {
|
void transAsyncPoolDestroy(SAsyncPool* pool) {
|
||||||
|
if (pool == NULL) return;
|
||||||
|
|
||||||
for (int i = 0; i < pool->nAsync; i++) {
|
for (int i = 0; i < pool->nAsync; i++) {
|
||||||
uv_async_t* async = &(pool->asyncs[i]);
|
uv_async_t* async = &(pool->asyncs[i]);
|
||||||
SAsyncItem* item = async->data;
|
SAsyncItem* item = async->data;
|
||||||
|
@ -289,7 +314,7 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool) {
|
||||||
}
|
}
|
||||||
int transAsyncSend(SAsyncPool* pool, queue* q) {
|
int transAsyncSend(SAsyncPool* pool, queue* q) {
|
||||||
if (atomic_load_8(&pool->stop) == 1) {
|
if (atomic_load_8(&pool->stop) == 1) {
|
||||||
return -1;
|
return TSDB_CODE_RPC_ASYNC_MODULE_QUIT;
|
||||||
}
|
}
|
||||||
int idx = pool->index % pool->nAsync;
|
int idx = pool->index % pool->nAsync;
|
||||||
|
|
||||||
|
@ -303,7 +328,12 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
|
||||||
taosThreadMutexLock(&item->mtx);
|
taosThreadMutexLock(&item->mtx);
|
||||||
QUEUE_PUSH(&item->qmsg, q);
|
QUEUE_PUSH(&item->qmsg, q);
|
||||||
taosThreadMutexUnlock(&item->mtx);
|
taosThreadMutexUnlock(&item->mtx);
|
||||||
return uv_async_send(async);
|
int ret = uv_async_send(async);
|
||||||
|
if (ret != 0) {
|
||||||
|
tError("failed to send async,reason:%s", uv_err_name(ret));
|
||||||
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void transCtxInit(STransCtx* ctx) {
|
void transCtxInit(STransCtx* ctx) {
|
||||||
|
@ -408,9 +438,14 @@ void transReqQueueClear(queue* q) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) {
|
int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) {
|
||||||
queue->q = taosArrayInit(2, sizeof(void*));
|
queue->q = taosArrayInit(2, sizeof(void*));
|
||||||
queue->freeFunc = (void (*)(const void*))freeFunc;
|
queue->freeFunc = (void (*)(const void*))freeFunc;
|
||||||
|
|
||||||
|
if (queue->q == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
bool transQueuePush(STransQueue* queue, void* arg) {
|
bool transQueuePush(STransQueue* queue, void* arg) {
|
||||||
if (queue->q == NULL) {
|
if (queue->q == NULL) {
|
||||||
|
@ -513,20 +548,44 @@ static void transDQTimeout(uv_timer_t* timer) {
|
||||||
uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
|
uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue) {
|
int32_t transDQCreate(uv_loop_t* loop, SDelayQueue** queue) {
|
||||||
uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
|
int32_t code = 0;
|
||||||
uv_timer_init(loop, timer);
|
Heap* heap = NULL;
|
||||||
|
uv_timer_t* timer = NULL;
|
||||||
|
SDelayQueue* q = NULL;
|
||||||
|
|
||||||
Heap* heap = heapCreate(timeCompare);
|
timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
|
||||||
|
if (timer == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
SDelayQueue* q = taosMemoryCalloc(1, sizeof(SDelayQueue));
|
heap = heapCreate(timeCompare);
|
||||||
|
if (heap == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _return1);
|
||||||
|
}
|
||||||
|
|
||||||
|
q = taosMemoryCalloc(1, sizeof(SDelayQueue));
|
||||||
|
if (q == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _return1);
|
||||||
|
}
|
||||||
q->heap = heap;
|
q->heap = heap;
|
||||||
q->timer = timer;
|
q->timer = timer;
|
||||||
q->loop = loop;
|
q->loop = loop;
|
||||||
q->timer->data = q;
|
q->timer->data = q;
|
||||||
|
|
||||||
|
int err = uv_timer_init(loop, timer);
|
||||||
|
if (err != 0) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _return1);
|
||||||
|
}
|
||||||
|
|
||||||
*queue = q;
|
*queue = q;
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
_return1:
|
||||||
|
taosMemoryFree(timer);
|
||||||
|
heapDestroy(heap);
|
||||||
|
taosMemoryFree(q);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
|
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
|
||||||
|
@ -578,6 +637,10 @@ void transDQCancel(SDelayQueue* queue, SDelayTask* task) {
|
||||||
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
|
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
|
||||||
uint64_t now = taosGetTimestampMs();
|
uint64_t now = taosGetTimestampMs();
|
||||||
SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
|
SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
|
||||||
|
if (task == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
task->func = func;
|
task->func = func;
|
||||||
task->arg = arg;
|
task->arg = arg;
|
||||||
task->execTime = now + timeoutMs;
|
task->execTime = now + timeoutMs;
|
||||||
|
@ -648,9 +711,13 @@ static void transDestroyEnv() {
|
||||||
transCloseRefMgt(transSyncMsgMgt);
|
transCloseRefMgt(transSyncMsgMgt);
|
||||||
}
|
}
|
||||||
|
|
||||||
void transInit() {
|
int32_t transInit() {
|
||||||
// init env
|
// init env
|
||||||
taosThreadOnce(&transModuleInit, transInitEnv);
|
int32_t code = taosThreadOnce(&transModuleInit, transInitEnv);
|
||||||
|
if (code != 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t transGetRefMgt() { return refMgt; }
|
int32_t transGetRefMgt() { return refMgt; }
|
||||||
|
@ -709,29 +776,6 @@ void transDestroySyncMsg(void* msg) {
|
||||||
taosMemoryFree(pSyncMsg);
|
taosMemoryFree(pSyncMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
// void subnetIp2int(const char* const ip_addr, uint8_t* dst) {
|
|
||||||
// char ip_addr_cpy[20];
|
|
||||||
// char ip[5];
|
|
||||||
|
|
||||||
// tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy));
|
|
||||||
|
|
||||||
// char *s_start, *s_end;
|
|
||||||
// s_start = ip_addr_cpy;
|
|
||||||
// s_end = ip_addr_cpy;
|
|
||||||
|
|
||||||
// int32_t k = 0;
|
|
||||||
|
|
||||||
// for (k = 0; *s_start != '\0'; s_start = s_end) {
|
|
||||||
// for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) {
|
|
||||||
// }
|
|
||||||
// if (*s_end == '.') {
|
|
||||||
// *s_end = '\0';
|
|
||||||
// s_end++;
|
|
||||||
// }
|
|
||||||
// dst[k++] = (char)atoi(s_start);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
uint32_t subnetIpRang2Int(SIpV4Range* pRange) {
|
uint32_t subnetIpRang2Int(SIpV4Range* pRange) {
|
||||||
uint32_t ip = pRange->ip;
|
uint32_t ip = pRange->ip;
|
||||||
return ((ip & 0xFF) << 24) | ((ip & 0xFF00) << 8) | ((ip & 0xFF0000) >> 8) | ((ip >> 24) & 0xFF);
|
return ((ip & 0xFF) << 24) | ((ip & 0xFF00) << 8) | ((ip & 0xFF0000) >> 8) | ((ip >> 24) & 0xFF);
|
||||||
|
@ -778,7 +822,11 @@ int32_t transUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) {
|
||||||
struct in_addr addr;
|
struct in_addr addr;
|
||||||
addr.s_addr = pRange->ip;
|
addr.s_addr = pRange->ip;
|
||||||
|
|
||||||
uv_inet_ntop(AF_INET, &addr, buf, 32);
|
int32_t err = uv_inet_ntop(AF_INET, &addr, buf, 32);
|
||||||
|
if (err != 0) {
|
||||||
|
tError("failed to convert ip to string, reason:%s", uv_strerror(err));
|
||||||
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
len = strlen(buf);
|
len = strlen(buf);
|
||||||
|
|
||||||
|
@ -794,14 +842,23 @@ int32_t transUtilSWhiteListToStr(SIpWhiteList* pList, char** ppBuf) {
|
||||||
*ppBuf = NULL;
|
*ppBuf = NULL;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
char* pBuf = taosMemoryCalloc(1, pList->num * 36);
|
char* pBuf = taosMemoryCalloc(1, pList->num * 36);
|
||||||
|
if (pBuf == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < pList->num; i++) {
|
for (int i = 0; i < pList->num; i++) {
|
||||||
SIpV4Range* pRange = &pList->pIpRange[i];
|
SIpV4Range* pRange = &pList->pIpRange[i];
|
||||||
|
|
||||||
char tbuf[32] = {0};
|
char tbuf[32] = {0};
|
||||||
int tlen = transUtilSIpRangeToStr(pRange, tbuf);
|
int tlen = transUtilSIpRangeToStr(pRange, tbuf);
|
||||||
|
if (tlen < 0) {
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
len += sprintf(pBuf + len, "%s,", tbuf);
|
len += sprintf(pBuf + len, "%s,", tbuf);
|
||||||
}
|
}
|
||||||
if (len > 0) {
|
if (len > 0) {
|
||||||
|
|
|
@ -122,7 +122,7 @@ typedef struct SServerObj {
|
||||||
|
|
||||||
SIpWhiteListTab* uvWhiteListCreate();
|
SIpWhiteListTab* uvWhiteListCreate();
|
||||||
void uvWhiteListDestroy(SIpWhiteListTab* pWhite);
|
void uvWhiteListDestroy(SIpWhiteListTab* pWhite);
|
||||||
void uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* pList, int64_t ver);
|
int32_t uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* pList, int64_t ver);
|
||||||
void uvWhiteListUpdate(SIpWhiteListTab* pWhite, SHashObj* pTable);
|
void uvWhiteListUpdate(SIpWhiteListTab* pWhite, SHashObj* pTable);
|
||||||
bool uvWhiteListCheckConn(SIpWhiteListTab* pWhite, SSvrConn* pConn);
|
bool uvWhiteListCheckConn(SIpWhiteListTab* pWhite, SSvrConn* pConn);
|
||||||
bool uvWhiteListFilte(SIpWhiteListTab* pWhite, char* user, uint32_t ip, int64_t ver);
|
bool uvWhiteListFilte(SIpWhiteListTab* pWhite, char* user, uint32_t ip, int64_t ver);
|
||||||
|
@ -164,7 +164,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd);
|
||||||
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
|
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
|
||||||
static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn);
|
static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn);
|
||||||
|
|
||||||
static int reallocConnRef(SSvrConn* conn);
|
static int32_t reallocConnRef(SSvrConn* conn);
|
||||||
|
|
||||||
static void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd);
|
static void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd);
|
||||||
static void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd);
|
static void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd);
|
||||||
|
@ -181,8 +181,8 @@ static void* transWorkerThread(void* arg);
|
||||||
static void* transAcceptThread(void* arg);
|
static void* transAcceptThread(void* arg);
|
||||||
|
|
||||||
// add handle loop
|
// add handle loop
|
||||||
static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName);
|
static int32_t addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName);
|
||||||
static bool addHandleToAcceptloop(void* arg);
|
static int32_t addHandleToAcceptloop(void* arg);
|
||||||
|
|
||||||
#define SRV_RELEASE_UV(loop) \
|
#define SRV_RELEASE_UV(loop) \
|
||||||
do { \
|
do { \
|
||||||
|
@ -202,7 +202,11 @@ static bool addHandleToAcceptloop(void* arg);
|
||||||
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||||
SSvrConn* conn = handle->data;
|
SSvrConn* conn = handle->data;
|
||||||
SConnBuffer* pBuf = &conn->readBuf;
|
SConnBuffer* pBuf = &conn->readBuf;
|
||||||
transAllocBuffer(pBuf, buf);
|
int32_t code = transAllocBuffer(pBuf, buf);
|
||||||
|
if (code < 0) {
|
||||||
|
tError("conn %p failed to alloc buffer, since %s", conn, tstrerror(code));
|
||||||
|
// destroyConn(conn, true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// refers specifically to query or insert timeout
|
// refers specifically to query or insert timeout
|
||||||
|
@ -221,8 +225,16 @@ static bool uvCheckIp(SIpV4Range* pRange, int32_t ip) {
|
||||||
}
|
}
|
||||||
SIpWhiteListTab* uvWhiteListCreate() {
|
SIpWhiteListTab* uvWhiteListCreate() {
|
||||||
SIpWhiteListTab* pWhiteList = taosMemoryCalloc(1, sizeof(SIpWhiteListTab));
|
SIpWhiteListTab* pWhiteList = taosMemoryCalloc(1, sizeof(SIpWhiteListTab));
|
||||||
|
if (pWhiteList == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
pWhiteList->pList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK);
|
pWhiteList->pList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK);
|
||||||
|
if (pWhiteList->pList == NULL) {
|
||||||
|
taosMemoryFree(pWhiteList);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
pWhiteList->ver = -1;
|
pWhiteList->ver = -1;
|
||||||
return pWhiteList;
|
return pWhiteList;
|
||||||
}
|
}
|
||||||
|
@ -240,17 +252,26 @@ void uvWhiteListDestroy(SIpWhiteListTab* pWhite) {
|
||||||
taosMemoryFree(pWhite);
|
taosMemoryFree(pWhite);
|
||||||
}
|
}
|
||||||
|
|
||||||
void uvWhiteListToStr(SWhiteUserList* plist, char* user, char** ppBuf) {
|
int32_t uvWhiteListToStr(SWhiteUserList* plist, char* user, char** ppBuf) {
|
||||||
char* tmp = NULL;
|
char* tmp = NULL;
|
||||||
int32_t tlen = transUtilSWhiteListToStr(plist->pList, &tmp);
|
int32_t tlen = transUtilSWhiteListToStr(plist->pList, &tmp);
|
||||||
|
if (tlen < 0) {
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* pBuf = taosMemoryCalloc(1, tlen + 64);
|
||||||
|
if (pBuf == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
char* pBuf = taosMemoryCalloc(1, tlen + 64);
|
|
||||||
int32_t len = sprintf(pBuf, "user: %s, ver: %" PRId64 ", ip: {%s}", user, plist->ver, tmp);
|
int32_t len = sprintf(pBuf, "user: %s, ver: %" PRId64 ", ip: {%s}", user, plist->ver, tmp);
|
||||||
taosMemoryFree(tmp);
|
taosMemoryFree(tmp);
|
||||||
|
|
||||||
*ppBuf = pBuf;
|
*ppBuf = pBuf;
|
||||||
|
return len;
|
||||||
}
|
}
|
||||||
void uvWhiteListDebug(SIpWhiteListTab* pWrite) {
|
void uvWhiteListDebug(SIpWhiteListTab* pWrite) {
|
||||||
|
int32_t code = 0;
|
||||||
SHashObj* pWhiteList = pWrite->pList;
|
SHashObj* pWhiteList = pWrite->pList;
|
||||||
void* pIter = taosHashIterate(pWhiteList, NULL);
|
void* pIter = taosHashIterate(pWhiteList, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
|
@ -262,23 +283,35 @@ void uvWhiteListDebug(SIpWhiteListTab* pWrite) {
|
||||||
SWhiteUserList* pUserList = *(SWhiteUserList**)pIter;
|
SWhiteUserList* pUserList = *(SWhiteUserList**)pIter;
|
||||||
|
|
||||||
char* buf = NULL;
|
char* buf = NULL;
|
||||||
uvWhiteListToStr(pUserList, user, &buf);
|
|
||||||
tDebug("ip-white-list %s", buf);
|
code = uvWhiteListToStr(pUserList, user, &buf);
|
||||||
|
if (code != 0) {
|
||||||
|
tDebug("ip-white-list failed to debug to str since %s", buf);
|
||||||
|
}
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
pIter = taosHashIterate(pWhiteList, pIter);
|
pIter = taosHashIterate(pWhiteList, pIter);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
void uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* plist, int64_t ver) {
|
int32_t uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* plist, int64_t ver) {
|
||||||
|
int32_t code = 0;
|
||||||
SHashObj* pWhiteList = pWhite->pList;
|
SHashObj* pWhiteList = pWhite->pList;
|
||||||
|
|
||||||
SWhiteUserList** ppUserList = taosHashGet(pWhiteList, user, strlen(user));
|
SWhiteUserList** ppUserList = taosHashGet(pWhiteList, user, strlen(user));
|
||||||
if (ppUserList == NULL || *ppUserList == NULL) {
|
if (ppUserList == NULL || *ppUserList == NULL) {
|
||||||
SWhiteUserList* pUserList = taosMemoryCalloc(1, sizeof(SWhiteUserList));
|
SWhiteUserList* pUserList = taosMemoryCalloc(1, sizeof(SWhiteUserList));
|
||||||
|
if (pUserList == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
pUserList->ver = ver;
|
pUserList->ver = ver;
|
||||||
|
|
||||||
pUserList->pList = plist;
|
pUserList->pList = plist;
|
||||||
|
|
||||||
taosHashPut(pWhiteList, user, strlen(user), &pUserList, sizeof(void*));
|
code = taosHashPut(pWhiteList, user, strlen(user), &pUserList, sizeof(void*));
|
||||||
|
if (code != 0) {
|
||||||
|
taosMemoryFree(pUserList);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
SWhiteUserList* pUserList = *ppUserList;
|
SWhiteUserList* pUserList = *ppUserList;
|
||||||
|
|
||||||
|
@ -287,6 +320,7 @@ void uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* plist, in
|
||||||
pUserList->pList = plist;
|
pUserList->pList = plist;
|
||||||
}
|
}
|
||||||
uvWhiteListDebug(pWhite);
|
uvWhiteListDebug(pWhite);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void uvWhiteListUpdate(SIpWhiteListTab* pWhite, SHashObj* pTable) {
|
void uvWhiteListUpdate(SIpWhiteListTab* pWhite, SHashObj* pTable) {
|
||||||
|
@ -584,6 +618,10 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
||||||
STransMsg* pMsg = &smsg->msg;
|
STransMsg* pMsg = &smsg->msg;
|
||||||
if (pMsg->pCont == 0) {
|
if (pMsg->pCont == 0) {
|
||||||
pMsg->pCont = (void*)rpcMallocCont(0);
|
pMsg->pCont = (void*)rpcMallocCont(0);
|
||||||
|
if (pMsg->pCont == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
pMsg->contLen = 0;
|
pMsg->contLen = 0;
|
||||||
}
|
}
|
||||||
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
|
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
|
||||||
|
@ -598,7 +636,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
||||||
if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
|
if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
|
||||||
transQueuePop(&pConn->srvMsgs);
|
transQueuePop(&pConn->srvMsgs);
|
||||||
destroySmsg(smsg);
|
destroySmsg(smsg);
|
||||||
return -1;
|
return TSDB_CODE_INVALID_MSG;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pConn->status == ConnNormal) {
|
if (pConn->status == ConnNormal) {
|
||||||
|
@ -764,7 +802,11 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) {
|
||||||
}
|
}
|
||||||
static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
|
static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
|
||||||
if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
|
if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
|
||||||
reallocConnRef(pConn);
|
int32_t code = reallocConnRef(pConn);
|
||||||
|
if (code != 0) {
|
||||||
|
destroyConn(pConn, true);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
tTrace("conn %p received release request", pConn);
|
tTrace("conn %p received release request", pConn);
|
||||||
|
|
||||||
STraceId traceId = pHead->traceId;
|
STraceId traceId = pHead->traceId;
|
||||||
|
@ -935,23 +977,21 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// uv_handle_type pending = uv_pipe_pending_type(pipe);
|
|
||||||
|
|
||||||
SSvrConn* pConn = createConn(pThrd);
|
SSvrConn* pConn = createConn(pThrd);
|
||||||
|
if (pConn == NULL) {
|
||||||
|
uv_close((uv_handle_t*)q, NULL);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
pConn->pTransInst = pThrd->pTransInst;
|
// pConn->pTransInst = pThrd->pTransInst;
|
||||||
/* init conn timer*/
|
// /* init conn timer*/
|
||||||
// uv_timer_init(pThrd->loop, &pConn->pTimer);
|
// // uv_timer_init(pThrd->loop, &pConn->pTimer);
|
||||||
// pConn->pTimer.data = pConn;
|
// // pConn->pTimer.data = pConn;
|
||||||
|
// pConn->hostThrd = pThrd;
|
||||||
pConn->hostThrd = pThrd;
|
// // init client handle
|
||||||
|
// pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
|
||||||
// init client handle
|
// uv_tcp_init(pThrd->loop, pConn->pTcp);
|
||||||
pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
|
// pConn->pTcp->data = pConn;
|
||||||
uv_tcp_init(pThrd->loop, pConn->pTcp);
|
|
||||||
pConn->pTcp->data = pConn;
|
|
||||||
|
|
||||||
// transSetConnOption((uv_tcp_t*)pConn->pTcp);
|
|
||||||
|
|
||||||
if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
|
if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
|
||||||
uv_os_fd_t fd;
|
uv_os_fd_t fd;
|
||||||
|
@ -1005,17 +1045,36 @@ void uvOnPipeConnectionCb(uv_connect_t* connect, int status) {
|
||||||
SWorkThrd* pThrd = container_of(connect, SWorkThrd, connect_req);
|
SWorkThrd* pThrd = container_of(connect, SWorkThrd, connect_req);
|
||||||
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
|
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
|
||||||
}
|
}
|
||||||
static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
|
static int32_t addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
|
||||||
|
int32_t code = 0;
|
||||||
pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
|
pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
|
||||||
if (0 != uv_loop_init(pThrd->loop)) {
|
if (pThrd->loop == NULL) {
|
||||||
return false;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((code = uv_loop_init(pThrd->loop)) != 0) {
|
||||||
|
tError("failed to init loop since %s", uv_err_name(code));
|
||||||
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if defined(WINDOWS) || defined(DARWIN)
|
#if defined(WINDOWS) || defined(DARWIN)
|
||||||
uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
|
code = uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
|
||||||
|
if (code != 0) {
|
||||||
|
tError("failed to init pip since %s", uv_err_name(code));
|
||||||
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
|
}
|
||||||
#else
|
#else
|
||||||
uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
|
code = uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
|
||||||
uv_pipe_open(pThrd->pipe, pThrd->fd);
|
if (code != 0) {
|
||||||
|
tError("failed to init pip since %s", uv_err_name(code));
|
||||||
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = uv_pipe_open(pThrd->pipe, pThrd->fd);
|
||||||
|
if (code != 0) {
|
||||||
|
tError("failed to open pip since %s", uv_err_name(code));
|
||||||
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
pThrd->pipe->data = pThrd;
|
pThrd->pipe->data = pThrd;
|
||||||
|
@ -1023,50 +1082,86 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
|
||||||
QUEUE_INIT(&pThrd->msg);
|
QUEUE_INIT(&pThrd->msg);
|
||||||
|
|
||||||
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
|
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
|
||||||
uv_prepare_init(pThrd->loop, pThrd->prepare);
|
if (pThrd->prepare == NULL) {
|
||||||
uv_prepare_start(pThrd->prepare, uvPrepareCb);
|
tError("failed to init prepare");
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = uv_prepare_init(pThrd->loop, pThrd->prepare);
|
||||||
|
if (code != 0) {
|
||||||
|
tError("failed to init prepare since %s", uv_err_name(code));
|
||||||
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = uv_prepare_start(pThrd->prepare, uvPrepareCb);
|
||||||
|
if (code != 0) {
|
||||||
|
tError("failed to start prepare since %s", uv_err_name(code));
|
||||||
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
|
}
|
||||||
pThrd->prepare->data = pThrd;
|
pThrd->prepare->data = pThrd;
|
||||||
|
|
||||||
// conn set
|
// conn set
|
||||||
QUEUE_INIT(&pThrd->conn);
|
QUEUE_INIT(&pThrd->conn);
|
||||||
|
|
||||||
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, uvWorkerAsyncCb);
|
code = transAsyncPoolCreate(pThrd->loop, 8, pThrd, uvWorkerAsyncCb, &pThrd->asyncPool);
|
||||||
|
if (code != 0) {
|
||||||
|
tError("failed to init async pool since:%s", tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
#if defined(WINDOWS) || defined(DARWIN)
|
#if defined(WINDOWS) || defined(DARWIN)
|
||||||
uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
|
uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
|
||||||
|
|
||||||
#else
|
#else
|
||||||
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
|
code = uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
|
||||||
|
if (code != 0) {
|
||||||
|
tError("failed to start read pipe:%s", uv_err_name(code));
|
||||||
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
return true;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool addHandleToAcceptloop(void* arg) {
|
static int32_t addHandleToAcceptloop(void* arg) {
|
||||||
// impl later
|
// impl later
|
||||||
SServerObj* srv = arg;
|
SServerObj* srv = arg;
|
||||||
|
|
||||||
int err = 0;
|
int code = 0;
|
||||||
if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
|
if ((code = uv_tcp_init(srv->loop, &srv->server)) != 0) {
|
||||||
tError("failed to init accept server:%s", uv_err_name(err));
|
tError("failed to init accept server since %s", uv_err_name(code));
|
||||||
return false;
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
// register an async here to quit server gracefully
|
// register an async here to quit server gracefully
|
||||||
srv->pAcceptAsync = taosMemoryCalloc(1, sizeof(uv_async_t));
|
srv->pAcceptAsync = taosMemoryCalloc(1, sizeof(uv_async_t));
|
||||||
uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
|
if (srv->pAcceptAsync == NULL) {
|
||||||
|
tError("failed to create async since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
|
||||||
|
if (code != 0) {
|
||||||
|
tError("failed to init async since:%s", uv_err_name(code));
|
||||||
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
|
}
|
||||||
srv->pAcceptAsync->data = srv;
|
srv->pAcceptAsync->data = srv;
|
||||||
|
|
||||||
struct sockaddr_in bind_addr;
|
struct sockaddr_in bind_addr;
|
||||||
uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
|
if ((code = uv_ip4_addr("0.0.0.0", srv->port, &bind_addr)) != 0) {
|
||||||
if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
|
tError("failed to bind addr since %s", uv_err_name(code));
|
||||||
tError("failed to bind:%s", uv_err_name(err));
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
if ((err = uv_listen((uv_stream_t*)&srv->server, 4096 * 2, uvOnAcceptCb)) != 0) {
|
|
||||||
tError("failed to listen:%s", uv_err_name(err));
|
if ((code = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
|
||||||
terrno = TSDB_CODE_RPC_PORT_EADDRINUSE;
|
tError("failed to bind since %s", uv_err_name(code));
|
||||||
return false;
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
}
|
}
|
||||||
return true;
|
if ((code = uv_listen((uv_stream_t*)&srv->server, 4096 * 2, uvOnAcceptCb)) != 0) {
|
||||||
|
tError("failed to listen since %s", uv_err_name(code));
|
||||||
|
return TSDB_CODE_RPC_PORT_EADDRINUSE;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* transWorkerThread(void* arg) {
|
void* transWorkerThread(void* arg) {
|
||||||
setThreadName("trans-svr-work");
|
setThreadName("trans-svr-work");
|
||||||
SWorkThrd* pThrd = (SWorkThrd*)arg;
|
SWorkThrd* pThrd = (SWorkThrd*)arg;
|
||||||
|
@ -1076,35 +1171,84 @@ void* transWorkerThread(void* arg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
|
static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
|
||||||
|
int32_t code = 0;
|
||||||
SWorkThrd* pThrd = hThrd;
|
SWorkThrd* pThrd = hThrd;
|
||||||
|
|
||||||
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
|
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
|
||||||
|
if (pConn == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
|
||||||
|
}
|
||||||
|
|
||||||
transReqQueueInit(&pConn->wreqQueue);
|
transReqQueueInit(&pConn->wreqQueue);
|
||||||
QUEUE_INIT(&pConn->queue);
|
QUEUE_INIT(&pConn->queue);
|
||||||
|
|
||||||
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
|
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
|
||||||
|
|
||||||
transQueueInit(&pConn->srvMsgs, NULL);
|
if ((code = transQueueInit(&pConn->srvMsgs, NULL)) != 0) {
|
||||||
|
TAOS_CHECK_GOTO(code, NULL, _end);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((code = transInitBuffer(&pConn->readBuf)) != 0) {
|
||||||
|
TAOS_CHECK_GOTO(code, NULL, _end);
|
||||||
|
}
|
||||||
|
|
||||||
memset(&pConn->regArg, 0, sizeof(pConn->regArg));
|
memset(&pConn->regArg, 0, sizeof(pConn->regArg));
|
||||||
pConn->broken = false;
|
pConn->broken = false;
|
||||||
pConn->status = ConnNormal;
|
pConn->status = ConnNormal;
|
||||||
transInitBuffer(&pConn->readBuf);
|
|
||||||
|
|
||||||
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
|
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
|
||||||
|
if (exh == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
|
||||||
|
}
|
||||||
|
|
||||||
exh->handle = pConn;
|
exh->handle = pConn;
|
||||||
exh->pThrd = pThrd;
|
exh->pThrd = pThrd;
|
||||||
exh->refId = transAddExHandle(transGetRefMgt(), exh);
|
exh->refId = transAddExHandle(transGetRefMgt(), exh);
|
||||||
|
if (exh->refId < 0) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end);
|
||||||
|
}
|
||||||
|
|
||||||
QUEUE_INIT(&exh->q);
|
QUEUE_INIT(&exh->q);
|
||||||
transAcquireExHandle(transGetRefMgt(), exh->refId);
|
|
||||||
|
SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId);
|
||||||
|
if (pSelf != exh) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end);
|
||||||
|
}
|
||||||
|
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
pConn->refId = exh->refId;
|
pConn->refId = exh->refId;
|
||||||
QUEUE_INIT(&exh->q);
|
QUEUE_INIT(&exh->q);
|
||||||
transRefSrvHandle(pConn);
|
transRefSrvHandle(pConn);
|
||||||
tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pTransInst), exh, pConn, pConn->refId);
|
tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pTransInst), exh, pConn, pConn->refId);
|
||||||
|
|
||||||
|
pConn->pTransInst = pThrd->pTransInst;
|
||||||
|
/* init conn timer*/
|
||||||
|
// uv_timer_init(pThrd->loop, &pConn->pTimer);
|
||||||
|
// pConn->pTimer.data = pConn;
|
||||||
|
pConn->hostThrd = pThrd;
|
||||||
|
// init client handle
|
||||||
|
pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
|
||||||
|
if (pConn->pTcp == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = uv_tcp_init(pThrd->loop, pConn->pTcp);
|
||||||
|
if (code != 0) {
|
||||||
|
tError("%s failed to create conn since %s" PRId64, transLabel(pTransInst), uv_strerror(code));
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end);
|
||||||
|
}
|
||||||
|
pConn->pTcp->data = pConn;
|
||||||
|
|
||||||
return pConn;
|
return pConn;
|
||||||
|
_end:
|
||||||
|
if (pConn) {
|
||||||
|
transQueueDestroy(&pConn->srvMsgs);
|
||||||
|
transDestroyBuffer(&pConn->readBuf);
|
||||||
|
taosMemoryFree(pConn->pTcp);
|
||||||
|
taosMemoryFree(pConn);
|
||||||
|
pConn = NULL;
|
||||||
|
}
|
||||||
|
tError("%s failed to create conn since %s" PRId64, transLabel(pTransInst), tstrerror(code));
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear) {
|
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear) {
|
||||||
|
@ -1125,16 +1269,33 @@ static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn) {
|
||||||
conn->regArg.init = 0;
|
conn->regArg.init = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static int reallocConnRef(SSvrConn* conn) {
|
static int32_t reallocConnRef(SSvrConn* conn) {
|
||||||
transReleaseExHandle(transGetRefMgt(), conn->refId);
|
if (conn->refId > 0) {
|
||||||
transRemoveExHandle(transGetRefMgt(), conn->refId);
|
transReleaseExHandle(transGetRefMgt(), conn->refId);
|
||||||
|
transRemoveExHandle(transGetRefMgt(), conn->refId);
|
||||||
|
}
|
||||||
// avoid app continue to send msg on invalid handle
|
// avoid app continue to send msg on invalid handle
|
||||||
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
|
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
|
||||||
|
if (exh == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
exh->handle = conn;
|
exh->handle = conn;
|
||||||
exh->pThrd = conn->hostThrd;
|
exh->pThrd = conn->hostThrd;
|
||||||
exh->refId = transAddExHandle(transGetRefMgt(), exh);
|
exh->refId = transAddExHandle(transGetRefMgt(), exh);
|
||||||
|
if (exh->refId < 0) {
|
||||||
|
taosMemoryFree(exh);
|
||||||
|
return TSDB_CODE_REF_INVALID_ID;
|
||||||
|
}
|
||||||
|
|
||||||
QUEUE_INIT(&exh->q);
|
QUEUE_INIT(&exh->q);
|
||||||
transAcquireExHandle(transGetRefMgt(), exh->refId);
|
SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId);
|
||||||
|
if (pSelf != exh) {
|
||||||
|
tError("conn %p failed to acquire handle", conn);
|
||||||
|
taosMemoryFree(exh);
|
||||||
|
return TSDB_CODE_REF_INVALID_ID;
|
||||||
|
}
|
||||||
|
|
||||||
conn->refId = exh->refId;
|
conn->refId = exh->refId;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1204,24 +1365,41 @@ static void uvPipeListenCb(uv_stream_t* handle, int status) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
|
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj));
|
SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj));
|
||||||
srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
|
if (srv == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
tError("failed to init server since: %s", tstrerror(code));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
srv->ip = ip;
|
||||||
|
srv->port = port;
|
||||||
srv->numOfThreads = numOfThreads;
|
srv->numOfThreads = numOfThreads;
|
||||||
srv->workerIdx = 0;
|
srv->workerIdx = 0;
|
||||||
srv->numOfWorkerReady = 0;
|
srv->numOfWorkerReady = 0;
|
||||||
|
srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
|
||||||
srv->pThreadObj = (SWorkThrd**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrd*));
|
srv->pThreadObj = (SWorkThrd**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrd*));
|
||||||
srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*));
|
srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*));
|
||||||
srv->ip = ip;
|
if (srv->loop == NULL || srv->pThreadObj == NULL || srv->pipe == NULL) {
|
||||||
srv->port = port;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
uv_loop_init(srv->loop);
|
goto End;
|
||||||
|
}
|
||||||
|
|
||||||
char pipeName[PATH_MAX];
|
code = uv_loop_init(srv->loop);
|
||||||
|
if (code != 0) {
|
||||||
|
tError("failed to init server since: %s", uv_err_name(code));
|
||||||
|
code = TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
|
goto End;
|
||||||
|
}
|
||||||
|
|
||||||
if (false == taosValidIpAndPort(srv->ip, srv->port)) {
|
if (false == taosValidIpAndPort(srv->ip, srv->port)) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr());
|
tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr());
|
||||||
goto End;
|
goto End;
|
||||||
}
|
}
|
||||||
|
char pipeName[PATH_MAX];
|
||||||
|
|
||||||
#if defined(WINDOWS) || defined(DARWIN)
|
#if defined(WINDOWS) || defined(DARWIN)
|
||||||
int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0);
|
int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0);
|
||||||
|
@ -1259,7 +1437,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
|
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
|
||||||
thrd->pipe = &(srv->pipe[i][1]); // init read
|
thrd->pipe = &(srv->pipe[i][1]); // init read
|
||||||
|
|
||||||
if (false == addHandleToWorkloop(thrd, pipeName)) {
|
if ((code = addHandleToWorkloop(thrd, pipeName)) != 0) {
|
||||||
goto End;
|
goto End;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1276,27 +1454,53 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
|
|
||||||
for (int i = 0; i < srv->numOfThreads; i++) {
|
for (int i = 0; i < srv->numOfThreads; i++) {
|
||||||
SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd));
|
SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd));
|
||||||
|
if (thrd == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto End;
|
||||||
|
}
|
||||||
|
|
||||||
thrd->pTransInst = shandle;
|
thrd->pTransInst = shandle;
|
||||||
thrd->quit = false;
|
thrd->quit = false;
|
||||||
thrd->pTransInst = shandle;
|
thrd->pTransInst = shandle;
|
||||||
thrd->pWhiteList = uvWhiteListCreate();
|
thrd->pWhiteList = uvWhiteListCreate();
|
||||||
|
if (thrd->pWhiteList == NULL) {
|
||||||
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
srv->pThreadObj[i] = thrd;
|
|
||||||
|
|
||||||
uv_os_sock_t fds[2];
|
|
||||||
if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
|
|
||||||
goto End;
|
goto End;
|
||||||
}
|
}
|
||||||
|
|
||||||
uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
|
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
|
||||||
uv_pipe_open(&(srv->pipe[i][0]), fds[1]);
|
if (srv->pipe[i] == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto End;
|
||||||
|
}
|
||||||
|
|
||||||
|
srv->pThreadObj[i] = thrd;
|
||||||
|
|
||||||
|
uv_os_sock_t fds[2];
|
||||||
|
if ((code = uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE)) != 0) {
|
||||||
|
tError("failed to create pipe, errmsg: %s", uv_err_name(code));
|
||||||
|
code = TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
|
goto End;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
|
||||||
|
if (code != 0) {
|
||||||
|
tError("failed to init pipe, errmsg: %s", uv_err_name(code));
|
||||||
|
code = TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
|
goto End;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = uv_pipe_open(&(srv->pipe[i][0]), fds[1]);
|
||||||
|
if (code != 0) {
|
||||||
|
tError("failed to init pipe, errmsg: %s", uv_err_name(code));
|
||||||
|
code = TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
|
goto End;
|
||||||
|
}
|
||||||
|
|
||||||
thrd->pipe = &(srv->pipe[i][1]); // init read
|
thrd->pipe = &(srv->pipe[i][1]); // init read
|
||||||
thrd->fd = fds[0];
|
thrd->fd = fds[0];
|
||||||
|
|
||||||
if (false == addHandleToWorkloop(thrd, pipeName)) {
|
if ((code = addHandleToWorkloop(thrd, pipeName)) != 0) {
|
||||||
goto End;
|
goto End;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1311,15 +1515,17 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (false == addHandleToAcceptloop(srv)) {
|
if ((code = addHandleToAcceptloop(srv)) != 0) {
|
||||||
goto End;
|
goto End;
|
||||||
}
|
}
|
||||||
|
|
||||||
int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv);
|
code = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv);
|
||||||
if (err == 0) {
|
if (code == 0) {
|
||||||
tDebug("success to create accept-thread");
|
tDebug("success to create accept-thread");
|
||||||
} else {
|
} else {
|
||||||
tError("failed to create accept-thread");
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
tError("failed to create accept-thread since %s", tstrerror(code));
|
||||||
|
|
||||||
goto End;
|
goto End;
|
||||||
// clear all resource later
|
// clear all resource later
|
||||||
}
|
}
|
||||||
|
@ -1328,6 +1534,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
return srv;
|
return srv;
|
||||||
End:
|
End:
|
||||||
transCloseServer(srv);
|
transCloseServer(srv);
|
||||||
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1341,9 +1548,14 @@ void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd) {
|
||||||
taosMemoryFree(msg);
|
taosMemoryFree(msg);
|
||||||
}
|
}
|
||||||
void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) {
|
void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) {
|
||||||
|
int32_t code = 0;
|
||||||
SSvrConn* conn = msg->pConn;
|
SSvrConn* conn = msg->pConn;
|
||||||
if (conn->status == ConnAcquire) {
|
if (conn->status == ConnAcquire) {
|
||||||
reallocConnRef(conn);
|
code = reallocConnRef(conn);
|
||||||
|
if (code != 0) {
|
||||||
|
destroyConn(conn, true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (!transQueuePush(&conn->srvMsgs, msg)) {
|
if (!transQueuePush(&conn->srvMsgs, msg)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1387,30 +1599,43 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) {
|
||||||
}
|
}
|
||||||
void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) {
|
void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) {
|
||||||
SUpdateIpWhite* req = msg->arg;
|
SUpdateIpWhite* req = msg->arg;
|
||||||
if (req != NULL) {
|
if (req == NULL) {
|
||||||
for (int i = 0; i < req->numOfUser; i++) {
|
|
||||||
SUpdateUserIpWhite* pUser = &req->pUserIpWhite[i];
|
|
||||||
|
|
||||||
int32_t sz = pUser->numOfRange * sizeof(SIpV4Range);
|
|
||||||
SIpWhiteList* pList = taosMemoryCalloc(1, sz + sizeof(SIpWhiteList));
|
|
||||||
pList->num = pUser->numOfRange;
|
|
||||||
|
|
||||||
memcpy(pList->pIpRange, pUser->pIpRanges, sz);
|
|
||||||
uvWhiteListAdd(thrd->pWhiteList, pUser->user, pList, pUser->ver);
|
|
||||||
}
|
|
||||||
|
|
||||||
thrd->pWhiteList->ver = req->ver;
|
|
||||||
thrd->enableIpWhiteList = 1;
|
|
||||||
|
|
||||||
tFreeSUpdateIpWhiteReq(req);
|
|
||||||
taosMemoryFree(req);
|
|
||||||
} else {
|
|
||||||
tDebug("ip-white-list disable on trans");
|
tDebug("ip-white-list disable on trans");
|
||||||
thrd->enableIpWhiteList = 0;
|
thrd->enableIpWhiteList = 0;
|
||||||
|
taosMemoryFree(msg);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
int32_t code = 0;
|
||||||
|
for (int i = 0; i < req->numOfUser; i++) {
|
||||||
|
SUpdateUserIpWhite* pUser = &req->pUserIpWhite[i];
|
||||||
|
|
||||||
|
int32_t sz = pUser->numOfRange * sizeof(SIpV4Range);
|
||||||
|
|
||||||
|
SIpWhiteList* pList = taosMemoryCalloc(1, sz + sizeof(SIpWhiteList));
|
||||||
|
if (pList == NULL) {
|
||||||
|
tError("failed to create ip-white-list since %s", tstrerror(code));
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
pList->num = pUser->numOfRange;
|
||||||
|
memcpy(pList->pIpRange, pUser->pIpRanges, sz);
|
||||||
|
code = uvWhiteListAdd(thrd->pWhiteList, pUser->user, pList, pUser->ver);
|
||||||
|
if (code != 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code == 0) {
|
||||||
|
thrd->pWhiteList->ver = req->ver;
|
||||||
|
thrd->enableIpWhiteList = 1;
|
||||||
|
} else {
|
||||||
|
tError("failed to update ip-white-list since %s", tstrerror(code));
|
||||||
|
}
|
||||||
|
tFreeSUpdateIpWhiteReq(req);
|
||||||
|
taosMemoryFree(req);
|
||||||
taosMemoryFree(msg);
|
taosMemoryFree(msg);
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyWorkThrd(SWorkThrd* pThrd) {
|
void destroyWorkThrd(SWorkThrd* pThrd) {
|
||||||
if (pThrd == NULL) {
|
if (pThrd == NULL) {
|
||||||
return;
|
return;
|
||||||
|
@ -1483,6 +1708,7 @@ void transUnrefSrvHandle(void* handle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int transReleaseSrvHandle(void* handle) {
|
int transReleaseSrvHandle(void* handle) {
|
||||||
|
int32_t code = 0;
|
||||||
SRpcHandleInfo* info = handle;
|
SRpcHandleInfo* info = handle;
|
||||||
SExHandle* exh = info->handle;
|
SExHandle* exh = info->handle;
|
||||||
int64_t refId = info->refId;
|
int64_t refId = info->refId;
|
||||||
|
@ -1495,12 +1721,19 @@ int transReleaseSrvHandle(void* handle) {
|
||||||
STransMsg tmsg = {.code = 0, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId};
|
STransMsg tmsg = {.code = 0, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId};
|
||||||
|
|
||||||
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
||||||
|
if (m == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _return1;
|
||||||
|
}
|
||||||
|
|
||||||
m->msg = tmsg;
|
m->msg = tmsg;
|
||||||
m->type = Release;
|
m->type = Release;
|
||||||
|
|
||||||
tDebug("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
|
tDebug("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
|
||||||
if (0 != transAsyncSend(pThrd->asyncPool, &m->q)) {
|
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
|
||||||
destroySmsg(m);
|
destroySmsg(m);
|
||||||
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
|
@ -1508,13 +1741,15 @@ int transReleaseSrvHandle(void* handle) {
|
||||||
_return1:
|
_return1:
|
||||||
tDebug("handle %p failed to send to release handle", exh);
|
tDebug("handle %p failed to send to release handle", exh);
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
return -1;
|
return code;
|
||||||
_return2:
|
_return2:
|
||||||
tDebug("handle %p failed to send to release handle", exh);
|
tDebug("handle %p failed to send to release handle", exh);
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int transSendResponse(const STransMsg* msg) {
|
int transSendResponse(const STransMsg* msg) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
if (msg->info.noResp) {
|
if (msg->info.noResp) {
|
||||||
rpcFreeCont(msg->pCont);
|
rpcFreeCont(msg->pCont);
|
||||||
tTrace("no need send resp");
|
tTrace("no need send resp");
|
||||||
|
@ -1536,13 +1771,20 @@ int transSendResponse(const STransMsg* msg) {
|
||||||
ASYNC_ERR_JRET(pThrd);
|
ASYNC_ERR_JRET(pThrd);
|
||||||
|
|
||||||
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
||||||
|
if (m == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _return1;
|
||||||
|
}
|
||||||
|
|
||||||
m->msg = tmsg;
|
m->msg = tmsg;
|
||||||
m->type = Normal;
|
m->type = Normal;
|
||||||
|
|
||||||
STraceId* trace = (STraceId*)&msg->info.traceId;
|
STraceId* trace = (STraceId*)&msg->info.traceId;
|
||||||
tGDebug("conn %p start to send resp (1/2)", exh->handle);
|
tGDebug("conn %p start to send resp (1/2)", exh->handle);
|
||||||
if (0 != transAsyncSend(pThrd->asyncPool, &m->q)) {
|
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
|
||||||
destroySmsg(m);
|
destroySmsg(m);
|
||||||
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
|
@ -1552,13 +1794,15 @@ _return1:
|
||||||
tDebug("handle %p failed to send resp", exh);
|
tDebug("handle %p failed to send resp", exh);
|
||||||
rpcFreeCont(msg->pCont);
|
rpcFreeCont(msg->pCont);
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
return -1;
|
return code;
|
||||||
_return2:
|
_return2:
|
||||||
tDebug("handle %p failed to send resp", exh);
|
tDebug("handle %p failed to send resp", exh);
|
||||||
rpcFreeCont(msg->pCont);
|
rpcFreeCont(msg->pCont);
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
int transRegisterMsg(const STransMsg* msg) {
|
int transRegisterMsg(const STransMsg* msg) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
SExHandle* exh = msg->info.handle;
|
SExHandle* exh = msg->info.handle;
|
||||||
int64_t refId = msg->info.refId;
|
int64_t refId = msg->info.refId;
|
||||||
ASYNC_CHECK_HANDLE(exh, refId);
|
ASYNC_CHECK_HANDLE(exh, refId);
|
||||||
|
@ -1572,13 +1816,20 @@ int transRegisterMsg(const STransMsg* msg) {
|
||||||
ASYNC_ERR_JRET(pThrd);
|
ASYNC_ERR_JRET(pThrd);
|
||||||
|
|
||||||
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
||||||
|
if (m == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _return1;
|
||||||
|
}
|
||||||
|
|
||||||
m->msg = tmsg;
|
m->msg = tmsg;
|
||||||
m->type = Register;
|
m->type = Register;
|
||||||
|
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
tDebug("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
|
tDebug("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
|
||||||
if (0 != transAsyncSend(pThrd->asyncPool, &m->q)) {
|
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
|
||||||
destroySmsg(m);
|
destroySmsg(m);
|
||||||
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
|
@ -1588,29 +1839,56 @@ _return1:
|
||||||
tDebug("handle %p failed to register brokenlink", exh);
|
tDebug("handle %p failed to register brokenlink", exh);
|
||||||
rpcFreeCont(msg->pCont);
|
rpcFreeCont(msg->pCont);
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
return -1;
|
return code;
|
||||||
_return2:
|
_return2:
|
||||||
tDebug("handle %p failed to register brokenlink", exh);
|
tDebug("handle %p failed to register brokenlink", exh);
|
||||||
rpcFreeCont(msg->pCont);
|
rpcFreeCont(msg->pCont);
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
void transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
|
|
||||||
|
int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
|
||||||
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)thandle);
|
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)thandle);
|
||||||
|
if (pTransInst == NULL) {
|
||||||
|
return TSDB_CODE_RPC_MODULE_QUIT;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
tDebug("ip-white-list update on rpc");
|
tDebug("ip-white-list update on rpc");
|
||||||
SServerObj* svrObj = pTransInst->tcphandle;
|
SServerObj* svrObj = pTransInst->tcphandle;
|
||||||
for (int i = 0; i < svrObj->numOfThreads; i++) {
|
for (int i = 0; i < svrObj->numOfThreads; i++) {
|
||||||
SWorkThrd* pThrd = svrObj->pThreadObj[i];
|
SWorkThrd* pThrd = svrObj->pThreadObj[i];
|
||||||
|
|
||||||
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
||||||
SUpdateIpWhite* pReq = (arg != NULL ? cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg) : NULL);
|
if (msg == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
SUpdateIpWhite* pReq = NULL;
|
||||||
|
code = cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg, &pReq);
|
||||||
|
if (code != 0) {
|
||||||
|
taosMemoryFree(msg);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
msg->type = Update;
|
msg->type = Update;
|
||||||
msg->arg = pReq;
|
msg->arg = pReq;
|
||||||
|
|
||||||
transAsyncSend(pThrd->asyncPool, &msg->q);
|
if ((code = transAsyncSend(pThrd->asyncPool, &msg->q)) != 0) {
|
||||||
|
code = (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code);
|
||||||
|
tFreeSUpdateIpWhiteReq(pReq);
|
||||||
|
taosMemoryFree(pReq);
|
||||||
|
taosMemoryFree(msg);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
transReleaseExHandle(transGetInstMgt(), (int64_t)thandle);
|
transReleaseExHandle(transGetInstMgt(), (int64_t)thandle);
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
tError("ip-white-list update failed since %s", tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }
|
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }
|
||||||
|
|
|
@ -55,8 +55,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service")
|
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session")
|
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy")
|
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_MODULE_QUIT, "http-report already quit")
|
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_MODULE_QUIT, "http-report already quit")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MODULE_QUIT, "rpc module already quit")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_MODULE_QUIT, "rpc async module already quit")
|
||||||
|
|
||||||
//common & util
|
//common & util
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")
|
||||||
|
|
Loading…
Reference in New Issue