refactor transport

This commit is contained in:
Yihao Deng 2024-07-25 07:01:50 +00:00
parent cd030c186a
commit cc9e8cf14b
2 changed files with 50 additions and 6 deletions

View File

@ -713,10 +713,22 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
}
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
if (arg == NULL) {
doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY);
*pMsg = NULL;
return NULL;
}
arg->param1 = *pMsg;
arg->param2 = pThrd;
(*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
if (task == NULL) {
taosMemoryFree(arg);
doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY);
*pMsg = NULL;
return NULL;
}
(*pMsg)->ctx->task = task;
tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));
QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
*pMsg = NULL;
@ -724,9 +736,23 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
// send msg in delay queue
if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) {
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
if (arg == NULL) {
doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY);
*pMsg = NULL;
return NULL;
}
arg->param1 = *pMsg;
arg->param2 = pThrd;
(*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
if (task == NULL) {
taosMemoryFree(arg);
doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY);
*pMsg = NULL;
return NULL;
}
(*pMsg)->ctx->task = task;
tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label,
TMSG_INFO((*pMsg)->msg.msgType));
@ -766,7 +792,11 @@ static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->status == ConnInPool) {
return;
}
allocConnRef(conn, true);
int32_t code = allocConnRef(conn, true);
if (code != 0) {
cliDestroyConn(conn, true);
return;
}
SCliThrd* thrd = conn->hostThrd;
if (conn->timer != NULL) {
@ -811,6 +841,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->list->size >= 10) {
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
if (arg == NULL) return;
arg->param1 = conn;
arg->param2 = thrd;
@ -826,9 +857,12 @@ static int32_t allocConnRef(SCliConn* conn, bool update) {
}
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
if (exh == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
exh->refId = transAddExHandle(transGetRefMgt(), exh);
SExHandle* self = transAcquireExHandle(transGetRefMgt(), exh->refId);
if (self != exh) {
if (exh->refId < 0) {
taosMemoryFree(exh);
return TSDB_CODE_REF_INVALID_ID;
}
@ -838,8 +872,14 @@ static int32_t allocConnRef(SCliConn* conn, bool update) {
exh->handle = conn;
exh->pThrd = conn->hostThrd;
SExHandle* self = transAcquireExHandle(transGetRefMgt(), exh->refId);
if (self != exh) {
taosMemoryFree(exh);
return TSDB_CODE_REF_INVALID_ID;
}
conn->refId = exh->refId;
if (conn->refId == -1) {
if (conn->refId < 0) {
taosMemoryFree(exh);
}
return 0;

View File

@ -637,6 +637,10 @@ void transDQCancel(SDelayQueue* queue, SDelayTask* task) {
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
uint64_t now = taosGetTimestampMs();
SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
if (task == NULL) {
return NULL;
}
task->func = func;
task->arg = arg;
task->execTime = now + timeoutMs;