handle except

This commit is contained in:
yihaoDeng 2022-03-21 16:37:19 +08:00
parent bc77e3c579
commit d3212463dd
6 changed files with 54 additions and 20 deletions

View File

@ -52,8 +52,8 @@ typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
int32_t rspLen; int32_t rspLen;
void *pRsp; void * pRsp;
void *pNode; void * pNode;
} SNodeMsg; } SNodeMsg;
typedef struct SRpcInit { typedef struct SRpcInit {
@ -87,7 +87,15 @@ typedef struct {
} SRpcCtxVal; } SRpcCtxVal;
typedef struct { typedef struct {
SHashObj *args; int32_t msgType;
void * val;
int32_t len;
void (*free)(void *arg);
} SRpcBrokenlinkVal;
typedef struct {
SHashObj * args;
SRpcBrokenlinkVal brokenVal;
} SRpcCtx; } SRpcCtx;
int32_t rpcInit(); int32_t rpcInit();

View File

@ -154,15 +154,7 @@ AutomationCtx* automCtxCreate(void* data, AutomationType atype) {
// add more search type // add more search type
} }
char* dst = NULL; ctx->data = strdup((char*)data);
if (data != NULL) {
char* src = (char*)data;
size_t len = strlen(src);
dst = (char*)calloc(1, len * sizeof(char) + 1);
memcpy(dst, src, len);
}
ctx->data = dst;
ctx->type = atype; ctx->type = atype;
ctx->stdata = (void*)sv; ctx->stdata = (void*)sv;
return ctx; return ctx;

View File

@ -277,6 +277,7 @@ void transCtxCleanup(STransCtx* ctx);
void transCtxClear(STransCtx* ctx); void transCtxClear(STransCtx* ctx);
void transCtxMerge(STransCtx* dst, STransCtx* src); void transCtxMerge(STransCtx* dst, STransCtx* src);
void* transCtxDumpVal(STransCtx* ctx, int32_t key); void* transCtxDumpVal(STransCtx* ctx, int32_t key);
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType);
// queue sending msgs // queue sending msgs
typedef struct { typedef struct {

View File

@ -210,6 +210,9 @@ void cliHandleResp(SCliConn* conn) {
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
if (transMsg.ahandle == NULL) {
transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
}
} else { } else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
} }
@ -282,6 +285,9 @@ void cliHandleExcept(SCliConn* pConn) {
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
if (transMsg.ahandle == NULL) {
transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
}
} else { } else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
} }

View File

@ -238,12 +238,14 @@ void transCtxCleanup(STransCtx* ctx) {
iter->free(iter->val); iter->free(iter->val);
iter = taosHashIterate(ctx->args, iter); iter = taosHashIterate(ctx->args, iter);
} }
taosHashCleanup(ctx->args); taosHashCleanup(ctx->args);
} }
void transCtxMerge(STransCtx* dst, STransCtx* src) { void transCtxMerge(STransCtx* dst, STransCtx* src) {
if (dst->args == NULL) { if (dst->args == NULL) {
dst->args = src->args; dst->args = src->args;
dst->brokenVal = src->brokenVal;
src->args = NULL; src->args = NULL;
return; return;
} }
@ -275,6 +277,14 @@ void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
memcpy(ret, (char*)cVal->val, cVal->len); memcpy(ret, (char*)cVal->val, cVal->len);
return (void*)ret; return (void*)ret;
} }
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
char* ret = calloc(1, ctx->brokenVal.len);
memcpy(ret, (char*)(ctx->brokenVal.val), ctx->brokenVal.len);
*msgType = ctx->brokenVal.msgType;
return (void*)ret;
}
void transQueueInit(STransQueue* queue, void (*free)(void* arg)) { void transQueueInit(STransQueue* queue, void (*free)(void* arg)) {
queue->q = taosArrayInit(2, sizeof(void*)); queue->q = taosArrayInit(2, sizeof(void*));

View File

@ -364,9 +364,12 @@ TEST_F(TransEnv, srvReleaseHandle) {
SRpcMsg resp = {0}; SRpcMsg resp = {0};
tr->SetSrvContinueSend(processReleaseHandleCb); tr->SetSrvContinueSend(processReleaseHandleCb);
// tr->Restart(processReleaseHandleCb); // tr->Restart(processReleaseHandleCb);
void *handle = NULL; void * handle = NULL;
SRpcMsg req = {0};
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; memset(&req, 0, sizeof(req));
req.handle = resp.handle;
req.persistHandle = 1;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
@ -378,8 +381,11 @@ TEST_F(TransEnv, srvReleaseHandle) {
} }
TEST_F(TransEnv, cliReleaseHandleExcept) { TEST_F(TransEnv, cliReleaseHandleExcept) {
SRpcMsg resp = {0}; SRpcMsg resp = {0};
SRpcMsg req = {0};
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; memset(&req, 0, sizeof(req));
req.handle = resp.handle;
req.persistHandle = 1;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
@ -396,8 +402,10 @@ TEST_F(TransEnv, cliReleaseHandleExcept) {
} }
TEST_F(TransEnv, srvContinueSend) { TEST_F(TransEnv, srvContinueSend) {
tr->SetSrvContinueSend(processContinueSend); tr->SetSrvContinueSend(processContinueSend);
SRpcMsg req = {0}, resp = {0};
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
SRpcMsg req = {0}, resp = {0}; memset(&req, 0, sizeof(req));
memset(&resp, 0, sizeof(resp));
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
@ -410,8 +418,10 @@ TEST_F(TransEnv, srvPersistHandleExcept) {
tr->SetSrvContinueSend(processContinueSend); tr->SetSrvContinueSend(processContinueSend);
// tr->SetCliPersistFp(cliPersistHandle); // tr->SetCliPersistFp(cliPersistHandle);
SRpcMsg resp = {0}; SRpcMsg resp = {0};
SRpcMsg req = {0};
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
SRpcMsg req = {.handle = resp.handle}; memset(&req, 0, sizeof(req));
req.handle = resp.handle;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
@ -428,8 +438,10 @@ TEST_F(TransEnv, srvPersistHandleExcept) {
TEST_F(TransEnv, cliPersistHandleExcept) { TEST_F(TransEnv, cliPersistHandleExcept) {
tr->SetSrvContinueSend(processContinueSend); tr->SetSrvContinueSend(processContinueSend);
SRpcMsg resp = {0}; SRpcMsg resp = {0};
SRpcMsg req = {0};
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
SRpcMsg req = {.handle = resp.handle}; memset(&req, 0, sizeof(req));
req.handle = resp.handle;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
@ -450,8 +462,11 @@ TEST_F(TransEnv, multiCliPersistHandleExcept) {
TEST_F(TransEnv, queryExcept) { TEST_F(TransEnv, queryExcept) {
tr->SetSrvContinueSend(processRegisterFailure); tr->SetSrvContinueSend(processRegisterFailure);
SRpcMsg resp = {0}; SRpcMsg resp = {0};
SRpcMsg req = {0};
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; memset(&req, 0, sizeof(req));
req.handle = resp.handle;
req.persistHandle = 1;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
@ -466,8 +481,10 @@ TEST_F(TransEnv, queryExcept) {
} }
TEST_F(TransEnv, noResp) { TEST_F(TransEnv, noResp) {
SRpcMsg resp = {0}; SRpcMsg resp = {0};
SRpcMsg req = {0};
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
SRpcMsg req = {.noResp = 1}; memset(&req, 0, sizeof(req));
req.noResp = 1;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;