Merge pull request #25244 from taosdata/fix/portMainTo30

port main to 30
This commit is contained in:
Hongze Cheng 2024-04-07 09:57:47 +08:00 committed by GitHub
commit 456cec5b2d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 184 additions and 107 deletions

View File

@ -45,8 +45,8 @@ static FilterCondType checkTagCond(SNode* cond);
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI);
static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI);
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI);
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI);
static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
@ -642,7 +642,8 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
info->groupId = calcGroupId(keyBuf, len);
if (initRemainGroups) {
// groupId ~ table uid
taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid), sizeof(info->uid));
taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid),
sizeof(info->uid));
}
}
@ -858,7 +859,7 @@ static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, S
}
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
SStorageAPI* pStorageAPI) {
SStorageAPI* pStorageAPI) {
SSDataBlock* pResBlock = createDataBlock();
if (pResBlock == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -939,11 +940,12 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S
return pResBlock;
}
static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList, bool* pResultList, bool addUid) {
static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList,
bool* pResultList, bool addUid) {
taosArrayClear(pUidList);
STableKeyInfo info = {.uid = 0, .groupId = 0};
int32_t numOfTables = taosArrayGetSize(pUidTagList);
int32_t numOfTables = taosArrayGetSize(pUidTagList);
for (int32_t i = 0; i < numOfTables; ++i) {
if (pResultList[i]) {
uint64_t uid = ((STUidTagInfo*)taosArrayGet(pUidTagList, i))->uid;
@ -1143,7 +1145,7 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
if (code != 0 || status == SFLT_NOT_INDEX) { // temporarily disable it for performance sake
qDebug("failed to get tableIds from index, suid:%" PRIu64, pScanNode->uid);
} else {
qInfo("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
qDebug("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
}
}
}
@ -1165,7 +1167,8 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
memcpy(pPayload + sizeof(int32_t), taosArrayGet(pUidList, 0), numOfTables * sizeof(uint64_t));
}
pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest), pPayload, size, 1);
pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest),
pPayload, size, 1);
digest[0] = 1;
memcpy(digest + 1, context.digest, tListLen(context.digest));
}
@ -1725,7 +1728,8 @@ SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
return c;
}
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode, const SReadHandle* readHandle) {
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode,
const SReadHandle* readHandle) {
pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
@ -1748,8 +1752,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
// allowed read stt file optimization mode
pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) &&
(pTableScanNode->scan.node.pConditions == NULL) &&
(pTableScanNode->interval == 0);
(pTableScanNode->scan.node.pConditions == NULL) && (pTableScanNode->interval == 0);
int32_t j = 0;
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
@ -1891,7 +1894,8 @@ void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t orde
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
slidingStart = taosTimeAdd(slidingStart, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
tw->skey = taosTimeAdd(slidingStart, pInterval->offset, pInterval->offsetUnit, pInterval->precision);
int64_t slidingEnd = taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
int64_t slidingEnd =
taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
tw->ekey = taosTimeAdd(slidingEnd, pInterval->offset, pInterval->offsetUnit, pInterval->precision);
}
@ -2136,7 +2140,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
if (groupSort && groupByTbname) {
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
pTableListInfo->numOfOuputGroups = numOfTables;
} else if (groupByTbname && pScanNode->groupOrderScan){
} else if (groupByTbname && pScanNode->groupOrderScan) {
pTableListInfo->numOfOuputGroups = numOfTables;
} else if (groupByTbname && tsCountAlwaysReturnValue && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) {
pTableListInfo->numOfOuputGroups = numOfTables;
@ -2147,7 +2151,8 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
bool initRemainGroups = false;
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode;
if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable && !(groupSort || pScanNode->groupOrderScan)) {
if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable &&
!(groupSort || pScanNode->groupOrderScan)) {
initRemainGroups = true;
}
}
@ -2271,7 +2276,7 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr
}
if (qDebugFlag & DEBUG_DEBUG) {
char* pBuf = NULL;
char flagBuf[64];
char flagBuf[64];
snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr);
qDebug("%s", dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr));
taosMemoryFree(pBuf);
@ -2280,7 +2285,7 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) {
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) {
int64_t* ts = (int64_t*)pColData->pData;
int64_t duration = pWin->ekey - pWin->skey + delta;
@ -2289,13 +2294,14 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t
ts[4] = pWin->ekey + delta; // window end key
}
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock, int32_t rowIndex) {
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock,
int32_t rowIndex) {
SColumnDataAgg* pColAgg = NULL;
const char* isNull = oldkeyBuf;
const char* p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size;
for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId];
@ -2321,8 +2327,7 @@ int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t ol
return 0;
}
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock,
int32_t rowIndex) {
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex) {
uint32_t colNum = pSortGroupCols->size;
SColumnDataAgg* pColAgg = NULL;
char* isNull = keyBuf;
@ -2370,7 +2375,7 @@ uint64_t calcGroupId(char* pData, int32_t len) {
}
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
SNode* node;
SNode* node;
SNodeList* ret = NULL;
FOREACH(node, pSortKeys) {
SOrderByExprNode* pSortKey = (SOrderByExprNode*)node;
@ -2386,6 +2391,6 @@ int32_t extractKeysLen(const SArray* keys) {
SColumn* pCol = (SColumn*)taosArrayGet(keys, i);
len += pCol->bytes;
}
len += sizeof(int8_t) * keyNum; //null flag
len += sizeof(int8_t) * keyNum; // null flag
return len;
}

View File

@ -486,6 +486,7 @@ int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", pParam->queryId, pParam->taskId,
code);
rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);
if (pMsg) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
@ -526,6 +527,7 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
if (code) {
qError("hb rsp error:%s", tstrerror(code));
rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);
SCH_ERR_JRET(code);
}
@ -1181,7 +1183,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
qMsg.queryId = pJob->queryId;
qMsg.taskId = pTask->taskId;
qMsg.refId = pJob->refId;
qMsg.execId = pTask->execId;
qMsg.execId = *(int32_t*)param;
msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg);
if (msgSize < 0) {

View File

@ -371,14 +371,13 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet,
pCtx->roundTotal = pEpSet->numOfEps;
}
if (pCtx->roundTimes >= pCtx->roundTotal) {
int64_t nowTs = taosGetTimestampMs();
int64_t lastTime = nowTs - pCtx->startTs;
if (lastTime > tsMaxRetryWaitTime) {
SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d",
nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes);
pJob->noMoreRetry = true;
pJob->noMoreRetry = true;
SCH_ERR_RET(SCH_GET_REDIRECT_CODE(pJob, rspCode));
}
@ -418,7 +417,7 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
taosMemoryFreeClear(pTask->msg);
pTask->msgLen = 0;
pTask->lastMsgType = 0;
pTask->childReady = 0;
pTask->childReady = 0;
memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
}
@ -505,11 +504,11 @@ int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, i
pLevel->taskExecDoneNum = 0;
pLevel->taskLaunchedNum = 0;
}
SCH_RESET_JOB_LEVEL_IDX(pJob);
code = schDoTaskRedirect(pJob, pTask, pData, rspCode);
taosMemoryFreeClear(pData->pData);
taosMemoryFreeClear(pData->pEpSet);
@ -627,7 +626,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
pTask->maxRetryTimes);
return TSDB_CODE_SUCCESS;
}
if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
pTask->maxExecTimes++;
pTask->maxRetryTimes++;
@ -862,7 +861,9 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
while (nodeInfo) {
if (nodeInfo->handle) {
SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, NULL);
void *pExecId = taosHashGetKey(nodeInfo, NULL);
schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, pExecId);
SCH_TASK_DLOG("start to drop task's %dth execNode", i);
} else {
SCH_TASK_DLOG("no need to drop task %dth execNode", i);
@ -901,7 +902,6 @@ int32_t schNotifyTaskOnExecNode(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType
return TSDB_CODE_SUCCESS;
}
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList);
SSchTask *pTask = NULL;
@ -1269,7 +1269,7 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType t
int32_t code = TSDB_CODE_SUCCESS;
SCH_ERR_RET(schNotifyTaskOnExecNode(pJob, pCurrTask, type));
void *pIter = taosHashIterate(list, NULL);
while (pIter) {
SSchTask *pTask = *(SSchTask **)pIter;
@ -1277,7 +1277,7 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType t
SCH_LOCK_TASK(pTask);
code = schNotifyTaskOnExecNode(pJob, pTask, type);
SCH_UNLOCK_TASK(pTask);
if (TSDB_CODE_SUCCESS != code) {
break;
}
@ -1289,7 +1289,6 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType t
SCH_RET(code);
}
int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
SCH_RET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask), NULL));
}

