commit
1b21c2338d
|
@ -507,6 +507,7 @@ static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t p
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ) {
|
static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ) {
|
||||||
|
int32_t code = 0;
|
||||||
char buf[256] = {0};
|
char buf[256] = {0};
|
||||||
sprintf(buf, "%s:%d", server, port);
|
sprintf(buf, "%s:%d", server, port);
|
||||||
|
|
||||||
|
@ -514,7 +515,9 @@ static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port,
|
||||||
(void)taosHashRemove(pTable, buf, strlen(buf));
|
(void)taosHashRemove(pTable, buf, strlen(buf));
|
||||||
} else {
|
} else {
|
||||||
int32_t st = taosGetTimestampSec();
|
int32_t st = taosGetTimestampSec();
|
||||||
(void)taosHashPut(pTable, buf, strlen(buf), &st, sizeof(st));
|
if ((code = taosHashPut(pTable, buf, strlen(buf), &st, sizeof(st))) != 0) {
|
||||||
|
tError("http-report failed to update conn status, dst:%s, reason:%s", buf, tstrerror(code));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -332,6 +332,21 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
||||||
transQueueClear(&conn->cliMsgs);
|
transQueueClear(&conn->cliMsgs);
|
||||||
memset(&conn->ctx, 0, sizeof(conn->ctx));
|
memset(&conn->ctx, 0, sizeof(conn->ctx));
|
||||||
}
|
}
|
||||||
|
void cliResetTimer(SCliThrd* pThrd, SCliConn* conn) {
|
||||||
|
if (conn->timer) {
|
||||||
|
if (uv_is_active((uv_handle_t*)conn->timer)) {
|
||||||
|
tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
|
||||||
|
(void)uv_timer_stop(conn->timer);
|
||||||
|
}
|
||||||
|
if (taosArrayPush(pThrd->timerList, &conn->timer) == NULL) {
|
||||||
|
tError("failed to push timer %p to list, reason:%s", conn->timer, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
|
conn->timer = NULL;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
conn->timer->data = NULL;
|
||||||
|
conn->timer = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
bool cliMaySendCachedMsg(SCliConn* conn) {
|
bool cliMaySendCachedMsg(SCliConn* conn) {
|
||||||
if (!transQueueEmpty(&conn->cliMsgs)) {
|
if (!transQueueEmpty(&conn->cliMsgs)) {
|
||||||
SCliMsg* pCliMsg = NULL;
|
SCliMsg* pCliMsg = NULL;
|
||||||
|
@ -376,15 +391,7 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
SCliThrd* pThrd = conn->hostThrd;
|
SCliThrd* pThrd = conn->hostThrd;
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
|
|
||||||
if (conn->timer) {
|
cliResetTimer(pThrd, conn);
|
||||||
if (uv_is_active((uv_handle_t*)conn->timer)) {
|
|
||||||
tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
|
|
||||||
(void)uv_timer_stop(conn->timer);
|
|
||||||
}
|
|
||||||
(void)taosArrayPush(pThrd->timerList, &conn->timer);
|
|
||||||
conn->timer->data = NULL;
|
|
||||||
conn->timer = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
STransMsgHead* pHead = NULL;
|
STransMsgHead* pHead = NULL;
|
||||||
|
|
||||||
|
@ -593,8 +600,8 @@ void cliConnTimeout(uv_timer_t* handle) {
|
||||||
|
|
||||||
(void)uv_timer_stop(handle);
|
(void)uv_timer_stop(handle);
|
||||||
handle->data = NULL;
|
handle->data = NULL;
|
||||||
(void)taosArrayPush(pThrd->timerList, &conn->timer);
|
|
||||||
conn->timer = NULL;
|
cliResetTimer(pThrd, conn);
|
||||||
|
|
||||||
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
|
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
|
||||||
cliHandleFastFail(conn, UV_ECANCELED);
|
cliHandleFastFail(conn, UV_ECANCELED);
|
||||||
|
@ -643,13 +650,16 @@ void* destroyConnPool(SCliThrd* pThrd) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
|
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
|
||||||
|
int32_t code = 0;
|
||||||
void* pool = pThrd->pool;
|
void* pool = pThrd->pool;
|
||||||
STrans* pTranInst = pThrd->pTransInst;
|
STrans* pTranInst = pThrd->pTransInst;
|
||||||
size_t klen = strlen(key);
|
size_t klen = strlen(key);
|
||||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, klen);
|
SConnList* plist = taosHashGet((SHashObj*)pool, key, klen);
|
||||||
if (plist == NULL) {
|
if (plist == NULL) {
|
||||||
SConnList list = {0};
|
SConnList list = {0};
|
||||||
(void)taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list));
|
if ((code = taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list))) != 0) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
plist = taosHashGet(pool, key, klen);
|
plist = taosHashGet(pool, key, klen);
|
||||||
|
|
||||||
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
||||||
|
@ -686,13 +696,17 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
|
static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
|
||||||
|
int32_t code = 0;
|
||||||
void* pool = pThrd->pool;
|
void* pool = pThrd->pool;
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
size_t klen = strlen(key);
|
size_t klen = strlen(key);
|
||||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, klen);
|
SConnList* plist = taosHashGet((SHashObj*)pool, key, klen);
|
||||||
if (plist == NULL) {
|
if (plist == NULL) {
|
||||||
SConnList list = {0};
|
SConnList list = {0};
|
||||||
(void)taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list));
|
if ((code = taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list))) != 0) {
|
||||||
|
tError("failed to put key %s to pool, reason:%s", key, tstrerror(code));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
plist = taosHashGet(pool, key, klen);
|
plist = taosHashGet(pool, key, klen);
|
||||||
|
|
||||||
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
||||||
|
@ -805,12 +819,8 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SCliThrd* thrd = conn->hostThrd;
|
SCliThrd* thrd = conn->hostThrd;
|
||||||
if (conn->timer != NULL) {
|
cliResetTimer(thrd, conn);
|
||||||
(void)uv_timer_stop(conn->timer);
|
|
||||||
(void)taosArrayPush(thrd->timerList, &conn->timer);
|
|
||||||
conn->timer->data = NULL;
|
|
||||||
conn->timer = NULL;
|
|
||||||
}
|
|
||||||
if (T_REF_VAL_GET(conn) > 1) {
|
if (T_REF_VAL_GET(conn) > 1) {
|
||||||
transUnrefCliHandle(conn);
|
transUnrefCliHandle(conn);
|
||||||
}
|
}
|
||||||
|
@ -1053,12 +1063,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
|
||||||
transDQCancel(pThrd->timeoutQueue, conn->task);
|
transDQCancel(pThrd->timeoutQueue, conn->task);
|
||||||
conn->task = NULL;
|
conn->task = NULL;
|
||||||
}
|
}
|
||||||
if (conn->timer != NULL) {
|
cliResetTimer(pThrd, conn);
|
||||||
(void)uv_timer_stop(conn->timer);
|
|
||||||
conn->timer->data = NULL;
|
|
||||||
(void)taosArrayPush(pThrd->timerList, &conn->timer);
|
|
||||||
conn->timer = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (clear) {
|
if (clear) {
|
||||||
if (!uv_is_closing((uv_handle_t*)conn->stream)) {
|
if (!uv_is_closing((uv_handle_t*)conn->stream)) {
|
||||||
|
@ -1073,12 +1078,7 @@ static void cliDestroy(uv_handle_t* handle) {
|
||||||
}
|
}
|
||||||
SCliConn* conn = handle->data;
|
SCliConn* conn = handle->data;
|
||||||
SCliThrd* pThrd = conn->hostThrd;
|
SCliThrd* pThrd = conn->hostThrd;
|
||||||
if (conn->timer != NULL) {
|
cliResetTimer(pThrd, conn);
|
||||||
(void)uv_timer_stop(conn->timer);
|
|
||||||
(void)taosArrayPush(pThrd->timerList, &conn->timer);
|
|
||||||
conn->timer->data = NULL;
|
|
||||||
conn->timer = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
(void)atomic_sub_fetch_32(&pThrd->connCount, 1);
|
(void)atomic_sub_fetch_32(&pThrd->connCount, 1);
|
||||||
|
|
||||||
|
@ -1385,10 +1385,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
|
||||||
|
|
||||||
uint32_t ipaddr = 0;
|
uint32_t ipaddr = 0;
|
||||||
if ((code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip, &ipaddr)) != 0) {
|
if ((code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip, &ipaddr)) != 0) {
|
||||||
(void)uv_timer_stop(conn->timer);
|
cliResetTimer(pThrd, conn);
|
||||||
conn->timer->data = NULL;
|
|
||||||
(void)taosArrayPush(pThrd->timerList, &conn->timer);
|
|
||||||
conn->timer = NULL;
|
|
||||||
cliHandleFastFail(conn, code);
|
cliHandleFastFail(conn, code);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1421,10 +1418,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
|
||||||
|
|
||||||
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
(void)uv_timer_stop(conn->timer);
|
cliResetTimer(pThrd, conn);
|
||||||
conn->timer->data = NULL;
|
|
||||||
(void)taosArrayPush(pThrd->timerList, &conn->timer);
|
|
||||||
conn->timer = NULL;
|
|
||||||
|
|
||||||
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
|
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
|
||||||
cliHandleFastFail(conn, -1);
|
cliHandleFastFail(conn, -1);
|
||||||
|
@ -1502,7 +1496,10 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
|
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
|
||||||
(void)taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr), &item, sizeof(SFailFastItem));
|
int32_t code = taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr), &item, sizeof(SFailFastItem));
|
||||||
|
if (code != 0) {
|
||||||
|
tError("failed to put fail-fast item to cache, reason:%s", tstrerror(code));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1522,10 +1519,7 @@ void cliConnCb(uv_connect_t* req, int status) {
|
||||||
if (pConn->timer == NULL) {
|
if (pConn->timer == NULL) {
|
||||||
timeout = true;
|
timeout = true;
|
||||||
} else {
|
} else {
|
||||||
(void)uv_timer_stop(pConn->timer);
|
cliResetTimer(pThrd, pConn);
|
||||||
pConn->timer->data = NULL;
|
|
||||||
(void)taosArrayPush(pThrd->timerList, &pConn->timer);
|
|
||||||
pConn->timer = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
STUB_RAND_NETWORK_ERR(status);
|
STUB_RAND_NETWORK_ERR(status);
|
||||||
|
@ -1870,11 +1864,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
uint32_t ipaddr;
|
uint32_t ipaddr;
|
||||||
int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn, &ipaddr);
|
int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn, &ipaddr);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
(void)uv_timer_stop(conn->timer);
|
cliResetTimer(pThrd, conn);
|
||||||
conn->timer->data = NULL;
|
|
||||||
(void)taosArrayPush(pThrd->timerList, &conn->timer);
|
|
||||||
conn->timer = NULL;
|
|
||||||
|
|
||||||
cliHandleExcept(conn, code);
|
cliHandleExcept(conn, code);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1910,10 +1900,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
|
|
||||||
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
(void)uv_timer_stop(conn->timer);
|
cliResetTimer(pThrd, conn);
|
||||||
conn->timer->data = NULL;
|
|
||||||
(void)taosArrayPush(pThrd->timerList, &conn->timer);
|
|
||||||
conn->timer = NULL;
|
|
||||||
|
|
||||||
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
|
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
|
||||||
cliHandleFastFail(conn, ret);
|
cliHandleFastFail(conn, ret);
|
||||||
|
@ -2377,7 +2364,9 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
|
||||||
}
|
}
|
||||||
(void)uv_timer_init(pThrd->loop, timer);
|
(void)uv_timer_init(pThrd->loop, timer);
|
||||||
(void)taosArrayPush(pThrd->timerList, &timer);
|
if (taosArrayPush(pThrd->timerList, &timer) == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pThrd->pool = createConnPool(4);
|
pThrd->pool = createConnPool(4);
|
||||||
|
|
|
@ -375,11 +375,10 @@ void transCtxMerge(STransCtx* dst, STransCtx* src) {
|
||||||
STransCtxVal* sVal = (STransCtxVal*)iter;
|
STransCtxVal* sVal = (STransCtxVal*)iter;
|
||||||
key = taosHashGetKey(sVal, &klen);
|
key = taosHashGetKey(sVal, &klen);
|
||||||
|
|
||||||
// STransCtxVal* dVal = taosHashGet(dst->args, key, klen);
|
int32_t code = taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal));
|
||||||
// if (dVal) {
|
if (code != 0) {
|
||||||
// dst->freeFunc(dVal->val);
|
tError("failed to put val to hash, reason:%s", tstrerror(code));
|
||||||
// }
|
}
|
||||||
(void)taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal));
|
|
||||||
iter = taosHashIterate(src->args, iter);
|
iter = taosHashIterate(src->args, iter);
|
||||||
}
|
}
|
||||||
taosHashCleanup(src->args);
|
taosHashCleanup(src->args);
|
||||||
|
@ -453,7 +452,9 @@ bool transQueuePush(STransQueue* queue, void* arg) {
|
||||||
if (queue->q == NULL) {
|
if (queue->q == NULL) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
(void)taosArrayPush(queue->q, &arg);
|
if (taosArrayPush(queue->q, &arg) == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
if (taosArrayGetSize(queue->q) > 1) {
|
if (taosArrayGetSize(queue->q) > 1) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue