refactor transport
This commit is contained in:
parent
83449e4b70
commit
10d6aeef8d
|
@ -170,7 +170,7 @@ int rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
|
||||||
int32_t timeoutMs);
|
int32_t timeoutMs);
|
||||||
int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
|
int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
|
||||||
void *rpcAllocHandle();
|
void *rpcAllocHandle();
|
||||||
void rpcSetIpWhite(void *thandl, void *arg);
|
int32_t rpcSetIpWhite(void *thandl, void *arg);
|
||||||
|
|
||||||
int32_t rpcUtilSIpRangeToStr(SIpV4Range *pRange, char *buf);
|
int32_t rpcUtilSIpRangeToStr(SIpV4Range *pRange, char *buf);
|
||||||
|
|
||||||
|
|
|
@ -64,10 +64,11 @@ static void dmConvertErrCode(tmsg_t msgType) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
|
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
|
||||||
|
int32_t code = 0;
|
||||||
SUpdateIpWhite ipWhite = {0}; // aosMemoryCalloc(1, sizeof(SUpdateIpWhite));
|
SUpdateIpWhite ipWhite = {0}; // aosMemoryCalloc(1, sizeof(SUpdateIpWhite));
|
||||||
tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite);
|
tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite);
|
||||||
|
|
||||||
rpcSetIpWhite(pTrans, &ipWhite);
|
code = rpcSetIpWhite(pTrans, &ipWhite);
|
||||||
pData->ipWhiteVer = ipWhite.ver;
|
pData->ipWhiteVer = ipWhite.ver;
|
||||||
|
|
||||||
tFreeSUpdateIpWhiteReq(&ipWhite);
|
tFreeSUpdateIpWhiteReq(&ipWhite);
|
||||||
|
|
|
@ -279,6 +279,7 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool);
|
||||||
if (exh2 == NULL || id != exh2->refId) { \
|
if (exh2 == NULL || id != exh2->refId) { \
|
||||||
tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, \
|
tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, \
|
||||||
exh2 ? exh2->refId : 0, id); \
|
exh2 ? exh2->refId : 0, id); \
|
||||||
|
code = terrno; \
|
||||||
goto _return1; \
|
goto _return1; \
|
||||||
} \
|
} \
|
||||||
} else if (id == 0) { \
|
} else if (id == 0) { \
|
||||||
|
@ -287,6 +288,7 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool);
|
||||||
if (exh2 == NULL || id == exh2->refId) { \
|
if (exh2 == NULL || id == exh2->refId) { \
|
||||||
tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, id, \
|
tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, id, \
|
||||||
exh2 ? exh2->refId : 0); \
|
exh2 ? exh2->refId : 0); \
|
||||||
|
code = terrno; \
|
||||||
goto _return1; \
|
goto _return1; \
|
||||||
} else { \
|
} else { \
|
||||||
id = exh1->refId; \
|
id = exh1->refId; \
|
||||||
|
@ -323,7 +325,7 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, ST
|
||||||
int transSendResponse(const STransMsg* msg);
|
int transSendResponse(const STransMsg* msg);
|
||||||
int transRegisterMsg(const STransMsg* msg);
|
int transRegisterMsg(const STransMsg* msg);
|
||||||
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
|
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
|
||||||
void transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func);
|
int32_t transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func);
|
||||||
|
|
||||||
int transSockInfo2Str(struct sockaddr* sockname, char* dst);
|
int transSockInfo2Str(struct sockaddr* sockname, char* dst);
|
||||||
|
|
||||||
|
@ -363,7 +365,7 @@ typedef struct {
|
||||||
* init queue
|
* init queue
|
||||||
* note: queue'size is small, default 1
|
* note: queue'size is small, default 1
|
||||||
*/
|
*/
|
||||||
void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg));
|
int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* put arg into queue
|
* put arg into queue
|
||||||
|
@ -420,7 +422,7 @@ typedef struct SDelayQueue {
|
||||||
uv_loop_t* loop;
|
uv_loop_t* loop;
|
||||||
} SDelayQueue;
|
} SDelayQueue;
|
||||||
|
|
||||||
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue);
|
int32_t transDQCreate(uv_loop_t* loop, SDelayQueue** queue);
|
||||||
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg));
|
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg));
|
||||||
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
|
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
|
||||||
void transDQCancel(SDelayQueue* queue, SDelayTask* task);
|
void transDQCancel(SDelayQueue* queue, SDelayTask* task);
|
||||||
|
|
|
@ -186,7 +186,7 @@ int rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
|
||||||
return transSetDefaultAddr(thandle, ip, fqdn);
|
return transSetDefaultAddr(thandle, ip, fqdn);
|
||||||
}
|
}
|
||||||
// server only
|
// server only
|
||||||
void rpcSetIpWhite(void* thandle, void* arg) { transSetIpWhiteList(thandle, arg, NULL); }
|
int32_t rpcSetIpWhite(void* thandle, void* arg) { return transSetIpWhiteList(thandle, arg, NULL); }
|
||||||
|
|
||||||
void* rpcAllocHandle() { return (void*)transAllocHandle(); }
|
void* rpcAllocHandle() { return (void*)transAllocHandle(); }
|
||||||
|
|
||||||
|
|
|
@ -180,7 +180,7 @@ static int32_t allocConnRef(SCliConn* conn, bool update);
|
||||||
|
|
||||||
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
|
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
|
||||||
|
|
||||||
static SCliConn* cliCreateConn(SCliThrd* thrd);
|
static int32_t cliCreateConn(SCliThrd* thrd, SCliConn** pCliConn);
|
||||||
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
|
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
|
||||||
static void cliDestroy(uv_handle_t* handle);
|
static void cliDestroy(uv_handle_t* handle);
|
||||||
static void cliSend(SCliConn* pConn);
|
static void cliSend(SCliConn* pConn);
|
||||||
|
@ -909,16 +909,35 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static SCliConn* cliCreateConn(SCliThrd* pThrd) {
|
static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) {
|
||||||
|
int32_t code = 0;
|
||||||
SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn));
|
SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn));
|
||||||
|
if (conn == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
// read/write stream handle
|
// read/write stream handle
|
||||||
conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
|
conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
|
||||||
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
|
if (conn->stream == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
TAOS_CHECK_GOTO(code, NULL, _failed);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
|
||||||
|
if (code != 0) {
|
||||||
|
tError("failed to init tcp handle, code:%d, %s", code, uv_strerror(code));
|
||||||
|
code = TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
|
TAOS_CHECK_GOTO(code, NULL, _failed);
|
||||||
|
}
|
||||||
conn->stream->data = conn;
|
conn->stream->data = conn;
|
||||||
|
|
||||||
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
|
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
|
||||||
if (timer == NULL) {
|
if (timer == NULL) {
|
||||||
timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
|
timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
|
||||||
|
if (timer == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _failed);
|
||||||
|
}
|
||||||
|
|
||||||
tDebug("no available timer, create a timer %p", timer);
|
tDebug("no available timer, create a timer %p", timer);
|
||||||
uv_timer_init(pThrd->loop, timer);
|
uv_timer_init(pThrd->loop, timer);
|
||||||
}
|
}
|
||||||
|
@ -927,8 +946,11 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
|
||||||
conn->timer = timer;
|
conn->timer = timer;
|
||||||
conn->connReq.data = conn;
|
conn->connReq.data = conn;
|
||||||
transReqQueueInit(&conn->wreqQueue);
|
transReqQueueInit(&conn->wreqQueue);
|
||||||
transQueueInit(&conn->cliMsgs, NULL);
|
|
||||||
transInitBuffer(&conn->readBuf);
|
TAOS_CHECK_GOTO(transQueueInit(&conn->cliMsgs, NULL), NULL, _failed);
|
||||||
|
|
||||||
|
TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed);
|
||||||
|
|
||||||
QUEUE_INIT(&conn->q);
|
QUEUE_INIT(&conn->q);
|
||||||
conn->hostThrd = pThrd;
|
conn->hostThrd = pThrd;
|
||||||
conn->status = ConnNormal;
|
conn->status = ConnNormal;
|
||||||
|
@ -936,9 +958,19 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
|
||||||
transRefCliHandle(conn);
|
transRefCliHandle(conn);
|
||||||
|
|
||||||
atomic_add_fetch_32(&pThrd->connCount, 1);
|
atomic_add_fetch_32(&pThrd->connCount, 1);
|
||||||
allocConnRef(conn, false);
|
|
||||||
|
|
||||||
return conn;
|
TAOS_CHECK_GOTO(allocConnRef(conn, false), NULL, _failed);
|
||||||
|
|
||||||
|
*pCliConn = conn;
|
||||||
|
return code;
|
||||||
|
_failed:
|
||||||
|
if (conn) {
|
||||||
|
taosMemoryFree(conn->stream);
|
||||||
|
transReqQueueClear(&conn->wreqQueue);
|
||||||
|
transDestroyBuffer(&conn->readBuf);
|
||||||
|
}
|
||||||
|
taosMemoryFree(conn);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
static void cliDestroyConn(SCliConn* conn, bool clear) {
|
static void cliDestroyConn(SCliConn* conn, bool clear) {
|
||||||
SCliThrd* pThrd = conn->hostThrd;
|
SCliThrd* pThrd = conn->hostThrd;
|
||||||
|
@ -1250,9 +1282,22 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (conn == NULL) {
|
if (conn == NULL) {
|
||||||
conn = cliCreateConn(pThrd);
|
code = cliCreateConn(pThrd, &conn);
|
||||||
|
if (code != 0) {
|
||||||
|
tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d, reason:%s", pTransInst->label,
|
||||||
|
pBatch->wLen, pBatch->batchSize, pTransInst->connLimitNum, tstrerror(code));
|
||||||
|
cliDestroyBatch(pBatch);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
conn->pBatch = pBatch;
|
conn->pBatch = pBatch;
|
||||||
conn->dstAddr = taosStrdup(pList->dst);
|
conn->dstAddr = taosStrdup(pList->dst);
|
||||||
|
if (conn->dstAddr == NULL) {
|
||||||
|
tError("%s conn %p failed to send batch msg, reason:%s", transLabel(pTransInst), conn,
|
||||||
|
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
|
cliHandleFastFail(conn, -1);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
uint32_t ipaddr = 0;
|
uint32_t ipaddr = 0;
|
||||||
if ((code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip, &ipaddr)) != 0) {
|
if ((code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip, &ipaddr)) != 0) {
|
||||||
|
@ -1263,6 +1308,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
|
||||||
cliHandleFastFail(conn, code);
|
cliHandleFastFail(conn, code);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct sockaddr_in addr;
|
struct sockaddr_in addr;
|
||||||
addr.sin_family = AF_INET;
|
addr.sin_family = AF_INET;
|
||||||
addr.sin_addr.s_addr = ipaddr;
|
addr.sin_addr.s_addr = ipaddr;
|
||||||
|
@ -1628,6 +1674,7 @@ static void doFreeTimeoutMsg(void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
|
int32_t code = 0;
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
|
|
||||||
cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr);
|
cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr);
|
||||||
|
@ -1666,7 +1713,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
transQueuePush(&conn->cliMsgs, pMsg);
|
transQueuePush(&conn->cliMsgs, pMsg);
|
||||||
cliSend(conn);
|
cliSend(conn);
|
||||||
} else {
|
} else {
|
||||||
conn = cliCreateConn(pThrd);
|
code = cliCreateConn(pThrd, &conn);
|
||||||
|
|
||||||
int64_t refId = (int64_t)pMsg->msg.info.handle;
|
int64_t refId = (int64_t)pMsg->msg.info.handle;
|
||||||
if (refId != 0) specifyConnRef(conn, true, refId);
|
if (refId != 0) specifyConnRef(conn, true, refId);
|
||||||
|
|
|
@ -103,8 +103,12 @@ int transSockInfo2Str(struct sockaddr* sockname, char* dst) {
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
int transInitBuffer(SConnBuffer* buf) {
|
int transInitBuffer(SConnBuffer* buf) {
|
||||||
buf->cap = BUFFER_CAP;
|
|
||||||
buf->buf = taosMemoryCalloc(1, BUFFER_CAP);
|
buf->buf = taosMemoryCalloc(1, BUFFER_CAP);
|
||||||
|
if (buf->buf == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
buf->cap = BUFFER_CAP;
|
||||||
buf->left = -1;
|
buf->left = -1;
|
||||||
buf->len = 0;
|
buf->len = 0;
|
||||||
buf->total = 0;
|
buf->total = 0;
|
||||||
|
@ -419,9 +423,14 @@ void transReqQueueClear(queue* q) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) {
|
int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) {
|
||||||
queue->q = taosArrayInit(2, sizeof(void*));
|
queue->q = taosArrayInit(2, sizeof(void*));
|
||||||
queue->freeFunc = (void (*)(const void*))freeFunc;
|
queue->freeFunc = (void (*)(const void*))freeFunc;
|
||||||
|
|
||||||
|
if (queue->q == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
bool transQueuePush(STransQueue* queue, void* arg) {
|
bool transQueuePush(STransQueue* queue, void* arg) {
|
||||||
if (queue->q == NULL) {
|
if (queue->q == NULL) {
|
||||||
|
@ -524,20 +533,44 @@ static void transDQTimeout(uv_timer_t* timer) {
|
||||||
uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
|
uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue) {
|
int32_t transDQCreate(uv_loop_t* loop, SDelayQueue** queue) {
|
||||||
uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
|
int32_t code = 0;
|
||||||
uv_timer_init(loop, timer);
|
Heap* heap = NULL;
|
||||||
|
uv_timer_t* timer = NULL;
|
||||||
|
SDelayQueue* q = NULL;
|
||||||
|
|
||||||
Heap* heap = heapCreate(timeCompare);
|
timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
|
||||||
|
if (timer == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
SDelayQueue* q = taosMemoryCalloc(1, sizeof(SDelayQueue));
|
heap = heapCreate(timeCompare);
|
||||||
|
if (heap == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _return1);
|
||||||
|
}
|
||||||
|
|
||||||
|
q = taosMemoryCalloc(1, sizeof(SDelayQueue));
|
||||||
|
if (q == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _return1);
|
||||||
|
}
|
||||||
q->heap = heap;
|
q->heap = heap;
|
||||||
q->timer = timer;
|
q->timer = timer;
|
||||||
q->loop = loop;
|
q->loop = loop;
|
||||||
q->timer->data = q;
|
q->timer->data = q;
|
||||||
|
|
||||||
|
int err = uv_timer_init(loop, timer);
|
||||||
|
if (err != 0) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _return1);
|
||||||
|
}
|
||||||
|
|
||||||
*queue = q;
|
*queue = q;
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
_return1:
|
||||||
|
taosMemoryFree(timer);
|
||||||
|
heapDestroy(heap);
|
||||||
|
taosMemoryFree(q);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
|
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
|
||||||
|
@ -720,29 +753,6 @@ void transDestroySyncMsg(void* msg) {
|
||||||
taosMemoryFree(pSyncMsg);
|
taosMemoryFree(pSyncMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
// void subnetIp2int(const char* const ip_addr, uint8_t* dst) {
|
|
||||||
// char ip_addr_cpy[20];
|
|
||||||
// char ip[5];
|
|
||||||
|
|
||||||
// tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy));
|
|
||||||
|
|
||||||
// char *s_start, *s_end;
|
|
||||||
// s_start = ip_addr_cpy;
|
|
||||||
// s_end = ip_addr_cpy;
|
|
||||||
|
|
||||||
// int32_t k = 0;
|
|
||||||
|
|
||||||
// for (k = 0; *s_start != '\0'; s_start = s_end) {
|
|
||||||
// for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) {
|
|
||||||
// }
|
|
||||||
// if (*s_end == '.') {
|
|
||||||
// *s_end = '\0';
|
|
||||||
// s_end++;
|
|
||||||
// }
|
|
||||||
// dst[k++] = (char)atoi(s_start);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
uint32_t subnetIpRang2Int(SIpV4Range* pRange) {
|
uint32_t subnetIpRang2Int(SIpV4Range* pRange) {
|
||||||
uint32_t ip = pRange->ip;
|
uint32_t ip = pRange->ip;
|
||||||
return ((ip & 0xFF) << 24) | ((ip & 0xFF00) << 8) | ((ip & 0xFF0000) >> 8) | ((ip >> 24) & 0xFF);
|
return ((ip & 0xFF) << 24) | ((ip & 0xFF00) << 8) | ((ip & 0xFF0000) >> 8) | ((ip >> 24) & 0xFF);
|
||||||
|
@ -789,7 +799,11 @@ int32_t transUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) {
|
||||||
struct in_addr addr;
|
struct in_addr addr;
|
||||||
addr.s_addr = pRange->ip;
|
addr.s_addr = pRange->ip;
|
||||||
|
|
||||||
uv_inet_ntop(AF_INET, &addr, buf, 32);
|
int32_t err = uv_inet_ntop(AF_INET, &addr, buf, 32);
|
||||||
|
if (err != 0) {
|
||||||
|
tError("failed to convert ip to string, reason:%s", uv_strerror(err));
|
||||||
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
len = strlen(buf);
|
len = strlen(buf);
|
||||||
|
|
||||||
|
@ -805,14 +819,23 @@ int32_t transUtilSWhiteListToStr(SIpWhiteList* pList, char** ppBuf) {
|
||||||
*ppBuf = NULL;
|
*ppBuf = NULL;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
char* pBuf = taosMemoryCalloc(1, pList->num * 36);
|
char* pBuf = taosMemoryCalloc(1, pList->num * 36);
|
||||||
|
if (pBuf == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < pList->num; i++) {
|
for (int i = 0; i < pList->num; i++) {
|
||||||
SIpV4Range* pRange = &pList->pIpRange[i];
|
SIpV4Range* pRange = &pList->pIpRange[i];
|
||||||
|
|
||||||
char tbuf[32] = {0};
|
char tbuf[32] = {0};
|
||||||
int tlen = transUtilSIpRangeToStr(pRange, tbuf);
|
int tlen = transUtilSIpRangeToStr(pRange, tbuf);
|
||||||
|
if (tlen < 0) {
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
len += sprintf(pBuf + len, "%s,", tbuf);
|
len += sprintf(pBuf + len, "%s,", tbuf);
|
||||||
}
|
}
|
||||||
if (len > 0) {
|
if (len > 0) {
|
||||||
|
|
|
@ -122,7 +122,7 @@ typedef struct SServerObj {
|
||||||
|
|
||||||
SIpWhiteListTab* uvWhiteListCreate();
|
SIpWhiteListTab* uvWhiteListCreate();
|
||||||
void uvWhiteListDestroy(SIpWhiteListTab* pWhite);
|
void uvWhiteListDestroy(SIpWhiteListTab* pWhite);
|
||||||
void uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* pList, int64_t ver);
|
int32_t uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* pList, int64_t ver);
|
||||||
void uvWhiteListUpdate(SIpWhiteListTab* pWhite, SHashObj* pTable);
|
void uvWhiteListUpdate(SIpWhiteListTab* pWhite, SHashObj* pTable);
|
||||||
bool uvWhiteListCheckConn(SIpWhiteListTab* pWhite, SSvrConn* pConn);
|
bool uvWhiteListCheckConn(SIpWhiteListTab* pWhite, SSvrConn* pConn);
|
||||||
bool uvWhiteListFilte(SIpWhiteListTab* pWhite, char* user, uint32_t ip, int64_t ver);
|
bool uvWhiteListFilte(SIpWhiteListTab* pWhite, char* user, uint32_t ip, int64_t ver);
|
||||||
|
@ -248,17 +248,26 @@ void uvWhiteListDestroy(SIpWhiteListTab* pWhite) {
|
||||||
taosMemoryFree(pWhite);
|
taosMemoryFree(pWhite);
|
||||||
}
|
}
|
||||||
|
|
||||||
void uvWhiteListToStr(SWhiteUserList* plist, char* user, char** ppBuf) {
|
int32_t uvWhiteListToStr(SWhiteUserList* plist, char* user, char** ppBuf) {
|
||||||
char* tmp = NULL;
|
char* tmp = NULL;
|
||||||
int32_t tlen = transUtilSWhiteListToStr(plist->pList, &tmp);
|
int32_t tlen = transUtilSWhiteListToStr(plist->pList, &tmp);
|
||||||
|
if (tlen < 0) {
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
char* pBuf = taosMemoryCalloc(1, tlen + 64);
|
char* pBuf = taosMemoryCalloc(1, tlen + 64);
|
||||||
|
if (pBuf == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t len = sprintf(pBuf, "user: %s, ver: %" PRId64 ", ip: {%s}", user, plist->ver, tmp);
|
int32_t len = sprintf(pBuf, "user: %s, ver: %" PRId64 ", ip: {%s}", user, plist->ver, tmp);
|
||||||
taosMemoryFree(tmp);
|
taosMemoryFree(tmp);
|
||||||
|
|
||||||
*ppBuf = pBuf;
|
*ppBuf = pBuf;
|
||||||
|
return len;
|
||||||
}
|
}
|
||||||
void uvWhiteListDebug(SIpWhiteListTab* pWrite) {
|
void uvWhiteListDebug(SIpWhiteListTab* pWrite) {
|
||||||
|
int32_t code = 0;
|
||||||
SHashObj* pWhiteList = pWrite->pList;
|
SHashObj* pWhiteList = pWrite->pList;
|
||||||
void* pIter = taosHashIterate(pWhiteList, NULL);
|
void* pIter = taosHashIterate(pWhiteList, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
|
@ -270,23 +279,35 @@ void uvWhiteListDebug(SIpWhiteListTab* pWrite) {
|
||||||
SWhiteUserList* pUserList = *(SWhiteUserList**)pIter;
|
SWhiteUserList* pUserList = *(SWhiteUserList**)pIter;
|
||||||
|
|
||||||
char* buf = NULL;
|
char* buf = NULL;
|
||||||
uvWhiteListToStr(pUserList, user, &buf);
|
|
||||||
tDebug("ip-white-list %s", buf);
|
code = uvWhiteListToStr(pUserList, user, &buf);
|
||||||
|
if (code != 0) {
|
||||||
|
tDebug("ip-white-list failed to debug to str since %s", buf);
|
||||||
|
}
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
pIter = taosHashIterate(pWhiteList, pIter);
|
pIter = taosHashIterate(pWhiteList, pIter);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
void uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* plist, int64_t ver) {
|
int32_t uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* plist, int64_t ver) {
|
||||||
|
int32_t code = 0;
|
||||||
SHashObj* pWhiteList = pWhite->pList;
|
SHashObj* pWhiteList = pWhite->pList;
|
||||||
|
|
||||||
SWhiteUserList** ppUserList = taosHashGet(pWhiteList, user, strlen(user));
|
SWhiteUserList** ppUserList = taosHashGet(pWhiteList, user, strlen(user));
|
||||||
if (ppUserList == NULL || *ppUserList == NULL) {
|
if (ppUserList == NULL || *ppUserList == NULL) {
|
||||||
SWhiteUserList* pUserList = taosMemoryCalloc(1, sizeof(SWhiteUserList));
|
SWhiteUserList* pUserList = taosMemoryCalloc(1, sizeof(SWhiteUserList));
|
||||||
|
if (pUserList == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
pUserList->ver = ver;
|
pUserList->ver = ver;
|
||||||
|
|
||||||
pUserList->pList = plist;
|
pUserList->pList = plist;
|
||||||
|
|
||||||
taosHashPut(pWhiteList, user, strlen(user), &pUserList, sizeof(void*));
|
code = taosHashPut(pWhiteList, user, strlen(user), &pUserList, sizeof(void*));
|
||||||
|
if (code != 0) {
|
||||||
|
taosMemoryFree(pUserList);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
SWhiteUserList* pUserList = *ppUserList;
|
SWhiteUserList* pUserList = *ppUserList;
|
||||||
|
|
||||||
|
@ -295,6 +316,7 @@ void uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* plist, in
|
||||||
pUserList->pList = plist;
|
pUserList->pList = plist;
|
||||||
}
|
}
|
||||||
uvWhiteListDebug(pWhite);
|
uvWhiteListDebug(pWhite);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void uvWhiteListUpdate(SIpWhiteListTab* pWhite, SHashObj* pTable) {
|
void uvWhiteListUpdate(SIpWhiteListTab* pWhite, SHashObj* pTable) {
|
||||||
|
@ -1502,30 +1524,41 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) {
|
||||||
}
|
}
|
||||||
void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) {
|
void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) {
|
||||||
SUpdateIpWhite* req = msg->arg;
|
SUpdateIpWhite* req = msg->arg;
|
||||||
if (req != NULL) {
|
if (req == NULL) {
|
||||||
|
tDebug("ip-white-list disable on trans");
|
||||||
|
thrd->enableIpWhiteList = 0;
|
||||||
|
taosMemoryFree(msg);
|
||||||
|
}
|
||||||
|
int32_t code = 0;
|
||||||
for (int i = 0; i < req->numOfUser; i++) {
|
for (int i = 0; i < req->numOfUser; i++) {
|
||||||
SUpdateUserIpWhite* pUser = &req->pUserIpWhite[i];
|
SUpdateUserIpWhite* pUser = &req->pUserIpWhite[i];
|
||||||
|
|
||||||
int32_t sz = pUser->numOfRange * sizeof(SIpV4Range);
|
int32_t sz = pUser->numOfRange * sizeof(SIpV4Range);
|
||||||
SIpWhiteList* pList = taosMemoryCalloc(1, sz + sizeof(SIpWhiteList));
|
|
||||||
pList->num = pUser->numOfRange;
|
|
||||||
|
|
||||||
|
SIpWhiteList* pList = taosMemoryCalloc(1, sz + sizeof(SIpWhiteList));
|
||||||
|
if (pList == NULL) {
|
||||||
|
tError("failed to create ip-white-list since %s", tstrerror(code));
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
pList->num = pUser->numOfRange;
|
||||||
memcpy(pList->pIpRange, pUser->pIpRanges, sz);
|
memcpy(pList->pIpRange, pUser->pIpRanges, sz);
|
||||||
uvWhiteListAdd(thrd->pWhiteList, pUser->user, pList, pUser->ver);
|
code = uvWhiteListAdd(thrd->pWhiteList, pUser->user, pList, pUser->ver);
|
||||||
|
if (code != 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (code == 0) {
|
||||||
thrd->pWhiteList->ver = req->ver;
|
thrd->pWhiteList->ver = req->ver;
|
||||||
thrd->enableIpWhiteList = 1;
|
thrd->enableIpWhiteList = 1;
|
||||||
|
} else {
|
||||||
|
tError("failed to update ip-white-list since %s", tstrerror(code));
|
||||||
|
}
|
||||||
tFreeSUpdateIpWhiteReq(req);
|
tFreeSUpdateIpWhiteReq(req);
|
||||||
taosMemoryFree(req);
|
taosMemoryFree(req);
|
||||||
} else {
|
|
||||||
tDebug("ip-white-list disable on trans");
|
|
||||||
thrd->enableIpWhiteList = 0;
|
|
||||||
}
|
|
||||||
taosMemoryFree(msg);
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyWorkThrd(SWorkThrd* pThrd) {
|
void destroyWorkThrd(SWorkThrd* pThrd) {
|
||||||
if (pThrd == NULL) {
|
if (pThrd == NULL) {
|
||||||
return;
|
return;
|
||||||
|
@ -1598,6 +1631,7 @@ void transUnrefSrvHandle(void* handle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int transReleaseSrvHandle(void* handle) {
|
int transReleaseSrvHandle(void* handle) {
|
||||||
|
int32_t code = 0;
|
||||||
SRpcHandleInfo* info = handle;
|
SRpcHandleInfo* info = handle;
|
||||||
SExHandle* exh = info->handle;
|
SExHandle* exh = info->handle;
|
||||||
int64_t refId = info->refId;
|
int64_t refId = info->refId;
|
||||||
|
@ -1610,12 +1644,19 @@ int transReleaseSrvHandle(void* handle) {
|
||||||
STransMsg tmsg = {.code = 0, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId};
|
STransMsg tmsg = {.code = 0, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId};
|
||||||
|
|
||||||
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
||||||
|
if (m == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _return1;
|
||||||
|
}
|
||||||
|
|
||||||
m->msg = tmsg;
|
m->msg = tmsg;
|
||||||
m->type = Release;
|
m->type = Release;
|
||||||
|
|
||||||
tDebug("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
|
tDebug("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
|
||||||
if (0 != transAsyncSend(pThrd->asyncPool, &m->q)) {
|
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
|
||||||
destroySmsg(m);
|
destroySmsg(m);
|
||||||
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
|
@ -1623,13 +1664,15 @@ int transReleaseSrvHandle(void* handle) {
|
||||||
_return1:
|
_return1:
|
||||||
tDebug("handle %p failed to send to release handle", exh);
|
tDebug("handle %p failed to send to release handle", exh);
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
return -1;
|
return code;
|
||||||
_return2:
|
_return2:
|
||||||
tDebug("handle %p failed to send to release handle", exh);
|
tDebug("handle %p failed to send to release handle", exh);
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int transSendResponse(const STransMsg* msg) {
|
int transSendResponse(const STransMsg* msg) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
if (msg->info.noResp) {
|
if (msg->info.noResp) {
|
||||||
rpcFreeCont(msg->pCont);
|
rpcFreeCont(msg->pCont);
|
||||||
tTrace("no need send resp");
|
tTrace("no need send resp");
|
||||||
|
@ -1651,13 +1694,20 @@ int transSendResponse(const STransMsg* msg) {
|
||||||
ASYNC_ERR_JRET(pThrd);
|
ASYNC_ERR_JRET(pThrd);
|
||||||
|
|
||||||
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
||||||
|
if (m == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _return1;
|
||||||
|
}
|
||||||
|
|
||||||
m->msg = tmsg;
|
m->msg = tmsg;
|
||||||
m->type = Normal;
|
m->type = Normal;
|
||||||
|
|
||||||
STraceId* trace = (STraceId*)&msg->info.traceId;
|
STraceId* trace = (STraceId*)&msg->info.traceId;
|
||||||
tGDebug("conn %p start to send resp (1/2)", exh->handle);
|
tGDebug("conn %p start to send resp (1/2)", exh->handle);
|
||||||
if (0 != transAsyncSend(pThrd->asyncPool, &m->q)) {
|
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
|
||||||
destroySmsg(m);
|
destroySmsg(m);
|
||||||
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
|
@ -1667,13 +1717,15 @@ _return1:
|
||||||
tDebug("handle %p failed to send resp", exh);
|
tDebug("handle %p failed to send resp", exh);
|
||||||
rpcFreeCont(msg->pCont);
|
rpcFreeCont(msg->pCont);
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
return -1;
|
return code;
|
||||||
_return2:
|
_return2:
|
||||||
tDebug("handle %p failed to send resp", exh);
|
tDebug("handle %p failed to send resp", exh);
|
||||||
rpcFreeCont(msg->pCont);
|
rpcFreeCont(msg->pCont);
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
int transRegisterMsg(const STransMsg* msg) {
|
int transRegisterMsg(const STransMsg* msg) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
SExHandle* exh = msg->info.handle;
|
SExHandle* exh = msg->info.handle;
|
||||||
int64_t refId = msg->info.refId;
|
int64_t refId = msg->info.refId;
|
||||||
ASYNC_CHECK_HANDLE(exh, refId);
|
ASYNC_CHECK_HANDLE(exh, refId);
|
||||||
|
@ -1687,13 +1739,20 @@ int transRegisterMsg(const STransMsg* msg) {
|
||||||
ASYNC_ERR_JRET(pThrd);
|
ASYNC_ERR_JRET(pThrd);
|
||||||
|
|
||||||
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
||||||
|
if (m == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _return1;
|
||||||
|
}
|
||||||
|
|
||||||
m->msg = tmsg;
|
m->msg = tmsg;
|
||||||
m->type = Register;
|
m->type = Register;
|
||||||
|
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
tDebug("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
|
tDebug("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
|
||||||
if (0 != transAsyncSend(pThrd->asyncPool, &m->q)) {
|
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
|
||||||
destroySmsg(m);
|
destroySmsg(m);
|
||||||
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
|
@ -1703,14 +1762,20 @@ _return1:
|
||||||
tDebug("handle %p failed to register brokenlink", exh);
|
tDebug("handle %p failed to register brokenlink", exh);
|
||||||
rpcFreeCont(msg->pCont);
|
rpcFreeCont(msg->pCont);
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
return -1;
|
return code;
|
||||||
_return2:
|
_return2:
|
||||||
tDebug("handle %p failed to register brokenlink", exh);
|
tDebug("handle %p failed to register brokenlink", exh);
|
||||||
rpcFreeCont(msg->pCont);
|
rpcFreeCont(msg->pCont);
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
void transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
|
|
||||||
|
int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
|
||||||
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)thandle);
|
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)thandle);
|
||||||
|
if (pTransInst == NULL) {
|
||||||
|
return TSDB_CODE_RPC_MODULE_QUIT;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
tDebug("ip-white-list update on rpc");
|
tDebug("ip-white-list update on rpc");
|
||||||
SServerObj* svrObj = pTransInst->tcphandle;
|
SServerObj* svrObj = pTransInst->tcphandle;
|
||||||
|
@ -1718,14 +1783,33 @@ void transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
|
||||||
SWorkThrd* pThrd = svrObj->pThreadObj[i];
|
SWorkThrd* pThrd = svrObj->pThreadObj[i];
|
||||||
|
|
||||||
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
||||||
SUpdateIpWhite* pReq = (arg != NULL ? cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg) : NULL);
|
if (msg == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
SUpdateIpWhite* pReq = NULL;
|
||||||
|
if (arg != NULL) {
|
||||||
|
if ((pReq = cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg)) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
msg->type = Update;
|
msg->type = Update;
|
||||||
msg->arg = pReq;
|
msg->arg = pReq;
|
||||||
|
|
||||||
transAsyncSend(pThrd->asyncPool, &msg->q);
|
if ((code = transAsyncSend(pThrd->asyncPool, &msg->q)) != 0) {
|
||||||
|
code = (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
transReleaseExHandle(transGetInstMgt(), (int64_t)thandle);
|
transReleaseExHandle(transGetInstMgt(), (int64_t)thandle);
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
tError("ip-white-list update failed since %s", tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }
|
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }
|
||||||
|
|
Loading…
Reference in New Issue