commit
dd28770bf7
|
@ -25,9 +25,9 @@ extern "C" {
|
|||
#include "tarray.h"
|
||||
#include "thash.h"
|
||||
#include "tlog.h"
|
||||
#include "tsimplehash.h"
|
||||
#include "tmsg.h"
|
||||
#include "tmsgcb.h"
|
||||
#include "tsimplehash.h"
|
||||
|
||||
typedef enum {
|
||||
JOB_TASK_STATUS_NULL = 0,
|
||||
|
@ -69,16 +69,16 @@ typedef enum {
|
|||
#define QUERY_MSG_MASK_SHOW_REWRITE() (1 << 0)
|
||||
#define QUERY_MSG_MASK_AUDIT() (1 << 1)
|
||||
#define QUERY_MSG_MASK_VIEW() (1 << 2)
|
||||
#define TEST_SHOW_REWRITE_MASK(m) (((m) & QUERY_MSG_MASK_SHOW_REWRITE()) != 0)
|
||||
#define TEST_AUDIT_MASK(m) (((m) & QUERY_MSG_MASK_AUDIT()) != 0)
|
||||
#define TEST_VIEW_MASK(m) (((m) & QUERY_MSG_MASK_VIEW()) != 0)
|
||||
#define TEST_SHOW_REWRITE_MASK(m) (((m)&QUERY_MSG_MASK_SHOW_REWRITE()) != 0)
|
||||
#define TEST_AUDIT_MASK(m) (((m)&QUERY_MSG_MASK_AUDIT()) != 0)
|
||||
#define TEST_VIEW_MASK(m) (((m)&QUERY_MSG_MASK_VIEW()) != 0)
|
||||
|
||||
typedef struct STableComInfo {
|
||||
uint8_t numOfTags; // the number of tags in schema
|
||||
uint8_t precision; // the number of precision
|
||||
col_id_t numOfColumns; // the number of columns
|
||||
int16_t numOfPKs;
|
||||
int32_t rowSize; // row size of the schema
|
||||
int32_t rowSize; // row size of the schema
|
||||
} STableComInfo;
|
||||
|
||||
typedef struct SIndexMeta {
|
||||
|
@ -119,8 +119,9 @@ typedef struct STableMeta {
|
|||
int32_t sversion;
|
||||
int32_t tversion;
|
||||
STableComInfo tableInfo;
|
||||
SSchemaExt* schemaExt; // There is no additional memory allocation, and the pointer is fixed to the next address of the schema content.
|
||||
SSchema schema[];
|
||||
SSchemaExt* schemaExt; // There is no additional memory allocation, and the pointer is fixed to the next address of
|
||||
// the schema content.
|
||||
SSchema schema[];
|
||||
} STableMeta;
|
||||
#pragma pack(pop)
|
||||
|
||||
|
@ -196,9 +197,9 @@ typedef struct SBoundColInfo {
|
|||
} SBoundColInfo;
|
||||
|
||||
typedef struct STableColsData {
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
SArray* aCol;
|
||||
bool getFromHash;
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
SArray* aCol;
|
||||
bool getFromHash;
|
||||
} STableColsData;
|
||||
|
||||
typedef struct STableVgUid {
|
||||
|
@ -207,15 +208,14 @@ typedef struct STableVgUid {
|
|||
} STableVgUid;
|
||||
|
||||
typedef struct STableBufInfo {
|
||||
void* pCurBuff;
|
||||
SArray* pBufList;
|
||||
int64_t buffUnit;
|
||||
int64_t buffSize;
|
||||
int64_t buffIdx;
|
||||
int64_t buffOffset;
|
||||
void* pCurBuff;
|
||||
SArray* pBufList;
|
||||
int64_t buffUnit;
|
||||
int64_t buffSize;
|
||||
int64_t buffIdx;
|
||||
int64_t buffOffset;
|
||||
} STableBufInfo;
|
||||
|
||||
|
||||
typedef struct STableDataCxt {
|
||||
STableMeta* pMeta;
|
||||
STSchema* pSchema;
|
||||
|
@ -237,23 +237,22 @@ typedef struct SStbInterlaceInfo {
|
|||
void* pRequest;
|
||||
uint64_t requestId;
|
||||
int64_t requestSelf;
|
||||
bool tbFromHash;
|
||||
bool tbFromHash;
|
||||
SHashObj* pVgroupHash;
|
||||
SArray* pVgroupList;
|
||||
SSHashObj* pTableHash;
|
||||
int64_t tbRemainNum;
|
||||
STableBufInfo tbBuf;
|
||||
char firstName[TSDB_TABLE_NAME_LEN];
|
||||
STSchema *pTSchema;
|
||||
STableDataCxt *pDataCtx;
|
||||
void *boundTags;
|
||||
STSchema* pTSchema;
|
||||
STableDataCxt* pDataCtx;
|
||||
void* boundTags;
|
||||
|
||||
bool tableColsReady;
|
||||
SArray *pTableCols;
|
||||
int32_t pTableColsIdx;
|
||||
bool tableColsReady;
|
||||
SArray* pTableCols;
|
||||
int32_t pTableColsIdx;
|
||||
} SStbInterlaceInfo;
|
||||
|
||||
|
||||
typedef int32_t (*__async_send_cb_fn_t)(void* param, SDataBuf* pMsg, int32_t code);
|
||||
typedef int32_t (*__async_exec_fn_t)(void* param);
|
||||
|
||||
|
@ -308,6 +307,8 @@ void destroyAhandle(void* ahandle);
|
|||
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo,
|
||||
bool persistHandle, void* ctx);
|
||||
|
||||
int32_t asyncFreeConnById(void* pTransporter, int64_t pid);
|
||||
;
|
||||
/**
|
||||
* Asynchronously send message to server, after the response received, the callback will be incured.
|
||||
*
|
||||
|
@ -325,7 +326,7 @@ void initQueryModuleMsgHandle();
|
|||
|
||||
const SSchema* tGetTbnameColumnSchema();
|
||||
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
|
||||
int32_t getAsofJoinReverseOp(EOperatorType op);
|
||||
int32_t getAsofJoinReverseOp(EOperatorType op);
|
||||
|
||||
int32_t queryCreateCTableMetaFromMsg(STableMetaRsp* msg, SCTableMeta* pMeta);
|
||||
int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta);
|
||||
|
@ -384,7 +385,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
|
|||
|
||||
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
|
||||
((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
|
||||
(_type) == TDMT_MND_DROP_STB || (_type) == TDMT_MND_CREATE_VIEW || (_type) == TDMT_MND_DROP_VIEW || \
|
||||
(_type) == TDMT_MND_DROP_STB || (_type) == TDMT_MND_CREATE_VIEW || (_type) == TDMT_MND_DROP_VIEW || \
|
||||
(_type) == TDMT_MND_CREATE_TSMA || (_type) == TDMT_MND_DROP_TSMA || (_type) == TDMT_MND_DROP_TB_WITH_TSMA)
|
||||
|
||||
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \
|
||||
|
|
|
@ -125,6 +125,7 @@ typedef struct SRpcInit {
|
|||
int32_t timeToGetConn;
|
||||
int8_t supportBatch; // 0: no batch, 1. batch
|
||||
int32_t batchSize;
|
||||
int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait
|
||||
void *parent;
|
||||
} SRpcInit;
|
||||
|
||||
|
@ -158,18 +159,21 @@ void *rpcReallocCont(void *ptr, int64_t contLen);
|
|||
// Because taosd supports multi-process mode
|
||||
// These functions should not be used on the server side
|
||||
// Please use tmsg<xx> functions, which are defined in tmsgcb.h
|
||||
int rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
|
||||
int rpcSendResponse(const SRpcMsg *pMsg);
|
||||
int rpcRegisterBrokenLinkArg(SRpcMsg *msg);
|
||||
int rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock
|
||||
int32_t rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
|
||||
int32_t rpcSendResponse(const SRpcMsg *pMsg);
|
||||
int32_t rpcRegisterBrokenLinkArg(SRpcMsg *msg);
|
||||
int32_t rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock
|
||||
|
||||
// These functions will not be called in the child process
|
||||
int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
|
||||
int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
|
||||
int rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int8_t *epUpdated,
|
||||
int32_t rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
|
||||
int32_t rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
|
||||
int32_t rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int8_t *epUpdated,
|
||||
int32_t timeoutMs);
|
||||
int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
|
||||
void *rpcAllocHandle();
|
||||
|
||||
int32_t rpcFreeConnById(void *shandle, int64_t connId);
|
||||
|
||||
int32_t rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
|
||||
int32_t rpcAllocHandle(int64_t *refId);
|
||||
int32_t rpcSetIpWhite(void *thandl, void *arg);
|
||||
|
||||
int32_t rpcUtilSIpRangeToStr(SIpV4Range *pRange, char *buf);
|
||||
|
|
|
@ -387,6 +387,7 @@ int32_t dmInitClient(SDnode *pDnode) {
|
|||
rpcInit.supportBatch = 1;
|
||||
rpcInit.batchSize = 8 * 1024;
|
||||
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||
rpcInit.notWaitAvaliableConn = 1;
|
||||
|
||||
(void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||
|
||||
|
|
|
@ -202,6 +202,7 @@ typedef struct SExchangeInfo {
|
|||
SLimitInfo limitInfo;
|
||||
int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo
|
||||
char* pTaskId;
|
||||
SArray* pFetchRpcHandles;
|
||||
} SExchangeInfo;
|
||||
|
||||
typedef struct SScanInfo {
|
||||
|
|
|
@ -298,13 +298,13 @@ _end:
|
|||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
(*ppRes) = NULL;
|
||||
(*ppRes) = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) {
|
||||
SSDataBlock* pRes = NULL;
|
||||
int32_t code = loadRemoteDataNext(pOperator, &pRes);
|
||||
int32_t code = loadRemoteDataNext(pOperator, &pRes);
|
||||
return pRes;
|
||||
}
|
||||
|
||||
|
@ -346,6 +346,14 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
|
|||
qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
|
||||
if (!pInfo->pFetchRpcHandles) {
|
||||
return terrno;
|
||||
}
|
||||
void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
|
||||
if (!ret) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
|
||||
if (pInfo->pSources == NULL) {
|
||||
|
@ -384,6 +392,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
|
|||
initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
|
||||
pInfo->self = taosAddRef(exchangeObjRefPool, pInfo);
|
||||
|
||||
|
||||
return initDataSource(numOfSources, pInfo, id);
|
||||
}
|
||||
|
||||
|
@ -391,7 +400,7 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo
|
|||
SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
@ -468,6 +477,14 @@ void freeSourceDataInfo(void* p) {
|
|||
|
||||
void doDestroyExchangeOperatorInfo(void* param) {
|
||||
SExchangeInfo* pExInfo = (SExchangeInfo*)param;
|
||||
for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
|
||||
int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
|
||||
if (*pRpcHandle > 0) {
|
||||
SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
|
||||
(void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
|
||||
}
|
||||
}
|
||||
taosArrayDestroy(pExInfo->pFetchRpcHandles);
|
||||
|
||||
taosArrayDestroy(pExInfo->pSources);
|
||||
taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
|
||||
|
@ -495,6 +512,8 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
}
|
||||
|
||||
int32_t index = pWrapper->sourceIndex;
|
||||
int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
|
||||
*pRpcHandle = -1;
|
||||
SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
|
||||
if (!pSourceDataInfo) {
|
||||
return terrno;
|
||||
|
@ -668,6 +687,8 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
|
|||
int64_t transporterId = 0;
|
||||
code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
|
||||
*pRpcHandle = transporterId;
|
||||
}
|
||||
|
||||
_end:
|
||||
|
@ -690,7 +711,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo
|
|||
int32_t lino = 0;
|
||||
if (pColList == NULL) { // data from other sources
|
||||
blockDataCleanup(pRes);
|
||||
code = blockDecode(pRes, pData, (const char**) pNextStart);
|
||||
code = blockDecode(pRes, pData, (const char**)pNextStart);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -225,6 +225,7 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
|
|||
.code = 0
|
||||
};
|
||||
TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId);
|
||||
|
||||
int code = rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
|
||||
if (code) {
|
||||
destroySendMsgInfo(pInfo);
|
||||
|
@ -235,6 +236,9 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
|
|||
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
|
||||
return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL);
|
||||
}
|
||||
int32_t asyncFreeConnById(void* pTransporter, int64_t pid) {
|
||||
return rpcFreeConnById(pTransporter, pid);
|
||||
}
|
||||
|
||||
char* jobTaskStatusStr(int32_t status) {
|
||||
switch (status) {
|
||||
|
@ -448,13 +452,13 @@ void parseTagDatatoJson(void* p, char** jsonStr) {
|
|||
if (value == NULL) {
|
||||
goto end;
|
||||
}
|
||||
if(!cJSON_AddItemToObject(json, tagJsonKey, value)){
|
||||
if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
|
||||
goto end;
|
||||
}
|
||||
} else if (type == TSDB_DATA_TYPE_NCHAR) {
|
||||
cJSON* value = NULL;
|
||||
if (pTagVal->nData > 0) {
|
||||
char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
|
||||
char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
|
||||
if (tagJsonValue == NULL) {
|
||||
goto end;
|
||||
}
|
||||
|
@ -479,7 +483,7 @@ void parseTagDatatoJson(void* p, char** jsonStr) {
|
|||
goto end;
|
||||
}
|
||||
|
||||
if(!cJSON_AddItemToObject(json, tagJsonKey, value)){
|
||||
if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
|
||||
goto end;
|
||||
}
|
||||
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
|
||||
|
@ -488,7 +492,7 @@ void parseTagDatatoJson(void* p, char** jsonStr) {
|
|||
if (value == NULL) {
|
||||
goto end;
|
||||
}
|
||||
if(!cJSON_AddItemToObject(json, tagJsonKey, value)){
|
||||
if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
|
||||
goto end;
|
||||
}
|
||||
} else if (type == TSDB_DATA_TYPE_BOOL) {
|
||||
|
@ -497,7 +501,7 @@ void parseTagDatatoJson(void* p, char** jsonStr) {
|
|||
if (value == NULL) {
|
||||
goto end;
|
||||
}
|
||||
if(!cJSON_AddItemToObject(json, tagJsonKey, value)){
|
||||
if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
|
||||
goto end;
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -17,11 +17,11 @@
|
|||
#include "command.h"
|
||||
#include "query.h"
|
||||
#include "schInt.h"
|
||||
#include "tglobal.h"
|
||||
#include "tmisce.h"
|
||||
#include "tmsg.h"
|
||||
#include "tref.h"
|
||||
#include "trpc.h"
|
||||
#include "tglobal.h"
|
||||
#include "tmisce.h"
|
||||
|
||||
// clang-format off
|
||||
int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
|
||||
|
@ -975,11 +975,13 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
|
|||
SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask));
|
||||
|
||||
if (isHb && persistHandle && trans->pHandle == 0) {
|
||||
trans->pHandle = rpcAllocHandle();
|
||||
if (NULL == trans->pHandle) {
|
||||
SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", terrno);
|
||||
SCH_ERR_JRET(terrno);
|
||||
int64_t refId = 0;
|
||||
code = rpcAllocHandle(&refId);
|
||||
if (code != 0) {
|
||||
SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", code);
|
||||
SCH_ERR_JRET(code);
|
||||
}
|
||||
trans->pHandle = (void *)refId;
|
||||
}
|
||||
|
||||
if (pJob && pTask) {
|
||||
|
@ -1200,7 +1202,14 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
}
|
||||
|
||||
persistHandle = true;
|
||||
SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle());
|
||||
int64_t refId = 0;
|
||||
code = rpcAllocHandle(&refId);
|
||||
if (code != 0) {
|
||||
SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", code);
|
||||
SCH_ERR_JRET(code);
|
||||
}
|
||||
|
||||
SCH_SET_TASK_HANDLE(pTask, (void *)refId);
|
||||
break;
|
||||
}
|
||||
case TDMT_SCH_FETCH:
|
||||
|
|
|
@ -148,7 +148,6 @@ typedef struct {
|
|||
STransSyncMsg* pSyncMsg; // for syncchronous with timeout API
|
||||
int64_t syncMsgRef;
|
||||
SCvtAddr cvtAddr;
|
||||
bool setMaxRetry;
|
||||
|
||||
int32_t retryMinInterval;
|
||||
int32_t retryMaxInterval;
|
||||
|
@ -207,7 +206,7 @@ typedef struct {
|
|||
|
||||
#pragma pack(pop)
|
||||
|
||||
typedef enum { Normal, Quit, Release, Register, Update } STransMsgType;
|
||||
typedef enum { Normal, Quit, Release, Register, Update, FreeById } STransMsgType;
|
||||
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus;
|
||||
|
||||
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
|
||||
|
@ -304,10 +303,10 @@ int32_t transClearBuffer(SConnBuffer* buf);
|
|||
int32_t transDestroyBuffer(SConnBuffer* buf);
|
||||
int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
|
||||
bool transReadComplete(SConnBuffer* connBuf);
|
||||
int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf);
|
||||
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf);
|
||||
int32_t transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf);
|
||||
int32_t transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf);
|
||||
|
||||
int transSetConnOption(uv_tcp_t* stream, int keepalive);
|
||||
int32_t transSetConnOption(uv_tcp_t* stream, int keepalive);
|
||||
|
||||
void transRefSrvHandle(void* handle);
|
||||
void transUnrefSrvHandle(void* handle);
|
||||
|
@ -315,21 +314,24 @@ void transUnrefSrvHandle(void* handle);
|
|||
void transRefCliHandle(void* handle);
|
||||
void transUnrefCliHandle(void* handle);
|
||||
|
||||
int transReleaseCliHandle(void* handle);
|
||||
int transReleaseSrvHandle(void* handle);
|
||||
int32_t transReleaseCliHandle(void* handle);
|
||||
int32_t transReleaseSrvHandle(void* handle);
|
||||
|
||||
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
|
||||
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
|
||||
int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int8_t* epUpdated,
|
||||
int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
|
||||
int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
|
||||
int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int8_t* epUpdated,
|
||||
int32_t timeoutMs);
|
||||
int transSendResponse(const STransMsg* msg);
|
||||
int transRegisterMsg(const STransMsg* msg);
|
||||
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
|
||||
int32_t transSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId);
|
||||
int32_t transFreeConnById(void* shandle, int64_t transpointId);
|
||||
|
||||
int32_t transSendResponse(const STransMsg* msg);
|
||||
int32_t transRegisterMsg(const STransMsg* msg);
|
||||
int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
|
||||
int32_t transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func);
|
||||
|
||||
int transSockInfo2Str(struct sockaddr* sockname, char* dst);
|
||||
int32_t transSockInfo2Str(struct sockaddr* sockname, char* dst);
|
||||
|
||||
int64_t transAllocHandle();
|
||||
int32_t transAllocHandle(int64_t* refId);
|
||||
|
||||
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
||||
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
||||
|
|
|
@ -58,6 +58,8 @@ typedef struct {
|
|||
int32_t failFastThreshold;
|
||||
int32_t failFastInterval;
|
||||
|
||||
int8_t notWaitAvaliableConn; // 1: no delay, 0: delay
|
||||
|
||||
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
|
||||
bool (*retry)(int32_t code, tmsg_t msgType);
|
||||
bool (*startTimer)(int32_t code, tmsg_t msgType);
|
||||
|
|
|
@ -102,6 +102,8 @@ void* rpcOpen(const SRpcInit* pInit) {
|
|||
if (pRpc->timeToGetConn == 0) {
|
||||
pRpc->timeToGetConn = 10 * 1000;
|
||||
}
|
||||
pRpc->notWaitAvaliableConn = pInit->notWaitAvaliableConn;
|
||||
|
||||
pRpc->tcphandle =
|
||||
(*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||
|
||||
|
@ -163,38 +165,48 @@ void* rpcReallocCont(void* ptr, int64_t contLen) {
|
|||
return st + TRANS_MSG_OVERHEAD;
|
||||
}
|
||||
|
||||
int rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
|
||||
int32_t rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
|
||||
return transSendRequest(shandle, pEpSet, pMsg, NULL);
|
||||
}
|
||||
int rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
|
||||
return transSendRequest(shandle, pEpSet, pMsg, pCtx);
|
||||
}
|
||||
int rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
|
||||
return transSendRecv(shandle, pEpSet, pMsg, pRsp);
|
||||
}
|
||||
int rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int8_t* epUpdated,
|
||||
int32_t timeoutMs) {
|
||||
return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, epUpdated, timeoutMs);
|
||||
int32_t rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
|
||||
if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0|| pRid == NULL) {
|
||||
return transSendRequest(shandle, pEpSet, pMsg, pCtx);
|
||||
} else {
|
||||
return transSendRequestWithId(shandle, pEpSet, pMsg, pRid);
|
||||
}
|
||||
}
|
||||
|
||||
int rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); }
|
||||
int32_t rpcSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) {
|
||||
return transSendRequestWithId(shandle, pEpSet, pReq, transpointId);
|
||||
}
|
||||
|
||||
int32_t rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
|
||||
return transSendRecv(shandle, pEpSet, pMsg, pRsp);
|
||||
}
|
||||
int32_t rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int8_t* epUpdated,
|
||||
int32_t timeoutMs) {
|
||||
return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, epUpdated, timeoutMs);
|
||||
}
|
||||
int32_t rpcFreeConnById(void* shandle, int64_t connId) { return transFreeConnById(shandle, connId); }
|
||||
|
||||
int32_t rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); }
|
||||
|
||||
void rpcRefHandle(void* handle, int8_t type) { (*taosRefHandle[type])(handle); }
|
||||
|
||||
void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); }
|
||||
|
||||
int rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); }
|
||||
int rpcReleaseHandle(void* handle, int8_t type) { return (*transReleaseHandle[type])(handle); }
|
||||
int32_t rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); }
|
||||
int32_t rpcReleaseHandle(void* handle, int8_t type) { return (*transReleaseHandle[type])(handle); }
|
||||
|
||||
// client only
|
||||
int rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
|
||||
int32_t rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
|
||||
// later
|
||||
return transSetDefaultAddr(thandle, ip, fqdn);
|
||||
}
|
||||
// server only
|
||||
int32_t rpcSetIpWhite(void* thandle, void* arg) { return transSetIpWhiteList(thandle, arg, NULL); }
|
||||
|
||||
void* rpcAllocHandle() { return (void*)transAllocHandle(); }
|
||||
int32_t rpcAllocHandle(int64_t* refId) { return transAllocHandle(refId); }
|
||||
|
||||
int32_t rpcUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) { return transUtilSIpRangeToStr(pRange, buf); }
|
||||
int32_t rpcUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf) {
|
||||
|
|
|
@ -213,8 +213,10 @@ static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
|
|||
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL,
|
||||
cliHandleUpdate};
|
||||
static void cliHandleFreeById(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||
|
||||
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
|
||||
NULL, cliHandleUpdate, cliHandleFreeById};
|
||||
/// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
|
||||
/// NULL,cliHandleUpdate};
|
||||
|
||||
|
@ -660,7 +662,9 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
|
|||
if (QUEUE_IS_EMPTY(&plist->conns)) {
|
||||
if (plist->list->numOfConn >= pTranInst->connLimitNum) {
|
||||
*exceed = true;
|
||||
return NULL;;
|
||||
}
|
||||
plist->list->numOfConn++;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -704,7 +708,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
|
|||
SMsgList* list = plist->list;
|
||||
if ((list)->numOfConn >= pTransInst->connLimitNum) {
|
||||
STraceId* trace = &(*pMsg)->msg.info.traceId;
|
||||
if (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType)) {
|
||||
if (pTransInst->notWaitAvaliableConn || (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType))) {
|
||||
tDebug("%s msg %s not to send, reason: %s", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType),
|
||||
tstrerror(TSDB_CODE_RPC_NETWORK_BUSY));
|
||||
doNotifyApp(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY);
|
||||
|
@ -899,10 +903,12 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
|
|||
exh->handle = conn;
|
||||
exh->pThrd = conn->hostThrd;
|
||||
taosWUnLockLatch(&exh->latch);
|
||||
|
||||
|
||||
conn->refId = exh->refId;
|
||||
taosWUnLockLatch(&exh->latch);
|
||||
|
||||
tDebug("conn %p specified by %"PRId64"", conn, handle);
|
||||
|
||||
(void)transReleaseExHandle(transGetRefMgt(), handle);
|
||||
return 0;
|
||||
}
|
||||
|
@ -1035,7 +1041,6 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
|
|||
list->size--;
|
||||
}
|
||||
}
|
||||
|
||||
conn->list = NULL;
|
||||
|
||||
(void)transReleaseExHandle(transGetRefMgt(), conn->refId);
|
||||
|
@ -1075,8 +1080,11 @@ static void cliDestroy(uv_handle_t* handle) {
|
|||
|
||||
(void)atomic_sub_fetch_32(&pThrd->connCount, 1);
|
||||
|
||||
if (conn->refId > 0) {
|
||||
(void)transReleaseExHandle(transGetRefMgt(), conn->refId);
|
||||
(void)transRemoveExHandle(transGetRefMgt(), conn->refId);
|
||||
|
||||
}
|
||||
taosMemoryFree(conn->dstAddr);
|
||||
taosMemoryFree(conn->stream);
|
||||
|
||||
|
@ -1589,6 +1597,40 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
pThrd->cvtAddr = pCtx->cvtAddr;
|
||||
destroyCmsg(pMsg);
|
||||
}
|
||||
static void cliHandleFreeById(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||
int32_t code = 0;
|
||||
int64_t refId = (int64_t)(pMsg->msg.info.handle);
|
||||
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
|
||||
if (exh == NULL) {
|
||||
tDebug("id %" PRId64 " already released", refId);
|
||||
destroyCmsg(pMsg);
|
||||
return;
|
||||
}
|
||||
|
||||
taosRLockLatch(&exh->latch);
|
||||
SCliConn* conn = exh->handle;
|
||||
taosRUnLockLatch(&exh->latch);
|
||||
|
||||
if (conn == NULL || conn->refId != refId) {
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _exception);
|
||||
}
|
||||
tDebug("do free conn %p by id %" PRId64 "", conn, refId);
|
||||
|
||||
int32_t size = transQueueSize(&conn->cliMsgs);
|
||||
if (size == 0) {
|
||||
// already recv, and notify upper layer
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _exception);
|
||||
return;
|
||||
} else {
|
||||
while (T_REF_VAL_GET(conn) >= 1) transUnrefCliHandle(conn);
|
||||
}
|
||||
return;
|
||||
_exception:
|
||||
tDebug("already free conn %p by id %" PRId64"", conn, refId);
|
||||
|
||||
(void)transReleaseExHandle(transGetRefMgt(), refId);
|
||||
destroyCmsg(pMsg);
|
||||
}
|
||||
|
||||
SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) {
|
||||
STransConnCtx* pCtx = (*pMsg)->ctx;
|
||||
|
@ -2759,7 +2801,7 @@ SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {
|
|||
SCliThrd* pThrd = transGetWorkThrdFromHandle(trans, handle);
|
||||
return pThrd;
|
||||
}
|
||||
int transReleaseCliHandle(void* handle) {
|
||||
int32_t transReleaseCliHandle(void* handle) {
|
||||
int32_t code = 0;
|
||||
SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle);
|
||||
if (pThrd == NULL) {
|
||||
|
@ -2823,25 +2865,25 @@ static int32_t transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq
|
|||
cliMsg->type = Normal;
|
||||
cliMsg->refId = (int64_t)shandle;
|
||||
QUEUE_INIT(&cliMsg->seqq);
|
||||
|
||||
*pCliMsg = cliMsg;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
|
||||
int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
|
||||
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
if (pTransInst == NULL) {
|
||||
transFreeMsg(pReq->pCont);
|
||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
||||
return TSDB_CODE_RPC_MODULE_QUIT;
|
||||
}
|
||||
int32_t code = 0;
|
||||
int64_t handle = (int64_t)pReq->info.handle;
|
||||
SCliThrd* pThrd = transGetWorkThrd(pTransInst, handle);
|
||||
if (pThrd == NULL) {
|
||||
transFreeMsg(pReq->pCont);
|
||||
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _exception;);
|
||||
}
|
||||
|
||||
if (handle != 0) {
|
||||
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
|
||||
if (exh != NULL) {
|
||||
|
@ -2849,26 +2891,27 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
|
|||
if (exh->handle == NULL && exh->inited != 0) {
|
||||
SCliMsg* pCliMsg = NULL;
|
||||
code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg);
|
||||
ASSERT(code == 0);
|
||||
if (code != 0) {
|
||||
taosWUnLockLatch(&exh->latch);
|
||||
(void)transReleaseExHandle(transGetRefMgt(), handle);
|
||||
TAOS_CHECK_GOTO(code, NULL, _exception);
|
||||
}
|
||||
|
||||
QUEUE_PUSH(&exh->q, &pCliMsg->seqq);
|
||||
taosWUnLockLatch(&exh->latch);
|
||||
tDebug("msg refId: %" PRId64 "", handle);
|
||||
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
return 0;
|
||||
} else {
|
||||
exh->inited = 1;
|
||||
taosWUnLockLatch(&exh->latch);
|
||||
(void)transReleaseExHandle(transGetRefMgt(), handle);
|
||||
}
|
||||
exh->inited = 1;
|
||||
taosWUnLockLatch(&exh->latch);
|
||||
(void)transReleaseExHandle(transGetRefMgt(), handle);
|
||||
}
|
||||
}
|
||||
|
||||
SCliMsg* pCliMsg = NULL;
|
||||
code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg);
|
||||
if (code != 0) {
|
||||
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
return code;
|
||||
}
|
||||
TAOS_CHECK_GOTO(transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg), NULL, _exception);
|
||||
|
||||
STraceId* trace = &pReq->info.traceId;
|
||||
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
|
||||
|
@ -2880,13 +2923,63 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
|
|||
}
|
||||
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
return 0;
|
||||
|
||||
_exception:
|
||||
transFreeMsg(pReq->pCont);
|
||||
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
return code;
|
||||
}
|
||||
int32_t transSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) {
|
||||
if (transpointId == NULL) {
|
||||
ASSERT(0);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
int32_t code = 0;
|
||||
|
||||
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
if (pTransInst == NULL) {
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_RPC_MODULE_QUIT, NULL, _exception);
|
||||
}
|
||||
|
||||
TAOS_CHECK_GOTO(transAllocHandle(transpointId), NULL, _exception);
|
||||
|
||||
SCliThrd* pThrd = transGetWorkThrd(pTransInst, *transpointId);
|
||||
if (pThrd == NULL) {
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _exception);
|
||||
}
|
||||
|
||||
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), *transpointId);
|
||||
if (exh == NULL) {
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_RPC_MODULE_QUIT, NULL, _exception);
|
||||
}
|
||||
|
||||
pReq->info.handle = (void*)(*transpointId);
|
||||
|
||||
SCliMsg* pCliMsg = NULL;
|
||||
TAOS_CHECK_GOTO(transInitMsg(shandle, pEpSet, pReq, NULL, &pCliMsg), NULL, _exception);
|
||||
|
||||
STraceId* trace = &pReq->info.traceId;
|
||||
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
|
||||
EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle);
|
||||
if ((code = transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) != 0) {
|
||||
destroyCmsg(pCliMsg);
|
||||
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code);
|
||||
}
|
||||
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
return 0;
|
||||
|
||||
_exception:
|
||||
transFreeMsg(pReq->pCont);
|
||||
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
return code;
|
||||
}
|
||||
|
||||
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
|
||||
int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
|
||||
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
if (pTransInst == NULL) {
|
||||
transFreeMsg(pReq->pCont);
|
||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
||||
return TSDB_CODE_RPC_MODULE_QUIT;
|
||||
}
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -2908,8 +3001,7 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
|
|||
code = tsem_init(sem, 0, 0);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(sem);
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TAOS_CHECK_GOTO(code, NULL, _RETURN1);
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _RETURN1);
|
||||
}
|
||||
|
||||
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
|
||||
|
@ -3003,13 +3095,13 @@ _EXIT:
|
|||
taosMemoryFree(pSyncMsg);
|
||||
return code;
|
||||
}
|
||||
int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated,
|
||||
int32_t timeoutMs) {
|
||||
int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated,
|
||||
int32_t timeoutMs) {
|
||||
int32_t code = 0;
|
||||
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
if (pTransInst == NULL) {
|
||||
transFreeMsg(pReq->pCont);
|
||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
||||
return TSDB_CODE_RPC_MODULE_QUIT;
|
||||
}
|
||||
|
||||
STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg));
|
||||
|
@ -3096,22 +3188,21 @@ _RETURN2:
|
|||
/*
|
||||
*
|
||||
**/
|
||||
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
|
||||
int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
|
||||
if (ip == NULL || fqdn == NULL) return TSDB_CODE_INVALID_PARA;
|
||||
|
||||
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
if (pTransInst == NULL) {
|
||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
||||
return TSDB_CODE_RPC_MODULE_QUIT;
|
||||
}
|
||||
|
||||
SCvtAddr cvtAddr = {0};
|
||||
if (ip != NULL && fqdn != NULL) {
|
||||
tstrncpy(cvtAddr.ip, ip, sizeof(cvtAddr.ip));
|
||||
tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn));
|
||||
cvtAddr.cvt = true;
|
||||
}
|
||||
tstrncpy(cvtAddr.ip, ip, sizeof(cvtAddr.ip));
|
||||
tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn));
|
||||
cvtAddr.cvt = true;
|
||||
|
||||
int32_t code = 0;
|
||||
int8_t i = 0;
|
||||
for (i = 0; i < pTransInst->numOfThreads; i++) {
|
||||
for (int8_t i = 0; i < pTransInst->numOfThreads; i++) {
|
||||
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
||||
if (pCtx == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -3136,7 +3227,9 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
|
|||
|
||||
if ((code = transAsyncSend(thrd->asyncPool, &(cliMsg->q))) != 0) {
|
||||
destroyCmsg(cliMsg);
|
||||
code = (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code);
|
||||
if (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT) {
|
||||
code = TSDB_CODE_RPC_MODULE_QUIT;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -3145,7 +3238,7 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int64_t transAllocHandle() {
|
||||
int32_t transAllocHandle(int64_t* refId) {
|
||||
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
|
||||
if (exh == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -3166,5 +3259,43 @@ int64_t transAllocHandle() {
|
|||
QUEUE_INIT(&exh->q);
|
||||
taosInitRWLatch(&exh->latch);
|
||||
tDebug("pre alloc refId %" PRId64 "", exh->refId);
|
||||
return exh->refId;
|
||||
*refId = exh->refId;
|
||||
return 0;
|
||||
}
|
||||
int32_t transFreeConnById(void* shandle, int64_t transpointId) {
|
||||
int32_t code = 0;
|
||||
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
if (pTransInst == NULL) {
|
||||
return TSDB_CODE_RPC_MODULE_QUIT;
|
||||
}
|
||||
if (transpointId == 0) {
|
||||
tDebug("not free by refId:%"PRId64"", transpointId);
|
||||
TAOS_CHECK_GOTO(0, NULL, _exception);
|
||||
}
|
||||
|
||||
SCliThrd* pThrd = transGetWorkThrdFromHandle(pTransInst, transpointId);
|
||||
if (pThrd == NULL) {
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _exception);
|
||||
}
|
||||
|
||||
SCliMsg* pCli = taosMemoryCalloc(1, sizeof(SCliMsg));
|
||||
if (pCli == NULL) {
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
|
||||
}
|
||||
pCli->type = FreeById;
|
||||
|
||||
tDebug("release conn id %" PRId64 "", transpointId);
|
||||
|
||||
STransMsg msg = {.info.handle = (void*)transpointId};
|
||||
pCli->msg = msg;
|
||||
|
||||
code = transAsyncSend(pThrd->asyncPool, &pCli->q);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(pCli);
|
||||
TAOS_CHECK_GOTO(code, NULL, _exception);
|
||||
}
|
||||
|
||||
_exception:
|
||||
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -234,7 +234,7 @@ bool transReadComplete(SConnBuffer* connBuf) {
|
|||
return (p->left == 0 || p->invalid) ? true : false;
|
||||
}
|
||||
|
||||
int transSetConnOption(uv_tcp_t* stream, int keepalive) {
|
||||
int32_t transSetConnOption(uv_tcp_t* stream, int keepalive) {
|
||||
#if defined(WINDOWS) || defined(DARWIN)
|
||||
#else
|
||||
return uv_tcp_keepalive(stream, 1, keepalive);
|
||||
|
@ -745,8 +745,7 @@ int32_t transRemoveExHandle(int32_t refMgt, int64_t refId) {
|
|||
return taosRemoveRef(refMgt, refId);
|
||||
}
|
||||
|
||||
void* transAcquireExHandle(int32_t refMgt, int64_t refId) {
|
||||
// acquire extern handle
|
||||
void* transAcquireExHandle(int32_t refMgt, int64_t refId) { // acquire extern handle
|
||||
return (void*)taosAcquireRef(refMgt, refId);
|
||||
}
|
||||
|
||||
|
|
|
@ -1707,7 +1707,7 @@ void transUnrefSrvHandle(void* handle) {
|
|||
}
|
||||
}
|
||||
|
||||
int transReleaseSrvHandle(void* handle) {
|
||||
int32_t transReleaseSrvHandle(void* handle) {
|
||||
int32_t code = 0;
|
||||
SRpcHandleInfo* info = handle;
|
||||
SExHandle* exh = info->handle;
|
||||
|
@ -1747,7 +1747,7 @@ _return2:
|
|||
return code;
|
||||
}
|
||||
|
||||
int transSendResponse(const STransMsg* msg) {
|
||||
int32_t transSendResponse(const STransMsg* msg) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (msg->info.noResp) {
|
||||
|
@ -1800,7 +1800,7 @@ _return2:
|
|||
rpcFreeCont(msg->pCont);
|
||||
return code;
|
||||
}
|
||||
int transRegisterMsg(const STransMsg* msg) {
|
||||
int32_t transRegisterMsg(const STransMsg* msg) {
|
||||
int32_t code = 0;
|
||||
|
||||
SExHandle* exh = msg->info.handle;
|
||||
|
@ -1891,4 +1891,4 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }
|
||||
int32_t transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }
|
||||
|
|
Loading…
Reference in New Issue