View File

@ -19,16 +19,12 @@ extern "C" {
#endif
#include <uv.h>
#include "os.h"
#include "taoserror.h"
#include "theap.h"
#include "tmisce.h"
#include "tmsg.h"
#include "transLog.h"
#include "transportInt.h"
#include "trpc.h"
#include "ttrace.h"
#include "tutil.h"
typedef bool (*FilteFunc)(void* arg);
@ -115,9 +111,12 @@ typedef SRpcConnInfo STransHandleInfo;
// ref mgt handle
typedef struct SExHandle {
void* handle;
int64_t refId;
void* pThrd;
void* handle;
int64_t refId;
void* pThrd;
queue q;
int8_t inited;
SRWLatch latch;
} SExHandle;
typedef struct {

View File

@ -92,6 +92,7 @@ typedef struct SCliMsg {
int64_t refId;
uint64_t st;
int sent; //(0: no send, 1: alread sent)
queue seqq;
} SCliMsg;
typedef struct SCliThrd {
@ -121,11 +122,7 @@ typedef struct SCliThrd {
SHashObj* batchCache;
SCliMsg* stopMsg;
bool quit;
int newConnCount;
SHashObj* msgCount;
bool quit;
} SCliThrd;
typedef struct SCliObj {
@ -262,10 +259,8 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
} \
if (i == sz) { \
pMsg = NULL; \
tDebug("msg not found, %" PRIu64 "", ahandle); \
} else { \
pMsg = transQueueRm(&conn->cliMsgs, i); \
tDebug("msg found, %" PRIu64 "", ahandle); \
} \
} while (0)
@ -343,6 +338,34 @@ bool cliMaySendCachedMsg(SCliConn* conn) {
_RETURN:
return false;
}
bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) {
if (refId == 0) return false;
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
if (exh == NULL) {
tDebug("release conn %p, refId: %" PRId64 "", conn, refId);
return false;
}
taosWLockLatch(&exh->latch);
if (exh->handle == NULL) exh->handle = conn;
exh->inited = 1;
if (!QUEUE_IS_EMPTY(&exh->q)) {
queue* h = QUEUE_HEAD(&exh->q);
QUEUE_REMOVE(h);
taosWUnLockLatch(&exh->latch);
SCliMsg* t = QUEUE_DATA(h, SCliMsg, seqq);
transCtxMerge(&conn->ctx, &t->ctx->appCtx);
transQueuePush(&conn->cliMsgs, t);
tDebug("pop from conn %p, refId: %" PRId64 "", conn, refId);
transReleaseExHandle(transGetRefMgt(), refId);
cliSend(conn);
return true;
}
taosWUnLockLatch(&exh->latch);
tDebug("empty conn %p, refId: %" PRId64 "", conn, refId);
transReleaseExHandle(transGetRefMgt(), refId);
return false;
}
void cliHandleResp(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
@ -439,8 +462,14 @@ void cliHandleResp(SCliConn* conn) {
return;
}
}
int64_t refId = (pMsg == NULL ? 0 : (int64_t)(pMsg->msg.info.handle));
tDebug("conn %p msg refId: %" PRId64 "", conn, refId);
destroyCmsg(pMsg);
if (cliConnSendSeqMsg(refId, conn)) {
return;
}
if (cliMaySendCachedMsg(conn) == true) {
return;
}
@ -451,6 +480,21 @@ void cliHandleResp(SCliConn* conn) {
uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
}
static void cliDestroyMsgInExhandle(int64_t refId) {
if (refId == 0) return;
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
if (exh) {
taosWLockLatch(&exh->latch);
while (!QUEUE_IS_EMPTY(&exh->q)) {
queue* h = QUEUE_HEAD(&exh->q);
QUEUE_REMOVE(h);
SCliMsg* t = QUEUE_DATA(h, SCliMsg, seqq);
destroyCmsg(t);
}
taosWUnLockLatch(&exh->latch);
transReleaseExHandle(transGetRefMgt(), refId);
}
}
void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
if (transQueueEmpty(&pConn->cliMsgs)) {
@ -510,6 +554,8 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
}
if (pMsg == NULL || (pMsg && pMsg->type != Release)) {
int64_t refId = (pMsg == NULL ? 0 : (int64_t)(pMsg->msg.info.handle));
cliDestroyMsgInExhandle(refId);
if (cliAppCb(pConn, &transMsg, pMsg) != 0) {
return;
}
@ -678,7 +724,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
}
list->numOfConn++;
}
tTrace("%s numOfConn: %d, limit: %d", pTransInst->label, list->numOfConn, pTransInst->connLimitNum);
tDebug("%s numOfConn: %d, limit: %d, dst:%s", pTransInst->label, list->numOfConn, pTransInst->connLimitNum, key);
return NULL;
}
@ -742,13 +788,13 @@ static void addConnToPool(void* pool, SCliConn* conn) {
QUEUE_PUSH(&conn->list->conns, &conn->q);
conn->list->size += 1;
if (conn->list->size >= 20) {
if (conn->list->size >= 10) {
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
arg->param1 = conn;
arg->param2 = thrd;
STrans* pTransInst = thrd->pTransInst;
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, 10 * CONN_PERSIST_TIME(pTransInst->idleTime));
}
}
static int32_t allocConnRef(SCliConn* conn, bool update) {
@ -761,8 +807,10 @@ static int32_t allocConnRef(SCliConn* conn, bool update) {
exh->handle = conn;
exh->pThrd = conn->hostThrd;
exh->refId = transAddExHandle(transGetRefMgt(), exh);
conn->refId = exh->refId;
QUEUE_INIT(&exh->q);
taosInitRWLatch(&exh->latch);
conn->refId = exh->refId;
if (conn->refId == -1) {
taosMemoryFree(exh);
}
@ -779,9 +827,11 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
if (exh == NULL) {
return -1;
}
taosWLockLatch(&exh->latch);
exh->handle = conn;
exh->pThrd = conn->hostThrd;
conn->refId = exh->refId;
taosWUnLockLatch(&exh->latch);
transReleaseExHandle(transGetRefMgt(), handle);
return 0;
@ -882,7 +932,6 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
}
conn->list = NULL;
pThrd->newConnCount--;
transReleaseExHandle(transGetRefMgt(), conn->refId);
transRemoveExHandle(transGetRefMgt(), conn->refId);
@ -1190,7 +1239,6 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
addr.sin_port = (uint16_t)htons(pList->port);
tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst);
pThrd->newConnCount++;
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10);
if (fd == -1) {
tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
@ -1392,7 +1440,10 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
return;
}
taosRLockLatch(&exh->latch);
SCliConn* conn = exh->handle;
taosRUnLockLatch(&exh->latch);
transReleaseExHandle(transGetRefMgt(), refId);
tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
@ -1425,7 +1476,9 @@ SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr)
*ignore = true;
return NULL;
} else {
taosRLockLatch(&exh->latch);
conn = exh->handle;
taosRUnLockLatch(&exh->latch);
if (conn == NULL) {
conn = getConnFromPool2(pThrd, addr, pMsg);
if (conn != NULL) specifyConnRef(conn, true, refId);
@ -1439,7 +1492,7 @@ SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr)
if (conn != NULL) {
tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool);
} else {
tTrace("%s not found conn in conn pool:%p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool);
tTrace("%s not found conn in conn pool:%p, dst:%s", ((STrans*)pThrd->pTransInst)->label, pThrd->pool, addr);
}
return conn;
}
@ -1598,7 +1651,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
addr.sin_port = (uint16_t)htons(port);
tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->dstAddr);
pThrd->newConnCount++;
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10);
if (fd == -1) {
tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
@ -1858,9 +1910,10 @@ void cliIteraConnMsgs(SCliConn* conn) {
bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
uint64_t ahandle = pHead->ahandle;
tDebug("ahandle = %" PRIu64 "", ahandle);
SCliMsg* pMsg = NULL;
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
tDebug("%s conn %p receive release request, refId:%" PRId64 ", may ignore", CONN_GET_INST_LABEL(conn), conn,
conn->refId);
transClearBuffer(&conn->readBuf);
transFreeMsg(transContFromHead((char*)pHead));
@ -1869,6 +1922,9 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, i);
if (cliMsg->type == Release) {
ASSERTS(pMsg == NULL, "trans-cli recv invaid release-req");
tDebug("%s conn %p receive release request, refId:%" PRId64 ", ignore msg", CONN_GET_INST_LABEL(conn), conn,
conn->refId);
cliDestroyConn(conn, true);
return true;
}
}
@ -1984,11 +2040,9 @@ static SCliThrd* createThrdObj(void* trans) {
taosMemoryFree(pThrd);
return NULL;
}
if (pTransInst->supportBatch) {
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 4, pThrd, cliAsyncCb);
} else {
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb);
}
int32_t nSync = pTransInst->supportBatch ? 4 : 8;
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, nSync, pThrd, cliAsyncCb);
if (pThrd->asyncPool == NULL) {
tError("failed to init async pool");
uv_loop_close(pThrd->loop);
@ -2029,8 +2083,6 @@ static SCliThrd* createThrdObj(void* trans) {
pThrd->quit = false;
pThrd->newConnCount = 0;
pThrd->msgCount = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
return pThrd;
}
static void destroyThrdObj(SCliThrd* pThrd) {
@ -2076,7 +2128,6 @@ static void destroyThrdObj(SCliThrd* pThrd) {
pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
}
taosHashCleanup(pThrd->batchCache);
taosHashCleanup(pThrd->msgCount);
taosMemoryFree(pThrd);
}
@ -2095,14 +2146,7 @@ void cliSendQuit(SCliThrd* thrd) {
void cliWalkCb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) {
if (uv_handle_get_type(handle) == UV_TIMER) {
// SCliConn* pConn = handle->data;
// if (pConn != NULL && pConn->timer != NULL) {
// SCliThrd* pThrd = pConn->hostThrd;
// uv_timer_stop((uv_timer_t*)handle);
// handle->data = NULL;
// taosArrayPush(pThrd->timerList, &pConn->timer);
// pConn->timer = NULL;
// }
// do nothing
} else {
uv_read_stop((uv_stream_t*)handle);
}
@ -2137,18 +2181,23 @@ static void doCloseIdleConn(void* param) {
cliDestroyConn(conn, true);
taosMemoryFree(arg);
}
static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) {
if (!(rpcDebugFlag & DEBUG_DEBUG)) {
return;
}
STransConnCtx* pCtx = pMsg->ctx;
STraceId* trace = &pMsg->msg.info.traceId;
char tbuf[512] = {0};
EPSET_TO_STR(&pCtx->epSet, tbuf);
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep,
pCtx->retryNextInterval);
return;
}
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STrans* pTransInst = pThrd->pTransInst;
STransConnCtx* pCtx = pMsg->ctx;
if (rpcDebugFlag & DEBUG_DEBUG) {
STraceId* trace = &pMsg->msg.info.traceId;
char tbuf[512] = {0};
EPSET_TO_STR(&pCtx->epSet, tbuf);
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
pCtx->retryStep, pCtx->retryNextInterval);
}
cliSchedMsgToDebug(pMsg, transLabel(pThrd->pTransInst));
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg;
@ -2157,12 +2206,6 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval);
}
FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
if (*val != exp) {
*val = newVal;
}
}
FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
if ((pResp == NULL || pResp->info.hasEpSet == 0)) {
return false;
@ -2504,21 +2547,7 @@ int transReleaseCliHandle(void* handle) {
}
return 0;
}
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) {
transFreeMsg(pReq->pCont);
return TSDB_CODE_RPC_BROKEN_LINK;
}
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
if (pThrd == NULL) {
transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_BROKEN_LINK;
}
static SCliMsg* transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
epsetAssign(&pCtx->epSet, pEpSet);
@ -2535,12 +2564,48 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
cliMsg->st = taosGetTimestampUs();
cliMsg->type = Normal;
cliMsg->refId = (int64_t)shandle;
QUEUE_INIT(&cliMsg->seqq);
return cliMsg;
}
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) {
transFreeMsg(pReq->pCont);
return TSDB_CODE_RPC_BROKEN_LINK;
}
int64_t handle = (int64_t)pReq->info.handle;
SCliThrd* pThrd = transGetWorkThrd(pTransInst, handle);
if (pThrd == NULL) {
transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_BROKEN_LINK;
}
if (handle != 0) {
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
if (exh != NULL) {
taosWLockLatch(&exh->latch);
if (exh->handle == NULL && exh->inited != 0) {
SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx);
QUEUE_PUSH(&exh->q, &pCliMsg->seqq);
taosWUnLockLatch(&exh->latch);
tDebug("msg refId: %" PRId64 "", handle);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return 0;
}
exh->inited = 1;
taosWUnLockLatch(&exh->latch);
transReleaseExHandle(transGetRefMgt(), handle);
}
}
SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx);
STraceId* trace = &pReq->info.traceId;
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) {
destroyCmsg(cliMsg);
EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle);
if (0 != transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) {
destroyCmsg(pCliMsg);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_BROKEN_LINK;
}
@ -2726,6 +2791,8 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
int64_t transAllocHandle() {
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
exh->refId = transAddExHandle(transGetRefMgt(), exh);
QUEUE_INIT(&exh->q);
taosInitRWLatch(&exh->latch);
tDebug("pre alloc refId %" PRId64 "", exh->refId);
return exh->refId;

View File

@ -761,9 +761,12 @@ static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
tTrace("conn %p received release request", pConn);
STraceId traceId = pHead->traceId;
pConn->status = ConnRelease;
transClearBuffer(&pConn->readBuf);
transFreeMsg(transContFromHead((char*)pHead));
if (pConn->status != ConnAcquire) {
return true;
}
pConn->status = ConnRelease;
STransMsg tmsg = {.code = 0, .info.handle = (void*)pConn, .info.traceId = traceId, .info.ahandle = (void*)0x9527};
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg));
@ -1090,6 +1093,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
STrans* pTransInst = pThrd->pTransInst;
pConn->refId = exh->refId;
QUEUE_INIT(&exh->q);
transRefSrvHandle(pConn);
tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pTransInst), exh, pConn, pConn->refId);
return pConn;
@ -1121,6 +1125,7 @@ static int reallocConnRef(SSvrConn* conn) {
exh->handle = conn;
exh->pThrd = conn->hostThrd;
exh->refId = transAddExHandle(transGetRefMgt(), exh);
QUEUE_INIT(&exh->q);
transAcquireExHandle(transGetRefMgt(), exh->refId);
conn->refId = exh->refId;