refactor transport
This commit is contained in:
parent
10d6aeef8d
commit
32c0c40c4d
|
@ -1051,10 +1051,10 @@ typedef struct {
|
||||||
SUpdateUserIpWhite* pUserIpWhite;
|
SUpdateUserIpWhite* pUserIpWhite;
|
||||||
} SUpdateIpWhite;
|
} SUpdateIpWhite;
|
||||||
|
|
||||||
int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq);
|
int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq);
|
||||||
int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq);
|
int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq);
|
||||||
void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq);
|
void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq);
|
||||||
SUpdateIpWhite* cloneSUpdateIpWhiteReq(SUpdateIpWhite* pReq);
|
int32_t cloneSUpdateIpWhiteReq(SUpdateIpWhite* pReq, SUpdateIpWhite** pUpdate);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t ipWhiteVer;
|
int64_t ipWhiteVer;
|
||||||
|
|
|
@ -1774,20 +1774,35 @@ int32_t tDeserializeSUpdateIpWhite(void *buf, int32_t bufLen, SUpdateIpWhite *pR
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
void tFreeSUpdateIpWhiteReq(SUpdateIpWhite *pReq) {
|
void tFreeSUpdateIpWhiteReq(SUpdateIpWhite *pReq) {
|
||||||
for (int i = 0; i < pReq->numOfUser; i++) {
|
if (pReq == NULL) return;
|
||||||
SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i];
|
|
||||||
taosMemoryFree(pUserWhite->pIpRanges);
|
if (pReq->pUserIpWhite) {
|
||||||
|
for (int i = 0; i < pReq->numOfUser; i++) {
|
||||||
|
SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i];
|
||||||
|
taosMemoryFree(pUserWhite->pIpRanges);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
taosMemoryFree(pReq->pUserIpWhite);
|
taosMemoryFree(pReq->pUserIpWhite);
|
||||||
// impl later
|
// impl later
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
SUpdateIpWhite *cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq) {
|
int32_t cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq, SUpdateIpWhite **pUpdateMsg) {
|
||||||
|
int32_t code = 0;
|
||||||
|
if (pReq == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
SUpdateIpWhite *pClone = taosMemoryCalloc(1, sizeof(SUpdateIpWhite));
|
SUpdateIpWhite *pClone = taosMemoryCalloc(1, sizeof(SUpdateIpWhite));
|
||||||
|
if (pClone == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
pClone->numOfUser = pReq->numOfUser;
|
pClone->numOfUser = pReq->numOfUser;
|
||||||
pClone->ver = pReq->ver;
|
pClone->ver = pReq->ver;
|
||||||
pClone->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser);
|
pClone->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser);
|
||||||
|
if (pClone->pUserIpWhite == NULL) {
|
||||||
|
taosMemoryFree(pClone);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < pReq->numOfUser; i++) {
|
for (int i = 0; i < pReq->numOfUser; i++) {
|
||||||
SUpdateUserIpWhite *pNew = &pClone->pUserIpWhite[i];
|
SUpdateUserIpWhite *pNew = &pClone->pUserIpWhite[i];
|
||||||
|
@ -1799,9 +1814,20 @@ SUpdateIpWhite *cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq) {
|
||||||
|
|
||||||
int32_t sz = pOld->numOfRange * sizeof(SIpV4Range);
|
int32_t sz = pOld->numOfRange * sizeof(SIpV4Range);
|
||||||
pNew->pIpRanges = taosMemoryCalloc(1, sz);
|
pNew->pIpRanges = taosMemoryCalloc(1, sz);
|
||||||
|
if (pNew->pIpRanges == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
break;
|
||||||
|
}
|
||||||
memcpy(pNew->pIpRanges, pOld->pIpRanges, sz);
|
memcpy(pNew->pIpRanges, pOld->pIpRanges, sz);
|
||||||
}
|
}
|
||||||
return pClone;
|
_return:
|
||||||
|
if (code < 0) {
|
||||||
|
tFreeSUpdateIpWhiteReq(pClone);
|
||||||
|
taosMemoryFree(pClone);
|
||||||
|
} else {
|
||||||
|
*pUpdateMsg = pClone;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
int32_t tSerializeRetrieveIpWhite(void *buf, int32_t bufLen, SRetrieveIpWhiteReq *pReq) {
|
int32_t tSerializeRetrieveIpWhite(void *buf, int32_t bufLen, SRetrieveIpWhiteReq *pReq) {
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
|
|
|
@ -614,6 +614,10 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
||||||
STransMsg* pMsg = &smsg->msg;
|
STransMsg* pMsg = &smsg->msg;
|
||||||
if (pMsg->pCont == 0) {
|
if (pMsg->pCont == 0) {
|
||||||
pMsg->pCont = (void*)rpcMallocCont(0);
|
pMsg->pCont = (void*)rpcMallocCont(0);
|
||||||
|
if (pMsg->pCont == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
pMsg->contLen = 0;
|
pMsg->contLen = 0;
|
||||||
}
|
}
|
||||||
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
|
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
|
||||||
|
@ -628,7 +632,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
||||||
if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
|
if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
|
||||||
transQueuePop(&pConn->srvMsgs);
|
transQueuePop(&pConn->srvMsgs);
|
||||||
destroySmsg(smsg);
|
destroySmsg(smsg);
|
||||||
return -1;
|
return TSDB_CODE_INVALID_MSG;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pConn->status == ConnNormal) {
|
if (pConn->status == ConnNormal) {
|
||||||
|
@ -1528,6 +1532,7 @@ void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) {
|
||||||
tDebug("ip-white-list disable on trans");
|
tDebug("ip-white-list disable on trans");
|
||||||
thrd->enableIpWhiteList = 0;
|
thrd->enableIpWhiteList = 0;
|
||||||
taosMemoryFree(msg);
|
taosMemoryFree(msg);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
for (int i = 0; i < req->numOfUser; i++) {
|
for (int i = 0; i < req->numOfUser; i++) {
|
||||||
|
@ -1557,6 +1562,7 @@ void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) {
|
||||||
}
|
}
|
||||||
tFreeSUpdateIpWhiteReq(req);
|
tFreeSUpdateIpWhiteReq(req);
|
||||||
taosMemoryFree(req);
|
taosMemoryFree(req);
|
||||||
|
taosMemoryFree(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyWorkThrd(SWorkThrd* pThrd) {
|
void destroyWorkThrd(SWorkThrd* pThrd) {
|
||||||
|
@ -1787,13 +1793,11 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
SUpdateIpWhite* pReq = NULL;
|
SUpdateIpWhite* pReq = NULL;
|
||||||
if (arg != NULL) {
|
code = cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg, &pReq);
|
||||||
if ((pReq = cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg)) == NULL) {
|
if (code != 0) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
taosMemoryFree(msg);
|
||||||
break;
|
break;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
msg->type = Update;
|
msg->type = Update;
|
||||||
|
|
Loading…
Reference in New Issue